使用BlockingQuery实现生产者者消费者:考虑并发,解耦。
生产者消费者模式是面向过程的设计模式。
生产者制造数据 ------》 生产者把数据放入缓冲区 -------》 消费者把数据取出缓冲区 --------》相当于消费者处理数据
BlockingQuery学习
支持两个附加操作的 Queue
,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
插入 | add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
移除 | remove() |
poll() |
take() |
poll(time, unit) |
检查 | element() |
peek() |
不可用 | 不可用 |
生产者code
package com.liruilong.concurrent.Producer_Consuner; import javax.swing.text.StyledEditorKit; import java.util.concurrent.BlockingQueue; /** * @Description : 生产者 * @Author: Liruilong * @Date: 2019/8/22 7:24 */ public class Producer implements Runnable{ private find BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { try { String temp = "产品:"+Thread.currentThread().getName(); System.out.println("生产产品: "+Thread.currentThread().getName()); queue.put(temp); //队列已满,阻塞队列。 }catch (InterruptedException e){ e.printStackTrace(); } } }
消费者code
package com.liruilong.concurrent.Producer_Consuner; import java.util.concurrent.BlockingQueue; /** * @Description : 消费者 * @Author: Liruilong * @Date: 2019/8/22 7:56 */ public class Consumer implements Runnable{ private find BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { try { // 队列为空,阻塞当前线程 String temp = queue.take(); System.out.println("消费产品:" + temp); }catch (InterruptedException e){ e.printStackTrace(); } } }
测试
package com.liruilong.concurrent.Producer_Consuner; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * @Description : 生产者消费者测试 * @Author: Liruilong * @Date: 2019/8/22 8:01 */ public class Test { public static void main(String[] args) { // 一个基于已链接节点的、任选范围的阻塞双端队列。 BlockingQueue<String> query = new LinkedBlockingQueue<>(2); Consumer consumer = new Consumer(query); Producer producer = new Producer(query); for (int i = 0; i < 5; i ++){ new Thread(producer,"Producer" + (i + 1)).start(); new Thread(consumer, "Consumer" + (i + 1)).start(); } } }
原文:https://www.cnblogs.com/liruilong/p/11392293.html