一、阻塞队列
当阻塞队列是空,从队列中获取元素的操作会被阻塞
当阻塞队列是满,往队列中添加元素的操作会被阻塞
二、为什么用,有什么好处?
三、常见的阻塞队列
ArrayBlockingQueue由数组构成的有界阻塞队列.
LinkedBlockingQueue由链表构成的有界阻塞队列(默认值为Integer.MAX_VALUE)
public class BlockingQueueDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3); /** * 1、抛出异常 add()/remove() */ // System.out.println(blockingQueue.add("a")); // System.out.println(blockingQueue.add("b")); // System.out.println(blockingQueue.add("c")); // System.out.println(blockingQueue.add("d")); // System.out.println(blockingQueue.element()); //检查队首元素 // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); // System.out.println(blockingQueue.remove()); /** * 2、返回布尔类型 offer()/pull() */ // System.out.println(blockingQueue.offer("a")); // System.out.println(blockingQueue.offer("b")); // System.out.println(blockingQueue.offer("c")); // System.out.println(blockingQueue.offer("d")); // // System.out.println(blockingQueue.peek()); //检查队首元素 // // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); // System.out.println(blockingQueue.poll()); /** * 3、阻塞 put()/take() */ // blockingQueue.put("a"); // blockingQueue.put("b"); // blockingQueue.put("c"); // System.out.println("############"); // blockingQueue.put("d"); // // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); /** *4、超时 */ System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c",2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d",2L, TimeUnit.SECONDS)); } }
SynchronousQueue是一个不存储元素的阻塞队列,也即单个元素的队列
public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + " put 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() + " put 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() + " put 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "AAA").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() + " take " + blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "BBB").start(); } }
四、BlockQueue的核心方法
五、使用场景
1、生产者-消费者模式
问题: 一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,来5轮
1)传统版的生产者-消费者模式
/** * Created by wujuhong on 2019/7/3. * 传统版的生产者消费者模式 */ public class ProductConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "AA").start(); new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "BB").start(); } } class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws InterruptedException { lock.lock(); try { //1、判断 while (number != 0) { //等待,不能生产 condition.await(); } //2、干活 number++; System.out.println(Thread.currentThread().getName() + " " + number); //3、通知唤醒 condition.signalAll(); } finally { lock.unlock(); } } public void decrement() throws InterruptedException { lock.lock(); try { //1、判断 while (number == 0) { //等待,不能生产 condition.await(); } //2、干活 number--; System.out.println(Thread.currentThread().getName() + " " + number); //3、通知唤醒 condition.signalAll(); } finally { lock.unlock(); } } }
2)阻塞队列的生产者-消费者模式
class MyResource { private volatile boolean FLAG = true; //默认生产,进行生产+消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } public void myProduct() throws InterruptedException { String data = ""; boolean returnValue; while (FLAG) { data = atomicInteger.incrementAndGet() + ""; returnValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (returnValue) { System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + " 插入队列" + data + "失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " 生产动作结束,FLAG = false"); } public void myConsume() throws InterruptedException { String result = ""; while (FLAG) { result = blockingQueue.poll(2, TimeUnit.SECONDS); if (null == result || result.equalsIgnoreCase("")) { FLAG = false; System.out.println(Thread.currentThread().getName() + " 超过2s钟没有取到蛋糕,消费队列退出"); return; } System.out.println(Thread.currentThread().getName() + " 消费队列" + result + "成功"); TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + " 消费动作结束,FLAG = false"); } public void stop() { this.FLAG = false; } } public class ProductConsumer_BlockQueueDemo { public static void main(String[] args) throws InterruptedException { MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 生产线程启动"); try { myResource.myProduct(); } catch (InterruptedException e) { e.printStackTrace(); } }, "product").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 消费线程启动"); try { myResource.myConsume(); } catch (InterruptedException e) { e.printStackTrace(); } }, "consume").start(); TimeUnit.SECONDS.sleep(5); System.out.println("5秒钟时间到,main线程叫停,活动结束"); myResource.stop(); } }
2、线程池
3、消息中间件
阻塞队列有没有好的一面?
不得不阻塞,你如何管理?
原文:https://www.cnblogs.com/wjh123/p/11123579.html