生产者消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。
这里我们实现如下的情况的生产-消费模型:
生产者不断交替地生产两组数据“姓名--1-->内容--1”,“姓名--2-->内容--2”,这里的“姓名--1”和“姓名--2”模拟为数据的名称,“内容--1 ”和“内容--2 ”模拟为数据的内容。
由于本程序中牵扯到线程运行的不确定性,因此可能会出现以下问题:
1.假设生产者线程刚向数据存储空间添加了数据的名称,还没有加入该信息的内容,程序就切换到了消费者线程,消费者线程把信息的名称和上一个信息的内容联系到了一起;
2.生产者生产了若干条数据,消费者才可以取数据,或者是,消费者取完一次数据后,还没等生产者放入新的数据,又重复取出了已取过的数据。
通过分析我们可知:
第一个问题可以通过同步来解决,第二个问题就需要用到线程通信。生产者线程放入数据后,通知消费者线程取出数据,消费者线程取出数据后,通知生产者线程生产数据,这里用wait\notigy机制来实现。
package thread; public class Info { private String name = "name"; private String content = "content"; //设置标志位,用来进行线程通信 private boolean flag =true; /** * 设置消息,此处用到线程同步 * @param name * @param content */ public synchronized void set(String name,String content) { while (!flag) { try { super.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name=name; //设置名称 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } this.content=content; //设置内容 flag =false; //设置标志位,表示现在生产停止,可以取走! } public synchronized void get() { while (flag) { try { super.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name + " --> " + content) ; flag = true ; // 改变标志位,表示可以生产 super.notify(); } }
public class Producer implements Runnable { private Info info=null; public Producer(Info info) { this.info=info; } @Override public void run() { boolean flag = true ; // 定义标记位 for(int i=0;i<10;i++){ if(flag){ this.info.set("姓名--1","内容--1") ; // 设置名称 flag = false ; }else{ this.info.set("姓名--2","内容--2") ; // 设置名称 flag = true ; } } } }
public class Consumer implements Runnable { private Info info = null ; public Consumer(Info info){ this.info = info ; } public void run(){ for(int i=0;i<10;i++){ this.info.get() ; } } public static void main(String[] args) { Info info = new Info(); // 实例化Info对象 Producer pro = new Producer(info) ; // 生产者 Consumer con = new Consumer(info) ; // 消费者 new Thread(pro).start() ; //启动了生产者线程后,再启动消费者线程 try{ Thread.sleep(500) ; }catch(InterruptedException e){ e.printStackTrace() ; } new Thread(con).start() ; } }
BlockingQueue
任何有效的生产者-消费者问题解决方案都是通过控制生产者put()方法(生产资源)和消费者take()方法(消费资源)的调用来实现的,一旦你实现了对方法的阻塞控制,那么你将解决该问题。Java通过BlockingQueue
提供了开箱即用的支持来控制这些方法的调用(一个线程创建资源,另一个消费资源)。java.util.concurrent
包下的BlockingQueue
接口是一个线程安全的可用于存取对象的队列。
BlockingQueue是一种数据结构,支持一个线程往里存资源,另一个线程从里取资源。这正是解决生产者消费者问题所需要的,那么让我们开始解决该问题吧。
public class InfoPlus { private String name = "name"; private String content = "content"; public InfoPlus(String name, String content) { this.name = name; this.content = content; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public String toString() { return "InfoPlus{" + "name=‘" + name + ‘\‘‘ + ", content=‘" + content + ‘\‘‘ + ‘}‘; } }
import java.util.concurrent.BlockingQueue; public class ProducerPlus implements Runnable { private BlockingQueue<InfoPlus> queue; public ProducerPlus(BlockingQueue<InfoPlus> queue) { this.queue = queue; } @Override public void run() { for (int i=0;i<10;i++) { try { Thread.sleep(1000); queue.put(new InfoPlus("name"+i,"content"+i)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; public class ConsumerPlus implements Runnable{ private BlockingQueue<InfoPlus> queue; public ConsumerPlus(BlockingQueue<InfoPlus> queue) { this.queue = queue; } public void run() { while (true) { try { System.out.println(this.queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { BlockingQueue<InfoPlus> blockingQueue = new LinkedBlockingDeque<>(); ProducerPlus producerPlus = new ProducerPlus(blockingQueue); ConsumerPlus consumerPlus = new ConsumerPlus(blockingQueue); ConsumerPlus consumerPlus1 = new ConsumerPlus(blockingQueue); new Thread(producerPlus).start(); new Thread(consumerPlus).start(); new Thread(consumerPlus1).start(); } }
原文:https://www.cnblogs.com/MrSaver/p/9409838.html