CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
当我们调用一次CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,你只需要把这个CountDownLatch的引用传递到线程里。
如果有某个解析sheet的线程处理的比较慢,我们不可能让主线程一直等待,所以我们可以使用另外一个带指定时间的await方法,await(long time, TimeUnit unit): 这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。
注意:计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。
public class CountDownLatchTest { private AtomicInteger atomicInteger = new AtomicInteger(); @Test public void cuntDownLatchTest() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(3); ExecutorService executorService = Executors.newFixedThreadPool(3); executorService.execute(() -> { atomicInteger.addAndGet(1000); countDownLatch.countDown(); }); executorService.execute(() -> { atomicInteger.addAndGet(1000); countDownLatch.countDown(); }); executorService.execute(() -> { atomicInteger.addAndGet(1000); countDownLatch.countDown(); }); System.out.println("等待子线程执行"); countDownLatch.await(); System.out.println("子线程执行完毕"); System.out.println(atomicInteger); } }
执行结果
等待子线程执行 子线程执行完毕 3000
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
public class CyclicBarrierTest { private static final int THREAD_NUMBER = 3; private static CyclicBarrier sCyclicBarrier = new CyclicBarrier( THREAD_NUMBER, new Runnable() { @Override public void run() { System.out.println("大家都到达了宿舍楼下,一起出发吧。。。"); } }); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER); for (int i = 0; i < THREAD_NUMBER; i++) { executorService.execute(new WalkFromDomitoryToCanteenRunnable(sCyclicBarrier, "同学" + i)); } try { Thread.sleep(10000);//主线程睡眠 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("CyclicBarrier重用"); for (int i = THREAD_NUMBER; i < THREAD_NUMBER * 2; i++) { executorService.execute(new WalkFromDomitoryToCanteenRunnable(sCyclicBarrier, "同学" + i)); } } /** * 从宿舍到食堂线程 * * @author LiuYi */ public static class WalkFromDomitoryToCanteenRunnable implements Runnable { private CyclicBarrier mCyclicBarrier; private String mName; public WalkFromDomitoryToCanteenRunnable(CyclicBarrier cyclicBarrier, String name) { this.mCyclicBarrier = cyclicBarrier; this.mName = name; } @Override public void run() { System.out.println(mName + "开始从宿舍出发。。。"); try { Thread.sleep(1000); mCyclicBarrier.await();// 等待别同学 // 前往食堂 System.out.println(mName + "开始从宿舍楼下出发。。。"); Thread.sleep(1000); System.out.println(mName + "达到食堂。。。"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }
执行结果
同学0开始从宿舍出发。。。 同学2开始从宿舍出发。。。 同学1开始从宿舍出发。。。 大家都到达了宿舍楼下,一起出发吧。。。 同学0开始从宿舍楼下出发。。。 同学2开始从宿舍楼下出发。。。 同学1开始从宿舍楼下出发。。。 同学0达到食堂。。。 同学2达到食堂。。。 同学1达到食堂。。。 CyclicBarrier重用 同学3开始从宿舍出发。。。 同学4开始从宿舍出发。。。 同学5开始从宿舍出发。。。 大家都到达了宿舍楼下,一起出发吧。。。 同学3开始从宿舍楼下出发。。。 同学4开始从宿舍楼下出发。。。 同学5开始从宿舍楼下出发。。。 同学4达到食堂。。。 同学5达到食堂。。。 同学3达到食堂。。。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
Semaphore翻译成字面意思为 信号量,Semaphore可以控同时访问的线程个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
Semaphore类位于java.util.concurrent包下,它提供了2个构造器:
public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 sync = (fair)? new FairSync(permits) : new NonfairSync(permits); }
假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:
public class Test { public static void main(String[] args) { int N = 8; //工人数 Semaphore semaphore = new Semaphore(5); //机器数目 for(int i=0;i<N;i++) new Worker(i,semaphore).start(); } static class Worker extends Thread{ private int num; private Semaphore semaphore; public Worker(int num,Semaphore semaphore){ this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("工人"+this.num+"占用一个机器在生产..."); Thread.sleep(2000); System.out.println("工人"+this.num+"释放出机器"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
执行结果
工人0占用一个机器在生产... 工人1占用一个机器在生产... 工人2占用一个机器在生产... 工人4占用一个机器在生产... 工人5占用一个机器在生产... 工人0释放出机器 工人2释放出机器 工人3占用一个机器在生产... 工人7占用一个机器在生产... 工人4释放出机器 工人5释放出机器 工人1释放出机器 工人6占用一个机器在生产... 工人3释放出机器 工人7释放出机器 工人6释放出机器
CountDownLatch、CyclicBarrier和Semaphore
原文:https://www.cnblogs.com/Genesisx/p/9367730.html