问题描述:一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能够并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将生产的产品放入一个缓冲区内,消费者可以从缓冲区中取走产品进行消费;
生产者和消费者必须保持同步,不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经投放了产品的缓冲区再次投放产品;
代码实现:
1).wait()和notify()方式来实现
public class Storage{ //缓冲区最大承载量 private final int MAX_SIZE = 100; //缓冲区存储的载体 private LinkedList<Object> list = new LinkedList<Object>(); /** * * 生产num个产品 */ public void produce(int num){ //获取锁 synchronized(list){ //缓存区剩余容量不足 while(list.size() + num > MAX_SIZE){ System.out.println("要生产的产品数量:"+num+"/t 库存量:"+list.size+"/t 暂时不能执行生产任务!"); try{ //由于条件不满足,生产阻塞 list.wait() }catch(InterruptedException e){ e.printStackTrace(); } } //在生产条件满足情况下,生产num歌产品 for(int i=1;i<=num;++i){ list.add(new Object()); } System.out.println("已经生产产品数:"+num+"/t 现仓储量为:"+list.size()); list.notifyAll(); } } /** * * 消费num个产品 */ public void consume(int num){ //获取锁 synchronized(list){ //缓存区剩余容量不足 while(list.size() < num){ System.out.println("要消费的产品数量:"+num+"/t 库存量:"+list.size+"/t 暂时不能执行消费任务!"); try{ //由于条件不满足,消费阻塞 list.wait() }catch(InterruptedException e){ e.printStackTrace(); } } //在生产条件满足情况下,生产num歌产品 for(int i=1;i<=num;++i){ list.remove(); } System.out.println("已经消费产品数:"+num+"/t 现仓储量为:"+list.size()); list.notifyAll(); } } //list getter//setter //MAX_SIZE getter }
public class Producer extends Thread{ //每次生产的产品数量 private int num; //仓库对象 private Storage storage; //构造函数,设置仓库 public Producer(Storage storage){ this.storage = storage; } //线程run函数 public void run(){ produce(); } //调用仓库的生产方法 public void produce(){ storage.produce(); } //num storage的getter(),setter()方法 }
/** * * 消费者线程类 */ public class Consumer extends Thread{ private int num; private Storage storage; public void Consumer(Storage storage){ this.storage = storage; } public void run(){ consume(); } public void consume(){ storage.consume(); } //num和storage的getter(),setter()方法 }
public class Test{ public static void main(String args[]){ //仓库对象 Storage storage = new Storage(); //生产者对象 Producer p1 = new Producer(); Producer p2 = new Producer(); Producer p3 = new Producer(); Producer p4 = new Producer(); Producer p5 = new Producer(); Producer p6 = new Producer(); //消费者对象 Consumer c1 = new Consumer(); Consumer c2 = new Consumer(); Consumer c3 = new Consumer(); Consumer c4 = new Consumer(); Consumer c5 = new Consumer(); //设置生产者生产数量 p1.setNum(10); p2.setNum(10); p3.setNum(10); p4.setNum(40); p5.setNum(50); p6.setNum(30); //设置消费者消费数量 c1.setNum(10); c2.setNum(20); c3.setNum(10); c4.setNum(30); c5.setNum(10); //执行线程 p1.start(); p2.start(); p3.start(); p4.start(); p5.start(); p6.start(); c1.start(); c2.start(); c3.start(); c4.start(); c5.start(); } }
2).await()和signal()方式实现
public class Storage{ //缓冲区最大承载量 private final int MAX_SIZE = 100; //缓冲区存储的载体 private LinkedList<Object> list = new LinkedList<Object>(); //锁 private final Lock lock = new ReentrantLock(); //仓库满的条件变量 private final Condition full = lock.newCondition(); //仓库空的条件变量 private final Condition empty = lock.newCondition(); /** * * 生产num个产品 */ public void produce(int num){ //获取锁 lock.lock(); //缓存区剩余容量不足 while(list.size() + num > MAX_SIZE){ System.out.println("要生产的产品数量:"+num+"/t 库存量:"+list.size+"/t 暂时不能执行生产任务!"); try{ //由于条件不满足,生产阻塞 full.await() }catch(InterruptedException e){ e.printStackTrace(); } } //在生产条件满足情况下,生产num歌产品 for(int i=1;i<=num;++i){ list.add(new Object()); } System.out.println("已经生产产品数:"+num+"/t 现仓储量为:"+list.size()); //唤醒其它所有线程 full.signalAll(); empty.signalAll(); //释放锁 lock.unlock(); } /** * * 消费num个产品 */ public void consume(int num){ //获取锁 lock.lock(); //缓存区剩余容量不足 while(list.size() < num){ System.out.println("要消费的产品数量:"+num+"/t 库存量:"+list.size+"/t 暂时不能执行消费任务!"); try{ //由于条件不满足,消费阻塞 empty.await() }catch(InterruptedException e){ e.printStackTrace(); } } //在生产条件满足情况下,生产num歌产品 for(int i=1;i<=num;++i){ list.remove(); } System.out.println("已经消费产品数:"+num+"/t 现仓储量为:"+list.size()); full.signalAll(); empty.signalAll(); //释放锁 lock.unlock(); } //list getter//setter //MAX_SIZE getter }
3).BlockingQueue阻塞队列来实现
public class Storage{ //缓冲区最大承载量 private final int MAX_SIZE = 100; //缓冲区存储的载体 private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100); /** * * 生产num个产品 */ public void produce(int num){ //缓存区剩余容量不足 while(list.size() == MAX_SIZE){ System.out.println(“库存量:"+MAX_SIZE+"/t 暂时不能执行生产任务!"); } //在生产条件满足情况下,生产num歌产品 for(int i=1;i<=num;++i){ try{ list.put(new Object());//容量达到最大时候,自动阻塞 }catch(InterruptedException e){ e.printStackTrace(); } } System.out.println("现仓储量为:"+list.size()); } /** * * 消费num个产品 */ public void consume(int num){ //缓存区剩余容量不足 while(list.size() == 0){ System.out.println("库存量:"+0+"/t 暂时不能执行消费任务!"); } //在生产条件满足情况下,生产num歌产品 for(int i=1;i<=num;++i){ try{ list.take();//容量为0时,自动阻塞 }catch(InterruptedException e){ e.printStackTrace(); } } System.out.println("现仓储量为:"+list.size()); } //list getter//setter //MAX_SIZE getter }
原文:http://www.cnblogs.com/hzmaozhetao/p/6394862.html