首先需要一个可供测试的程序做为栗子。就是下面这个了。一个固定长度的 队列,其中定义可阻塞的put和take方法,并通过两个计数器进行控制。
import java.util.concurrent.Semaphore;
public class BoundedBuffer<E> {
private final Semaphore availableItems, availableSpaces;
private final E[] Items;
private int putPosition = 0, takePosition = 0;
public BoundedBuffer(int capacity) {
availableItems = new Semaphore(0);
availableSpaces = new Semaphore(capacity);
Items = (E[]) new Object[capacity];
}
public boolean isEmpty() {
return availableItems.availablePermits() == 0;
}
public boolean isFull() {
return availableSpaces.availablePermits() == 0;
}
public void put(E x) throws InterruptedException {
availableSpaces.acquire();
doInsert(x);
availableItems.release();
}
public E take() throws InterruptedException {
availableItems.acquire();
E item = doExtra();
availableSpaces.release();
return item;
}
private synchronized void doInsert(E x) {
// TODO Auto-generated method stub
int i = putPosition;
Items[i] = x;
putPosition = (++i == Items.length) ? 0 : i;
}
private synchronized E doExtra() {
int i = takePosition;
E x = Items[i];
Items[i] = null;
takePosition = (++i == Items.length) ? 0 : i;
return x;
}
}
使用JUnit就可以了
import static org.junit.Assert.*;
import org.junit.Test;
public class BoundedBufferTest {
@Test
public void test() {
testIsEmptyWhenConstructed();
try {
testIsFullAfterPuts();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
void testIsEmptyWhenConstructed() {
BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
assertTrue(bb.isEmpty());
assertFalse(bb.isFull());
}
void testIsFullAfterPuts() throws InterruptedException {
BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
for (int i = 0; i < 10; i++) {
bb.put(i);
}
assertFalse(bb.isEmpty());
assertTrue(bb.isFull());
}
}
测试代码尝试从空的缓存中take一个元素,如果能成功那么就测试失败了。然后再等待一段时间 ,再中断该线程。如果获取线程正确的在take中阻塞,那么将抛出interruptedException。捕获到异常的catch块将此试为测试成功并让线程退出,然后主测试线程尝试与获取线程合并,通过调用isAlive方法验证Join方法是否成功返回。如果获取线程可以中断线程,那么 join能很快完成。
void testTakeBlocksWhenEmpty() {
final BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
Thread taker = new Thread() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
int unused = bb.take();
fail();
} catch (InterruptedException success) {
}
}
};
try {
taker.start();
Thread.sleep(LOCKUP_DETECT_TIMEOUT);
taker.interrupt();
taker.join(LOCKUP_DETECT_TIMEOUT);
assertFalse(taker.isAlive());
} catch (Exception unException) {
fail();
}
}测试在数据竞争条件下是否会发生错误。这就需要一个并发的测试程序。或许比编写本身要测试的类更加困难。
通过一个对顺序敏感的校验和计算函数来计算 所有入列的元素以及出列元素的检验和,并进行比较。如果二者相等,那么测试就是成功的。如果只有一个生产者一个消费者,那么 这种方法能发挥最大的作用。因为它不仅能测试出是否取出了正确的元素,而且还能测试出元素被取出的顺序是否正确、
如果要将这种方法扩展到多生产者多消费者的情况时,就需要一个对元素入列和出列顺序不敏感的校验和函数。从而在测试程序运行完以后,可以将多个检验和以不同的顺序组合起来,如果不是这样,多个线程就需要访问 同一个共享的检验和变量 ,因此就需要同步,这将成为一个并发的瓶颈。
要确保测试程序能够正确地测试所有要点,就一定不能让编译器可以预先猜测到检验 和的值。那么会对许多 其他 的测试造成影响。由于大多数随机类生成器都是线程安全的。并且会带来额外的同步开销。所以还不如用一个简单的伪随机函数 。
static int XorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
import org.junit.Test;
public class PutTakeTest {
private static final ExecutorService pool = Executors.newCachedThreadPool();
private final AtomicInteger putSum = new AtomicInteger(0);
private final AtomicInteger takeSum = new AtomicInteger(0);
private final CyclicBarrier barrier;
private final BoundedBuffer<Integer> bb;
private final int Ntrials, nPairs;
public PutTakeTest(int capacity, int ntrials, int nPairs) {
this.bb = new BoundedBuffer<Integer>(capacity);
Ntrials = ntrials;
this.nPairs = nPairs;
this.barrier = new CyclicBarrier(nPairs * 2 + 1);
}
public static void main(String[] args) {
new PutTakeTest(10, 100000, 10).test();
pool.shutdown();
}
void test() {
try {
for (int i = 0; i < nPairs; i++) {
pool.execute(new Producer());
pool.execute(new Consumer());
}
barrier.await();
barrier.await();
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException(e);
}
}
class Producer implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
try {
int seed = (this.hashCode() ^ (int) System.nanoTime());
int sum = 0;
barrier.await();
for (int i = Ntrials; i > 0; i--) {
bb.put(seed);
sum += seed;
seed = XorShift(seed);
}
putSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException(e);
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
try {
barrier.await();
int sum = 0;
for (int i = Ntrials; i > 0; i--) {
sum += bb.take();
}
takeSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
// TODO: handle exception
throw new RuntimeException(e);
}
}
}
static int XorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
}
原文:http://blog.csdn.net/lingchixin/article/details/39269909