public class CountDownLatch extends Object
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.
A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
A useful property of a CountDownLatch is that it doesn't require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.
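CountDownLatch synchronizes one or more tasks by forcing them to wait until a set of operations performed by other tasks has completed. Think of two groups of tasks, A and B: the tasks in A may run only after every task in B has finished.
A CountDownLatch object is given an initial count. Each task in A calls CountDownLatch.await() before doing its work, which blocks it until the count reaches 0. Each task in B calls CountDownLatch.countDown() when it finishes, decrementing the count. The tasks in B can therefore run concurrently, and the tasks in A start only once all of B's tasks are done.
The count of a CountDownLatch cannot be reset, so a latch can be used only once. If you need a resettable count, use a CyclicBarrier.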
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchDemo {
    static final int SIZE = 10;

    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(SIZE);
        // Group A: tasks that wait on the latch
        for (int i = 0; i < SIZE; i++) {
            exec.execute(new A(latch));
        }
        // Group B: tasks that count the latch down
        for (int i = 0; i < SIZE; i++) {
            exec.execute(new B(latch));
        }
        System.out.println("Launched all tasks");
        exec.shutdown();
    }
}

class B implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static Random random = new Random(47);
    private final CountDownLatch latch;

    public B(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            doWork();
            latch.countDown(); // one B task has finished: decrement the count
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void doWork() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(random.nextInt(10));
        System.out.println(this + " completed");
    }

    public String toString() {
        return String.format("B %1$-3d", id);
    }
}

class A implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private static Random random = new Random(57);
    private final CountDownLatch latch;

    public A(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            latch.await(); // blocks until the count reaches 0
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1));
            System.out.println("Latch barrier passed for " + this);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String toString() {
        return String.format("A %1$-3d", id);
    }
}

Output:
Launched all tasks
B 3 completed
B 2 completed
B 6 completed
B 5 completed
B 0 completed
B 7 completed
B 1 completed
B 4 completed
B 9 completed
B 8 completed
Latch barrier passed for A 0
Latch barrier passed for A 3
Latch barrier passed for A 2
Latch barrier passed for A 1
Latch barrier passed for A 4
Latch barrier passed for A 5
Latch barrier passed for A 6
Latch barrier passed for A 7
Latch barrier passed for A 8
Latch barrier passed for A 9
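The Javadoc excerpt at the top also mentions a latch with a count of one acting as an on/off gate. The following is a minimal sketch of that use, not part of the original post: the class name GateLatchSketch, the method runGate and the log messages are hypothetical names chosen for illustration. One latch holds the workers back until the gate opens, and a second latch lets the caller wait until all of them are done.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class GateLatchSketch {
    // Starts `workers` threads that wait at a one-count latch (the gate),
    // opens the gate, waits for all workers, and returns the event log.
    static List<String> runGate(int workers) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch startGate = new CountDownLatch(1);   // on/off gate
        CountDownLatch done = new CountDownLatch(workers);  // completion counter

        for (int i = 0; i < workers; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    startGate.await();                // blocked until the gate opens
                    log.add("worker " + id + " ran");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();                 // report completion either way
                }
            }).start();
        }

        log.add("gate opened");
        startGate.countDown();                        // count 1 -> 0 releases every waiter
        done.await();                                 // wait until every worker has finished
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        runGate(3).forEach(System.out::println);
    }
}
```

Because no worker can pass startGate.await() before countDown() is called, "gate opened" is always the first log entry; the order of the worker entries after it depends on thread scheduling.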
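Suppose a task is split into several subtasks that are handed to different threads, and the next step may run only after all of them have finished. A CyclicBarrier fits this case.
A CyclicBarrier is constructed with the number of parties n. The barrier point is reached only when n threads have called await on the same CyclicBarrier object; threads that arrive early block until then.
The following program squares every element of a matrix, using 5 threads to process the 5 rows. Once all 5 threads have finished, the processed matrix is printed.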
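import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        final int[][] matrix = {{1,1,1,1,1},{2,2,2,2,2},{3,3,3,3,3},{4,4,4,4,4},{5,5,5,5,5}};
        // Put each row of the matrix into a list
        final List<int[]> list = new ArrayList<int[]>();
        for (int i = 0; i < matrix.length; i++) {
            list.add(matrix[i]);
        }
        ExecutorService exec = Executors.newCachedThreadPool();
        // The barrier action runs once, after all 5 solvers have arrived
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            public void run() {
                System.out.println("All solvers completed");
                // Print the processed matrix
                for (int i = 0; i < matrix.length; i++) {
                    System.out.println(Arrays.toString(list.get(i)));
                }
            }
        });
        for (int i = 0; i < 5; i++) {
            exec.execute(new Solver(barrier, list.get(i)));
        }
        exec.shutdown(); // let the JVM exit once the solvers are done
    }
}

class Solver implements Runnable {
    private int[] row;
    private static int count = 0;
    // Shared generator: a per-instance Random(47) would give every solver the same sleep time
    private static final Random random = new Random(47);
    private final int id = count++;
    private CyclicBarrier barrier;

    public Solver(CyclicBarrier barrier, int[] row) {
        this.barrier = barrier;
        this.row = row;
    }

    public void run() {
        try {
            // Start of the task: square every element of this row
            int length = row.length;
            for (int i = 0; i < length; i++) {
                row[i] = row[i] * row[i];
            }
            TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            System.out.println("Solver " + id + " completed");
            // End of the task: arrive at the barrier point
            barrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

Output:
Solver 1 completed
Solver 2 completed
Solver 0 completed
Solver 3 completed
Solver 4 completed
All solvers completed
[1, 1, 1, 1, 1]
[4, 4, 4, 4, 4]
[9, 9, 9, 9, 9]
[16, 16, 16, 16, 16]
[25, 25, 25, 25, 25]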
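Unlike a CountDownLatch, a CyclicBarrier resets itself each time all parties have arrived, so the same object can coordinate several rounds of work. The sketch below illustrates that reuse under simple assumptions; the class name BarrierReuseSketch, the method runRounds and the log text are hypothetical and do not come from the original post.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseSketch {
    // Runs `parties` threads through `rounds` barrier points on ONE barrier
    // and returns what the barrier action logged.
    static List<String> runRounds(int parties, int rounds) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        AtomicInteger round = new AtomicInteger();
        // The barrier action runs once per round, when the last party arrives
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> log.add("round " + round.incrementAndGet() + " complete"));

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // ... per-round work would go here ...
                        barrier.await();   // the barrier resets after each round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party failed or was interrupted; give up
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        runRounds(3, 2).forEach(System.out::println);
    }
}
```

No thread can start round 2 before the barrier action of round 1 has run, so the log entries always appear in round order.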
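DelayQueue is an unbounded BlockingQueue that holds Delayed objects; an object can be taken only after its delay has expired. If no object in the queue has expired, take() waits. The queue is ordered, with the object that expires first at the head. Because the queue has to compare expiry times, its elements must implement the Delayed interface, that is, two methods: compareTo(), which orders objects by expiry time, and getDelay(), which returns the time remaining until expiry.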
import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
        ExecutorService exec = Executors.newCachedThreadPool();
        Random random = new Random(48);
        // Put all the delayed tasks into the DelayQueue
        for (int i = 0; i < 20; i++) {
            queue.put(new DelayedTask(random.nextInt(1000)));
        }
        exec.execute(new DelayedTaskConsumer(queue));
        queue.put(new DelayedTask(500));
    }
}

// A task that becomes available only after its delay has expired
class DelayedTask implements Runnable, Delayed {
    private static int count = 0;
    private final int id = count++;
    private final int delta;
    private final long trigger; // expiry time in System.nanoTime() units

    public DelayedTask(int delayMilliseconds) {
        delta = delayMilliseconds;
        trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        DelayedTask that = (DelayedTask) o;
        if (trigger < that.trigger) return -1;
        if (trigger > that.trigger) return 1;
        return 0;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time until expiry, in the requested unit.
        // (The original computed nanoTime() - trigger, which is negative
        // before expiry and so made every task look expired immediately.)
        return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public void run() {
        System.out.println("DelayedTask delayTime [" + delta + "] " + "is running");
    }
}

class DelayedTaskConsumer implements Runnable {
    private DelayQueue<DelayedTask> queue;

    public DelayedTaskConsumer(DelayQueue<DelayedTask> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // Take the task that expired first and run it
            while (!Thread.interrupted()) {
                queue.take().run();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output (as recorded in the original post; the extra task queued with a 500 ms delay does not appear in it):
DelayedTask delayTime [100] is running
DelayedTask delayTime [140] is running
DelayedTask delayTime [183] is running
DelayedTask delayTime [244] is running
DelayedTask delayTime [316] is running
DelayedTask delayTime [368] is running
DelayedTask delayTime [522] is running
DelayedTask delayTime [562] is running
DelayedTask delayTime [569] is running
DelayedTask delayTime [703] is running
DelayedTask delayTime [794] is running
DelayedTask delayTime [804] is running
DelayedTask delayTime [831] is running
DelayedTask delayTime [877] is running
DelayedTask delayTime [911] is running
DelayedTask delayTime [926] is running
DelayedTask delayTime [972] is running
DelayedTask delayTime [982] is running
DelayedTask delayTime [984] is running
DelayedTask delayTime [987] is running
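To make the rule "an object can be taken only after it has expired" concrete, here is a small sketch contrasting the non-blocking poll() with the blocking take(). It is an illustration only: the class DelayQueuePollSketch, the element type Expiring and the names "fast" and "slow" are hypothetical, and the 100 ms and 300 ms delays are arbitrary.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueuePollSketch {
    // Minimal Delayed element: a name plus an absolute expiry time.
    static final class Expiring implements Delayed {
        final String name;
        private final long triggerNanos;

        Expiring(String name, long delayMillis) {
            this.name = name;
            this.triggerNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time; zero or negative means "expired"
            return unit.convert(triggerNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns: what poll() saw right after insertion, then the two take() results.
    static String[] demo() throws InterruptedException {
        DelayQueue<Expiring> queue = new DelayQueue<>();
        queue.put(new Expiring("slow", 300));
        queue.put(new Expiring("fast", 100));

        Expiring early = queue.poll();        // non-blocking: null while nothing has expired
        String first = queue.take().name;     // blocks until the earliest element expires
        String second = queue.take().name;    // then the next one
        return new String[] {
            early == null ? "nothing expired yet" : early.name, first, second
        };
    }

    public static void main(String[] args) throws InterruptedException {
        for (String s : demo()) {
            System.out.println(s);
        }
    }
}
```

Although "slow" is inserted first, take() returns "fast" first, because the head of the queue is the element whose delay runs out earliest.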
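PriorityBlockingQueue is a priority queue with blocking retrieval operations. It always hands out the highest-priority object, meaning the one that compareTo orders first. Priority is determined by the elements' compareTo method, so elements should implement the Comparable interface.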
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
        ExecutorService exec = Executors.newCachedThreadPool();
        Random random = new Random(48);
        for (int i = 0; i < 20; i++) {
            queue.put(new PriorityTask(random.nextInt(1000)));
        }
        exec.execute(new PriorityTaskConsumer(queue));
    }
}

class PriorityTask implements Runnable, Comparable<PriorityTask> {
    private static int count = 0;
    private final int id = count++;
    private final int priority;

    public PriorityTask(int priority) {
        this.priority = priority;
    }

    @Override
    public int compareTo(PriorityTask that) {
        // A smaller number sorts first, so it is taken from the queue first
        return Integer.compare(this.priority, that.priority);
    }

    @Override
    public void run() {
        System.out.println("PriorityTask priority [" + priority + "] is running");
    }
}

class PriorityTaskConsumer implements Runnable {
    private PriorityBlockingQueue<Runnable> queue;
    Random random = new Random(28);

    public PriorityTaskConsumer(PriorityBlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // Take the task at the head of the queue, run it, then pause briefly
            while (!Thread.interrupted()) {
                queue.take().run();
                TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output:
PriorityTask priority [100] is running
PriorityTask priority [140] is running
PriorityTask priority [183] is running
PriorityTask priority [244] is running
PriorityTask priority [316] is running
PriorityTask priority [368] is running
PriorityTask priority [522] is running
PriorityTask priority [562] is running
PriorityTask priority [569] is running
PriorityTask priority [703] is running
PriorityTask priority [794] is running
PriorityTask priority [804] is running
PriorityTask priority [831] is running
PriorityTask priority [877] is running
PriorityTask priority [911] is running
PriorityTask priority [926] is running
PriorityTask priority [972] is running
PriorityTask priority [982] is running
PriorityTask priority [984] is running
PriorityTask priority [987] is running
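In the demo above the smallest number comes out first, because the head of a PriorityBlockingQueue is the least element according to its ordering. If larger numbers should mean higher priority, one option is to pass a Comparator to the constructor instead of relying on compareTo. The sketch below assumes plain Integer priorities; the class name ReversedPrioritySketch and the method drain are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class ReversedPrioritySketch {
    // Puts the given priorities into a queue ordered largest-first
    // and returns them in the order take() hands them out.
    static List<Integer> drain(int... priorities) throws InterruptedException {
        // 11 is just an initial capacity; the queue grows as needed
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        for (int p : priorities) {
            queue.put(p);
        }
        List<Integer> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.take());   // head = largest value under the reversed ordering
        }
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain(100, 987, 316));   // prints [987, 316, 100]
    }
}
```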
Original post: http://www.cnblogs.com/wuchaodzxx/p/5991125.html