本文使用java语言借助java并发库去实现生产者和消费者问题。主要设计思路:1.物料池是共享容器;2.生产者只负责生产物料,添加到物料池中;3.消费者从池中获取物料。在这里使用ReenTranLock控制共享容器的同步,使用Conditona做线程间的通知,当物料池满的时候挂起生产者,并且唤醒消费者去消费池中物料,当池中无物料的时候,挂起消费者,唤醒生产者生产物料。
在编码之前我需要先对生产者、消费者、物料池做一个简单的分析:
1.消费者和生产者他们的任务都是单一的,消费者消费物料,生产者生产物料,消费者和生产者对外只需要记住是哪个物料池就行了。
2.共享数据控制同步应该同一个类中完成,这样控制方便,而且简单。
3.发出通知的应该物料池。因为只有它自己知道自己的状态,消费者和生产者才不会关心它。冷暖自知!!
在做了简单的分析之后,清楚了各个对象的功能。接下来就是设计了。涉及到具体的地方无会在代码中注释,就不在这干巴巴的说了。
核心-物料池
package com.autonavi.pc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/***
* 物料池
* @author 零下三度
*
*/
public class Pool {
private ReentrantLock lock = new ReentrantLock();
private Condition putCondition = lock.newCondition();
private Condition getCondition = lock.newCondition();
private Product[] productPool = new Product[10];
private int size;//池中物料的数量
private int currPutIndex;//当前添加物料的索引
private int currGetIndex;//当前获取物料的索引
private Pool(){
size = 0;
currPutIndex = 0;
currGetIndex = 0;
}
public static Pool getPool(){
return new Pool();
}
/**
* 生产者向物料池添加一个商品
* @param product
* @throws InterruptedException
*/
public void put(Product product) throws InterruptedException{
try{
lock.lock();
//如果物料池满了,则不再允许向物料池中添加物料。
while(size == productPool.length){
System.out.println("物料池已经满了,暂时不能添加产品了,请耐心等待.....当前池中物料为:"+size);
putCondition.await();
}
productPool[currPutIndex] = product;
if(++currPutIndex == productPool.length){
currPutIndex = 0;
}
++size;
//注意:由于终端是共享资源,放在此处才能看到真正的测试结果过
System.out.println(product.toString()+"已经添加到物料池中,当前池中产品个数:"+size+",currPutIndex="+currPutIndex);
//添加了物料,池中有可用的物料,通知消费者可以从池中获取物料
getCondition.signal();
}finally{
lock.unlock();
}
}
/***
* 消费者从物料池中获取一个商品。
* @return
* @throws InterruptedException
*/
public Product get() throws InterruptedException{
try{
lock.lock();
//如果池中没有物料,则禁止消费者从池中获取物料
while(size == 0){
System.out.println("目前没有物料,暂时无法获取产品,请耐心等待.....当前池中物料数量:"+size);
getCondition.await();
}
Product p = productPool[currGetIndex];
productPool[currGetIndex] = null;
if(++currGetIndex == productPool.length){
currGetIndex = 0;
}
--size;
System.out.println("出库的是:"+p.toString()+"当前池中还有产品个数:"+size+",currGetIndex="+currGetIndex);
putCondition.signal();
return p;
}finally{
lock.unlock();
}
}
}
生产者:
package com.autonavi.pc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/***
* 生产者
* @author 零下三度
*
*/
public class Producers implements Runnable{
private AtomicLong id = new AtomicLong(0);
private Pool pool;
private String productName;
public Producers(){
}
public Producers(Pool pool,String productName,AtomicLong id){
this.id = id;
this.pool = pool;
this.productName = productName;
}
public Pool getPool() {
return pool;
}
public void setPool(Pool pool) {
this.pool = pool;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public AtomicLong getId() {
return id;
}
public void setId(AtomicLong id) {
this.id = id;
}
/***
* 生产者的工作就是生产商品,添加到物料吃中
* 至于什么时候停止,什么时候开始,是需要被人去给他消息的。
*/
public void run() {
Product p;
try {
while(true){
TimeUnit.MILLISECONDS.sleep(200);
p = new Product(""+id.incrementAndGet(),productName);
pool.put(p);
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
public void start(){
Thread t = new Thread(this);
t.start();
}
}
消费者:
package com.autonavi.pc;
import java.util.concurrent.TimeUnit;
/***
* 消费者
* @author 零下三度
*
*/
public class Consumers implements Runnable{
private Pool pool;
public Pool getPool() {
return pool;
}
public void setPool(Pool pool) {
this.pool = pool;
}
/***
* 消费者的工作就是消费物料池中的商品
*/
public void run() {
try {
Product p;
while(true){
TimeUnit.MILLISECONDS.sleep(200);
p = pool.get();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void start(){
Thread t = new Thread(this);
t.start();
}
}
物料-产品:
package com.autonavi.pc;
/***
* 商品
* @author 零下三度
*
*/
public class Product {
private String id;
private String name;
public Product(){
}
public Product(String id, String name) {
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
String val = "id:"+id+"\n"+"name:"+name+"\n";
return val;
}
@Override
public boolean equals(Object obj) {
if(obj != null && obj instanceof Product){
Product p = (Product)obj;
if(this.id != null && this.id.equals(p.getId())){
return true;
}
}
return false;
}
@Override
public int hashCode() {
return id.hashCode();
}
}
测试用例:
package com.autonavi.pc;
import java.util.concurrent.atomic.AtomicLong;
public class ProductsAndConsumersTest {
public static void main(String[] args) throws InterruptedException {
Pool pool = Pool.getPool();
Producers p = new Producers(pool,"产品A",new AtomicLong(0));
System.out.println("启动生产者");
p.start();
Consumers c = new Consumers();
c.setPool(pool);
System.out.println("启动消费者");
c.start();
}
}
原文:http://my.oschina.net/u/1246838/blog/311811