import java.util.concurrent.Semaphore;

/**
 * Fixed-capacity FIFO buffer backed by a circular array.
 *
 * <p>Two counting semaphores track occupancy: {@code availableItems} counts
 * elements that can be taken, {@code availableSpaces} counts free slots.
 * {@link #put} and {@link #take} block on the matching semaphore, while the
 * array and its two cursors are only touched inside synchronized helpers.
 *
 * @param <E> element type held by the buffer
 */
public class BoundedBuffer<E> {
    private final Semaphore availableItems, availableSpaces;
    private final E[] items;
    private int putPosition = 0;
    private int takePosition = 0;

    /**
     * Creates an empty buffer.
     *
     * @param capacity number of slots; also the initial permit count of the
     *                 free-space semaphore
     */
    @SuppressWarnings("unchecked")
    public BoundedBuffer(int capacity) {
        availableItems = new Semaphore(0);
        availableSpaces = new Semaphore(capacity);
        items = (E[]) new Object[capacity];
    }

    /** @return {@code true} when no item permit is currently available */
    public boolean isEmpty() {
        int takeable = availableItems.availablePermits();
        return takeable == 0;
    }

    /** @return {@code true} when no free-space permit is currently available */
    public boolean isFull() {
        int freeSlots = availableSpaces.availablePermits();
        return freeSlots == 0;
    }

    /**
     * Appends an element, waiting for a free slot if necessary.
     *
     * @param x element to store
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void put(E x) throws InterruptedException {
        availableSpaces.acquire();
        doInsert(x);
        availableItems.release();
    }

    /**
     * Removes the oldest element, waiting for one to arrive if necessary.
     *
     * @return the element that was stored earliest
     * @throws InterruptedException if interrupted while waiting for an item
     */
    public E take() throws InterruptedException {
        availableItems.acquire();
        E taken = doExtract();
        availableSpaces.release();
        return taken;
    }

    /** Stores {@code x} at the write cursor and advances it, wrapping at the end. */
    private synchronized void doInsert(E x) {
        int slot = putPosition;
        items[slot] = x;
        int next = slot + 1;
        if (next == items.length) {
            next = 0;
        }
        putPosition = next;
    }

    /** Reads and clears the slot at the read cursor, then advances it, wrapping at the end. */
    private synchronized E doExtract() {
        int slot = takePosition;
        E value = items[slot];
        items[slot] = null; // drop the reference so the buffer does not retain taken elements
        int next = slot + 1;
        if (next == items.length) {
            next = 0;
        }
        takePosition = next;
        return value;
    }
}
import static org.junit.Assert.*;
import org.junit.Test;

/**
 * Sequential (single-threaded) sanity checks for {@code BoundedBuffer}:
 * the emptiness/fullness predicates right after construction and after the
 * buffer has been filled to capacity.
 */
public class BoundedBufferTests {

    /** A freshly constructed buffer reports empty and not full. */
    @Test
    public void testIsEmptyWhenConstructed() {
        BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
        assertTrue(bb.isEmpty());
        assertFalse(bb.isFull());
    }

    /**
     * After exactly {@code capacity} puts the buffer reports full and not empty.
     *
     * @throws InterruptedException if a put is interrupted
     */
    @Test
    public void testIsFullAfterPuts() throws InterruptedException {
        BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
        for (int i = 0; i < 10; i++) {
            bb.put(i);
        }
        assertTrue(bb.isFull());
        // A buffer holding ten items cannot be empty; asserting isEmpty() here
        // would make the test fail on every run.
        assertFalse(bb.isEmpty());
    }
}
@Test public void testTakeBlocksWhenEmpty(){ final BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10); Thread taker = new Thread(){ @Override public void run() { try { int unused = bb.take(); fail(); //假设运行到这里。那么表示出现了一个错误 } catch (InterruptedException e) { } } }; try { taker.start(); Thread.sleep(LOCKUP_DETECT_TIMEOUT); taker.interrupt(); taker.join(LOCKUP_DETECT_TIMEOUT); assertFalse(taker.isAlive()); } catch (InterruptedException e) { fail(); } }
被阻塞的线程并不需要进入WAITING或者TIMED_WAITING等状态,因此JVM可以选择通过自旋等待来实现阻塞。
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase; public class PutTakeTest extends TestCase { private static final ExecutorService pool = Executors.newCachedThreadPool(); private final AtomicInteger putSum = new AtomicInteger(0); private final AtomicInteger takeSum = new AtomicInteger(0); private final CyclicBarrier barrier; private final BoundedBuffer<Integer> bb; private final int nTrials, nPairs; public static void main(String[] args) { new PutTakeTest(10, 10, 100000).test(); // 演示样例參数 pool.shutdown(); } static int xorShift(int y) { y ^= (y << 6); y ^= (y >>> 21); y ^= (y << 7); return y; } public PutTakeTest(int capacity, int nPairs, int nTrials) { this.bb = new BoundedBuffer<Integer>(capacity); this.nTrials = nTrials; this.nPairs = nPairs; this.barrier = new CyclicBarrier(nPairs * 2 + 1); } void test() { try { for (int i = 0; i < nPairs; i++) { pool.execute(new Producer()); pool.execute(new Consumer()); } barrier.await(); // 等待全部的线程就绪 barrier.await(); // 等待全部的线程运行完毕 assertEquals(putSum.get(), takeSum.get()); } catch (Exception e) { throw new RuntimeException(e); } } class Producer implements Runnable { @Override public void run() { try { int seed = (this.hashCode() ^ (int) System.nanoTime()); int sum = 0; barrier.await(); for (int i = nTrials; i > 0; --i) { bb.put(seed); sum += seed; seed = xorShift(seed); } putSum.getAndAdd(sum); barrier.await(); } catch (Exception e) { throw new RuntimeException(e); } } } class Consumer implements Runnable { @Override public void run() { try { barrier.await(); int sum = 0; for (int i = nTrials; i > 0; --i) { sum += bb.take(); } takeSum.getAndAdd(sum); barrier.await(); } catch (Exception e) { throw new RuntimeException(e); } } } }
/** Deliberately large element (one 100000-element double array) so that retained instances show up in heap usage. */
class Big { double[] data = new double[100000]; };
/**
 * Sketch of a resource-leak test: measure the heap, put CAPACITY Big objects
 * into the buffer, take them all out again, measure the heap a second time,
 * and require the two measurements to differ by less than THRESHOLD.
 *
 * NOTE: the two heap-measurement expressions are comment placeholders, so this
 * snippet does not compile as written. CAPACITY and THRESHOLD are not defined
 * in this snippet.
 *
 * @throws InterruptedException if a put or take is interrupted
 */
void testLeak() throws InterruptedException{
BoundedBuffer<Big> bb = new BoundedBuffer<Big>(CAPACITY);
int heapSize1 = /* take a heap snapshot */;
for (int i = 0; i < CAPACITY; i++){ bb.put(new Big()); }
for (int i = 0; i < CAPACITY; i++){ bb.take(); }
int heapSize2 = /* take a heap snapshot */;
assertTrue(Math.abs(heapSize1 - heapSize2) < THRESHOLD); }
// Fragment: these two assignments install the timer as the barrier action,
// so the barrier runs BarrierTimer.run() each time it trips.
// NOTE(review): presumably they belong in the timed test's constructor - confirm.
this.timer = new BarrierTimer();
this.barrier = new CyclicBarrier(nPairs * 2 + 1, timer);

/**
 * Stopwatch driven by repeated {@link #run()} calls: the first call after
 * {@link #clear()} (or after construction) records the start time, and every
 * later call overwrites the end time. Times come from {@link System#nanoTime()}.
 */
public class BarrierTimer implements Runnable {
    private boolean started;
    private long startTime;
    private long endTime;

    /** Records the start time on the first call, the end time on subsequent calls. */
    @Override
    public synchronized void run() {
        long now = System.nanoTime();
        if (started) {
            endTime = now;
            return;
        }
        started = true;
        startTime = now;
    }

    /** Re-arms the timer so that the next {@link #run()} records a new start time. */
    public synchronized void clear() {
        started = false;
    }

    /** @return recorded end time minus recorded start time, in nanoseconds */
    public synchronized long getTime() {
        long elapsed = endTime - startTime;
        return elapsed;
    }
}
void test(){ try { timer.clear(); for (int i = 0; i < nPairs; i++){ pool .execute( new Producer()); pool .execute( new Consumer()); } barrier .await(); barrier .await(); long nsPerItem = timer.getTime() / ( nPairs * (long )nTrials ); System. out .println("Throughput: " + nsPerItem + " ns/item"); assertEquals(putSum.get(), takeSum.get() ) } catch (Exception e) { throw new RuntimeException(e); } }
public static void main(String[] args) throws InterruptedException { int tpt = 100000; // 每一个线程中的測试次数 for (int cap = 1; cap <= tpt; cap *= 10){ System. out .println("Capacity: " + cap); for (int pairs = 1; pairs <= 128; pairs *= 2){ TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt); System. out .println("Pairs: " + pairs + "\t"); t.test(); System. out .println("\t" ); Thread. sleep(1000); t.test(); System. out .println(); Thread. sleep(1000); } } pool .shutdown(); }
版权声明:本文为博主原创文章,未经博主同意不得转载。
原文:http://www.cnblogs.com/gcczhongduan/p/4850070.html