public class SemaphoreTest {private static final int NUMBER = 5; //限制资源访问数private static final Semaphore avialable = new Semaphore(NUMBER,true);public static void main(String[] args) {ExecutorService pool = Executors.newCachedThreadPool();Runnable r = new Runnable(){public void run(){try {avialable.acquire(); //此方法阻塞Thread.sleep(10*1000);System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完毕");avialable.release();} catch (InterruptedException e) {e.printStackTrace();}}};System.out.println(avialable.availablePermits());for(int i=0;i<10;i++){pool.execute(r);}System.out.println(avialable.availablePermits());pool.shutdown();}public static String getNow(){SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");return sdf.format(new Date());}}
synchronized(object){//do process to object- }
private final ReentrantLock lock = new ReentrantLock();public void m() {lock.lock(); // block until condition holdstry {// ... method body} finally {lock.unlock()}}
public class ConditionTest {private static final ReentrantLock lock = new ReentrantLock(true);//从锁中创建一个绑定条件private static final Condition condition = lock.newCondition();private static int count = 1;public static void main(String[] args) {Runnable r1 = new Runnable(){public void run(){lock.lock();try{while(count<=5){System.out.println(Thread.currentThread().getName()+"--"+count++);Thread.sleep(1000);}condition.signal(); //线程r1释放条件信号,以唤醒r2中处于await的代码继续执行。} catch (InterruptedException e) {e.printStackTrace();}finally{lock.unlock();}}};Runnable r2 = new Runnable(){public void run(){lock.lock();try{if(count<=5){System.out.println("----$$$---");condition.await(); //但调用await()后,lock锁会被释放,让线程r1能获取到,与Object.wait()方法一样System.out.println("----------");}while(count<=10){System.out.println(Thread.currentThread().getName()+"--"+count++);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}finally{lock.unlock();}}};new Thread(r2).start(); //让r2先执行,先获得lock锁,但条件不满足,让r2等待await。try {Thread.sleep(100); //这里休眠主要是用于测试r2.await()会释放lock锁,被r1获取} catch (InterruptedException e) {e.printStackTrace();}new Thread(r1).start();}}
public class ConditionMain {public static void main(String[] args) {final BoundleBuffer buf = new ConditionMain().new BoundleBuffer();new Thread(new Runnable(){public void run() {for(int i=0;i<1000;i++){try {buf.put(i);System.out.println("入值:"+i);Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();new Thread(new Runnable(){public void run() {for(int i=0;i<1000;i++){try {int x = buf.take();System.out.println("出值:"+x);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}public class BoundleBuffer {final Lock lock = new ReentrantLock();final Condition notFull = lock.newCondition();final Condition notEmpty = lock.newCondition();final Integer[] items = new Integer[10];int putptr, takeptr, count;public void put(int x) throws InterruptedException {System .out.println("put wait lock");lock.lock();System .out.println("put get lock");try {while (count == items.length){System.out.println("buffer full, please wait");notFull.await();}items[putptr] = x;if (++putptr == items.length)putptr = 0;++count;notEmpty.signal();} finally {lock.unlock();}}public int take() throws InterruptedException {System .out.println("take wait lock");lock.lock();System .out.println("take get lock");try {while (count == 0){System.out.println("no elements, please wait");notEmpty.await();}int x = items[takeptr];if (++takeptr == items.length)takeptr = 0;--count;notFull.signal();return x;} finally {lock.unlock();}}}}
抛出异常 特殊值 阻塞 超时插入 add(e) offer(e) put(e) offer(e, time, unit)移除 remove() poll() take() poll(time, unit)检查 element() peek() 不可用 不可用
public class CompletionServiceTest {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(8); //需要2s,如果将8改成10,则只需要1sCompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(pool);Callable<Boolean> task = new Callable<Boolean>(){public Boolean call(){try {Thread.sleep(1000);System.out.println("插入1000条数据完成");} catch (InterruptedException e) {e.printStackTrace();}return true;};};System.out.println(getNow()+"--开始插入数据");for(int i=0;i<10;i++){cs.submit(task);}for(int i=0;i<10;i++){try {//ExecutorCompletionService.take()方法是阻塞的,如果当前没有完成的任务则阻塞System.out.println(cs.take().get());//实际使用时,take()方法获取的结果可用于处理,如果插入失败,则可以进行重试或记录等操作} catch (InterruptedException|ExecutionException e) {e.printStackTrace();}}System.out.println(getNow()+"--插入数据完成");pool.shutdown();}public static String getNow(){SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");return sdf.format(new Date());}}
public class CountDownLatchTest {private static SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");public static void main(String[] args) {final CountDownLatch start = new CountDownLatch(1); //用一个信号控制一组线程的开始,初始化为1final CountDownLatch end = new CountDownLatch(10); //要等待N个线程的结束,初始化为N,这里是10Runnable r = new Runnable(){public void run(){try {start.await(); //阻塞,这样start.countDown()到0,所有阻塞在start.await()处的线程一起执行Thread.sleep((long) (Math.random()*10000));System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完成");end.countDown();//非阻塞,每个线程执行完,让end--,这样10个线程执行完end倒数到0,主线程的end.await()就可以继续执行} catch (InterruptedException e) {e.printStackTrace();}}};for(int i=0;i<10;i++){new Thread(r).start(); //虽然开始了10个线程,但所有线程都阻塞在start.await()处}System.out.println(getNow()+"--线程全部启动完毕,休眠3s再让10个线程一起执行");try {Thread.sleep(3*1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(getNow()+"--开始");start.countDown(); //start初始值为1,countDown()变成0,触发10个线程一起执行try {end.await(); //阻塞,等10个线程都执行完了才继续往下。} catch (InterruptedException e) {e.printStackTrace();}System.out.println(getNow()+"--10个线程都执行完了,主线程继续往下执行!");}private static String getNow(){return sdf.format(new Date());}}
1.一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。也就是说,这一组线程的执行分几个节点,每个节点往下执行,都需等待其他线程,这就需要这种等待具有循环性。CyclicBarrier在这样的情况下就很有用。
2.CyclicBarrier与CountDownLacth的区别:
1)CountDownLacth用于一个线程与一组线程之间的相互等待。常用的就是一个主线程与一组分治线程之间的等待:主线程发号令,一组线程同时执行;一组线程依次执行完,再唤醒主线程继续执行;
CyclicBarrier用于一组线程执行时,每个线程执行有多个节点,每个节点的处理需要相互等待。如:对5个文件进行处理,按行将各个文件数字挑出来合并成一行,排序,并输出到另一个文件,那每次处理都需要等待5个线程读入下一行。(api示例可供参考)
2)CountDownLacth的处理机制是:初始化一个值N(相当于一组线程有N个),每个线程调用一次countDown(),那么cdLatch减1,等所有线程都调用过countDown(),那么cdLatch值达到0,那么线程从await()处接着玩下执行。
CyclicBarrier的处理机制是:初始化一个值N(相当于一组线程有N个),每个线程调用一次await(),那么barrier加1,等所有线程都调用过await(),那么barrier值达到初始值N,所有线程接着往下执行,并将barrier值重置为0,再次循环下一个屏障;
3)由2)可以知道,CountDownLatch只可以使用一次,而CyclicBarrier是可以循环使用的。
3.个人用于理解的示例:
public class CyclicBarrierTest {private static final CyclicBarrier barrier = new CyclicBarrier(5,new Runnable(){public void run(){ //每次线程到达屏障点,此方法都会执行System.out.println("\n--------barrier action--------\n");}});public static void main(String[] args) {for(int i=0;i<5;i++){new Thread(new CyclicBarrierTest().new Worker()).start();}}class Worker implements Runnable{public void run(){try {System.out.println(Thread.currentThread().getName()+"--第一阶段");Thread.sleep(getRl());barrier.await(); //每一次await()都会阻塞,等5个线程都执行到这一步(相当于barrier++操作,加到初始化值5),才继续往下执行System.out.println(Thread.currentThread().getName()+"--第二阶段");Thread.sleep(getRl());barrier.await(); //每一次5个线程都到达共同的屏障节点,会执行barrier初始化参数中定义的Runnable.run()System.out.println(Thread.currentThread().getName()+"--第三阶段");Thread.sleep(getRl());barrier.await();System.out.println(Thread.currentThread().getName()+"--第四阶段");Thread.sleep(getRl());barrier.await();System.out.println(Thread.currentThread().getName()+"--第五阶段");Thread.sleep(getRl());barrier.await();System.out.println(Thread.currentThread().getName()+"--结束");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}}public static long getRl(){return Math.round(10000);}}
原文:http://www.cnblogs.com/You0/p/6670201.html