其次,由于需要保存count当前元素数量,因此也需要保证存取操作put及take方法互斥。
简单实现上述buffer代码如下:
public class BoundedBuffer{ private static final BoundedBuffer bufferInstance = new BoundedBuffer(); private static final int DEFAULT_BUFFER_SIZE = 1; private final Object[] buffer = new Object[DEFAULT_BUFFER_SIZE]; private static final int EMPTY = 0; private int header; private int tail; private int count; private BoundedBuffer(){ } public static BoundedBuffer getInstanceOfBuffer(){ return bufferInstance; } public synchronized void put(Object obj) throws InterruptedException { while (count >= DEFAULT_BUFFER_SIZE) { System.out.println("the buffer is full,wait for a moment,thread:" + Thread.currentThread().getId()); wait(); } if (tail >= DEFAULT_BUFFER_SIZE) { tail = 0; } System.out.println("success to put the data:"+obj+" into the buffer,thread:"+Thread.currentThread().getId()); buffer[tail++] = obj; count++; // then we invoke the thread in the notEmptyCondition wait queue notifyAll(); } /** * take the data from header of the queue * * @return * @throws InterruptedException */ public synchronized Object take() throws InterruptedException { Object res; while (count <= EMPTY) { System.out.println("the buffer is empty,just wait a moment,thread:" + Thread.currentThread().getId()); wait(); } res = buffer[header]; if (++header >= DEFAULT_BUFFER_SIZE) { header = 0; } count--; if(count<0){ count=0; } //notify the thread which wait to put the data to buffer when the buffer is null notifyAll(); return res; } private static class BufferProducor implements Runnable { private Object target; public void run() { try { BoundedBuffer.getInstanceOfBuffer().put(target); } catch (InterruptedException e) { e.printStackTrace(); System.out .println("client interrupt the task added the data to the buffer"); } } public void setTarget(Object target) { this.target = target; } } private static class BufferConsumer implements Runnable { public void run() { try { Object res = BoundedBuffer.getInstanceOfBuffer().take(); System.out.println("we get the result from buffer:" + res); } catch (InterruptedException e) { e.printStackTrace(); System.out .println("client interrupt the task take the data from the buffer"); } } } public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(5); BufferProducor bufferProducor1 = new BufferProducor(); bufferProducor1.setTarget("a"); BufferProducor bufferProducor2 = new BufferProducor(); bufferProducor2.setTarget("b"); BufferConsumer bufferConsumer1 = new BufferConsumer(); BufferConsumer bufferConsumer2 = new BufferConsumer(); service.submit(bufferProducor1); service.submit(bufferProducor2); service.submit(bufferConsumer1); service.submit(bufferConsumer2); } }
分析这个程序,有什么问题?
首先程序想实现通过wait方法来阻塞存取线程,通过notifyAll来唤醒存取线程。
这里说明下,由于用的是内置锁syncronized,并且当前锁对象是bufferInstance单例实例。所以当调用wait时,当前线程被挂起放入当前bufferInstance相关的内置条件队列当中。后续调用notifyAll则是将这个条件队列中所有阻塞的线程唤醒。
这样由于只有一个条件队列用于存放阻塞的线程,所以存数据线程及取数据线程都是放在一个阻塞条件队列当中。
notifyAll会唤醒所有阻塞的线程,比如,当前在阻塞队列中有10个等待存数据到buffer的线程。
然后有一个消费线程从元素满的buffer中取出数据,并通过notifyAll唤醒所有在阻塞队列中的线程,然后在阻塞队列中的三个线程都醒了,其中一个线程可以将数据放入buffer,其它9个线程由于buffer空间已满,又被挂起进入到阻塞队列。
如果需要优化这段代码性能的话,一种是只在引起存取线程阻塞的状态变化上才进行唤醒操作,即如果取操作线程要唤醒被阻塞的存操作线程,条件是:取操作线程进入take方法时,buffer元素是满的,然后取线程取出一个元素,使得buffer有空闲空间让存线程存数据。
进一步优化的话,能不能每次只唤醒一个线程?
对于现在一个条件队列存放两种类型的阻塞线程来讲,这样是不允许的。
考虑如果当前buffer可以容纳一个元素,这时先有三个存线程往buffer放数据,这样其中两个线程被阻塞到条件队列。
然后这时一个取数据线程,从buffer取走一个数据并调用notify方法唤醒条件队列中一个存线程。
这样条件队列中还有一个存线程。
接着存线程要存数据到buffer,但有一个取线程先来到take方法然后发现buffer还是空的,然后这个取线程被放入到了条件队列。
这样条件队列中就有一个存线程及一个取线程。
然后刚才被唤醒的存线程继续做存操作,然后调用notify唤醒条件队列中的一个线程,由于内置锁的条件队列取操作是非公平的因此很有可能这时唤醒的是条件队列中的
存线程。事实上是没有意义的。
所有对于以上一个条件队列中有两种等待不同条件被阻塞的线程的情况时,不能用单个notify。
如果想用单个notify就要想办法将之前阻塞的存线程与取线程分别放在两个队列。
这就要用到Lock的newCondition方法。
重构代码如下:
public class ConditionBoundedBuffer { private static final ConditionBoundedBuffer bufferInstance = new ConditionBoundedBuffer(); private static final int DEFAULT_BUFFER_SIZE = 1; private final Object[] buffer = new Object[DEFAULT_BUFFER_SIZE]; private static final int EMPTY = 0; private final Lock lock = new ReentrantLock(); private int header; private int tail; private int count; private final Condition notFullCondition = lock.newCondition(); private final Condition notEmptyCondition = lock.newCondition(); private ConditionBoundedBuffer(){ } public static ConditionBoundedBuffer getInstanceOfConditionBoundedBuffer(){ return bufferInstance; } public void put(Object obj) throws InterruptedException { lock.lock(); try { while (count == DEFAULT_BUFFER_SIZE) { System.out.println("the buffer is full,wait for a moment for putting ["+obj+"] to the buffer"+",thread:"+Thread.currentThread().getId()); notFullCondition.await(); } if (tail >= DEFAULT_BUFFER_SIZE) { tail = 0; } buffer[tail++] = obj; count++; System.out.println("success put the data ["+obj+"] to buffer,thread:"+Thread.currentThread().getId()); // then we invoke the thread in the notEmptyCondition wait queue notEmptyCondition.signal(); } finally { lock.unlock(); } } /** * take the data from header of the queue * * @return * @throws InterruptedException */ public Object take() throws InterruptedException { lock.lock(); Object res; try { while (count == EMPTY) { System.out.println("the buffer is empty,just wait a moment,thread:"+Thread.currentThread().getId()); notEmptyCondition.await(); } res = buffer[header]; if (++header >= DEFAULT_BUFFER_SIZE) { header = 0; } count--; if(count<EMPTY){ count=0; } notFullCondition.signal(); } finally { lock.unlock(); } return res; } private static class BufferProducor implements Runnable { private Object target; public void run() { try { ConditionBoundedBuffer.getInstanceOfConditionBoundedBuffer().put(target); } catch (InterruptedException e) { e.printStackTrace(); System.out .println("client interrupt the task added the data to the buffer"); } } public void setTarget(Object target) { this.target = target; } } private static class BufferConsumer implements Runnable { public void run() { try { Object res = ConditionBoundedBuffer.getInstanceOfConditionBoundedBuffer().take(); System.out.println("we get the result from buffer:" + res); } catch (InterruptedException e) { e.printStackTrace(); System.out .println("client interrupt the task take the data from the buffer"); } } } public static void main(String[] args) { ExecutorService service=Executors.newFixedThreadPool(5); BufferProducor bufferProducor1=new BufferProducor(); bufferProducor1.setTarget("a"); BufferProducor bufferProducor2=new BufferProducor(); bufferProducor2.setTarget("b"); BufferConsumer bufferConsumer1=new BufferConsumer(); BufferConsumer bufferConsumer2=new BufferConsumer(); service.submit(bufferProducor1); service.submit(bufferProducor2); service.submit(bufferConsumer1); service.submit(bufferConsumer2); } }
原文:http://blog.csdn.net/zhaozhenzuo/article/details/37109015