?
CountDownLatch
/**
 * Demonstrates {@link CountDownLatch}: the test thread blocks in {@code await()} until every
 * worker thread has called {@code countDown()}, one of which sleeps first.
 */
public class CountDownLatchTest {

    /** Number of worker threads, which is also the initial latch count. */
    private static final int THREAD_COUNT = 3;

    /** Name of the single worker that sleeps before counting down. */
    private static final String SLOW_THREAD_NAME = "thread_1";

    /** How long the slow worker sleeps, in milliseconds. */
    private static final long SLOW_WORKER_DELAY_MILLIS = 2000;

    private final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);

    /**
     * Starts the workers, waits on the latch and checks that the wait lasted at least as long as
     * the slow worker's sleep.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    public void test() throws InterruptedException {
        // nanoTime is monotonic; currentTimeMillis can jump when the wall clock is adjusted.
        long startNanos = System.nanoTime();
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(this::work, "thread_" + i).start();
        }
        countDownLatch.await();
        long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        // ">=" rather than ">": measuring exactly the sleep duration is a valid outcome.
        Assert.assertTrue(cost >= SLOW_WORKER_DELAY_MILLIS);
    }

    /** Worker body: the slow worker sleeps first, then every worker counts the latch down. */
    private void work() {
        try {
            if (SLOW_THREAD_NAME.equals(Thread.currentThread().getName())) {
                TimeUnit.MILLISECONDS.sleep(SLOW_WORKER_DELAY_MILLIS);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so it is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always count down, otherwise await() in the test thread would block forever.
            countDownLatch.countDown();
        }
    }
}
CyclicBarrier
/**
 * Demonstrates {@link CyclicBarrier}: no worker gets past {@code await()} until all of them have
 * arrived, so every worker is held back by the one that sleeps first.
 */
public class CyclicBarrierTest {

    /** Number of worker threads, which is also the number of barrier parties. */
    private static final int THREAD_COUNT = 3;

    /** Name of the single worker that sleeps before reaching the barrier. */
    private static final String SLOW_THREAD_NAME = "thread_1";

    /** How long the slow worker sleeps, in milliseconds. */
    private static final long SLOW_WORKER_DELAY_MILLIS = 2000;

    /** Value recorded for a worker that did not pass the barrier normally. */
    private static final long FAILED = -1;

    private final CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT);

    /**
     * Starts the workers, waits for all of them to finish and then checks, on the test thread,
     * that each one was released no earlier than the slow worker's sleep allows.
     *
     * @throws InterruptedException if the test thread is interrupted while joining a worker
     */
    @Test
    public void test() throws InterruptedException {
        long[] costs = new long[THREAD_COUNT];
        Thread[] workers = new Thread[THREAD_COUNT];

        // One shared start time taken before any worker starts: a worker launched after the slow
        // one would otherwise measure slightly less than the full sleep.
        long startNanos = System.nanoTime();
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            workers[i] = new Thread(() -> costs[index] = awaitBarrier(startNanos), "thread_" + i);
            workers[i].start();
        }

        // Join instead of spinning forever; join also makes the workers' writes to costs visible.
        for (Thread worker : workers) {
            worker.join();
        }

        // Assert here rather than in the workers: an AssertionError thrown on a worker thread
        // would not fail the test.
        for (long cost : costs) {
            Assert.assertTrue(cost >= SLOW_WORKER_DELAY_MILLIS);
        }
    }

    /**
     * Worker body: the slow worker sleeps, then every worker waits on the barrier.
     *
     * @param startNanos {@link System#nanoTime()} reading taken before the workers were started
     * @return milliseconds elapsed since {@code startNanos} once the barrier released this worker,
     *     or {@link #FAILED} if the worker was interrupted or the barrier was broken
     */
    private long awaitBarrier(long startNanos) {
        try {
            if (SLOW_THREAD_NAME.equals(Thread.currentThread().getName())) {
                TimeUnit.MILLISECONDS.sleep(SLOW_WORKER_DELAY_MILLIS);
            }
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return FAILED;
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
            return FAILED;
        }
        long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.println(Thread.currentThread().getName() + "执行了 " + cost + "ms");
        return cost;
    }
}
Exchanger
/**
 * Demonstrates {@link Exchanger}: two threads meet at {@code exchange()} and each receives the
 * value the other one offered.
 */
public class ExchangeTest {

    /** Typed as {@code Object} because the two sides offer different types (String and Integer). */
    Exchanger<Object> exchanger = new Exchanger<>();

    /**
     * Runs both sides of the exchange and waits for them to finish.
     *
     * @throws InterruptedException if the test thread is interrupted while joining
     */
    @Test
    public void test() throws InterruptedException {
        Thread thread = new Thread(new M());
        thread.start();
        Thread thread1 = new Thread(new N());
        thread1.start();

        // Wait for both sides instead of spinning forever, so the test actually terminates.
        thread.join();
        thread1.join();
    }

    /** Offers a String after a two-second delay and prints what it received in return. */
    class M implements Runnable {
        private final String mValue = "m类的变量";

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(2);
                Object exchange = exchanger.exchange(mValue);
                System.out.println("M类-交换后的值:" + exchange);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so it is not lost.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }

    /** Offers an int immediately (blocking until M arrives) and prints what it received. */
    class N implements Runnable {
        private final int nValue = 1;

        @Override
        public void run() {
            try {
                Object exchange = exchanger.exchange(nValue);
                System.out.println("N类-交换后的值:" + exchange);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so it is not lost.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
}
ForkJoin
public class ForkJoinTest { //处理的最大任务 private static final int MAX = 200; static class MyForkJoinTask extends RecursiveTask<Integer>{ //子任务开始计算的元素 private Integer startValue; //子任务结束计算的元素 private Integer endValue; public MyForkJoinTask(Integer startValue, Integer endValue) { this.startValue = startValue; this.endValue = endValue; } @Override protected Integer compute() { if(endValue-startValue < MAX){ System.out.println("开始计算的部分:startValue = " + startValue + ";endValue = " + endValue); return addTotal(startValue,endValue); }else{ MyForkJoinTask myForkJoinTask1 = new MyForkJoinTask(startValue, (startValue+endValue)/2); myForkJoinTask1.fork(); MyForkJoinTask myForkJoinTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1, endValue); myForkJoinTask2.fork(); return myForkJoinTask1.join()+myForkJoinTask2.join(); } } private Integer addTotal(Integer startValue, Integer endValue){ Integer total = 0; for(int i = startValue; i<=endValue; i++){ total+=i; } return total; } } @Test public void test() throws Exception{ ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> task = forkJoinPool.submit(new MyForkJoinTask(1, 1001)); Integer integer = task.get(); System.out.println(integer); } }
Semaphore
public class SemaphoreTest { private static int PARALLEL_COUNT = 3; //并行数量 private static int THREAD_COUNT = 10 * PARALLEL_COUNT;//线程数量 private Semaphore semaphore = new Semaphore(PARALLEL_COUNT); private CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT); @Test public void test() throws InterruptedException { for (int i = 0; i < THREAD_COUNT; i++) { new Thread(()->{ try { semaphore.acquire(); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"开过"); semaphore.release(); countDownLatch.countDown(); if(countDownLatch.getCount()%PARALLEL_COUNT == 0){ System.out.println("-------------");//每三条打印一次 } }, "车"+i).start(); } countDownLatch.await(); System.out.println("---------所有车开过了-------"); } }
Phaser
/**
 * Demonstrates {@link Phaser} with a wedding: guests and the couple arrive and eat together, then
 * the guests deregister and only the couple takes part in the final phase.
 */
public class PhaserTest {

    /** Number of guest threads. */
    private static final int GUEST_COUNT = 4;

    /** The groom and the bride. */
    private static final int COUPLE_COUNT = 2;

    /** Phaser registered for every participant; prints a summary line as each phase completes. */
    Phaser phaseMarry = new Phaser(GUEST_COUNT + COUPLE_COUNT) {
        /**
         * Reports the completed phase.
         *
         * @return {@code false} for phases 0 to 2 so the phaser keeps running, {@code true}
         *     afterwards so it terminates
         */
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println(registeredParties + "个人到达现场");
                    return false;
                case 1:
                    System.out.println(registeredParties + "个人用餐完毕");
                    return false;
                case 2:
                    System.out.println(registeredParties + "个人开始洞房");
                    return false;
                default:
                    return true;
            }
        }
    };

    /**
     * Runs all participants and waits for them to finish.
     *
     * @throws InterruptedException if the test thread is interrupted while joining
     */
    @Test
    public void test() throws InterruptedException {
        Thread[] participants = new Thread[GUEST_COUNT + COUPLE_COUNT];
        for (int i = 0; i < GUEST_COUNT; i++) {
            // Name each guest by its index; using the guest total would give every guest the
            // same name.
            participants[i] = new Thread(this::attendAsGuest, "嘉宾" + i);
        }
        participants[GUEST_COUNT] = new Thread(this::attendAsCouple, "新郎");
        participants[GUEST_COUNT + 1] = new Thread(this::attendAsCouple, "新娘");

        for (Thread participant : participants) {
            participant.start();
        }
        // Join so the test does not return while the participants are still running.
        for (Thread participant : participants) {
            participant.join();
        }
    }

    /** Guest: arrives, eats, then arrives at the third phase and deregisters without waiting. */
    private void attendAsGuest() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "到达");
        phaseMarry.arriveAndAwaitAdvance();
        System.out.println(name + "吃饭");
        phaseMarry.arriveAndAwaitAdvance();
        phaseMarry.arriveAndDeregister();
    }

    /** Groom or bride: takes part in all three phases and waits at the end of each. */
    private void attendAsCouple() {
        String name = Thread.currentThread().getName();
        System.out.println(name + "到达");
        phaseMarry.arriveAndAwaitAdvance();
        System.out.println(name + "吃饭");
        phaseMarry.arriveAndAwaitAdvance();
        System.out.println(name + "洞房");
        phaseMarry.arriveAndAwaitAdvance();
    }
}
线程部分实现
原文:https://www.cnblogs.com/sewell/p/15212281.html