CyclicBarrier 自 JDK 1.5 引入,是一个同步工具类,允许一组线程互相等待,直到所有线程都到达公共屏障点。在涉及固定数量的线程、且这些线程必须不时地互相等待的程序中,CyclicBarrier 非常有用。这个 barrier 之所以被称为
cyclic(循环的),是因为它在所有等待线程被释放之后可以重复使用。
CyclicBarrier 支持一个可选的 Runnable(barrierAction),它在每个屏障点执行一次:在最后一个参与线程到达之后、任何线程被释放之前执行。barrierAction 非常适合用来在任何参与者继续执行之前更新共享状态。
/**
 * Demonstrates {@link CyclicBarrier}: three worker threads each announce that they are ready
 * and then wait at a shared barrier. The barrier action passed to the barrier in {@code main}
 * prints "完成" once all three workers have arrived.
 */
public class Test2 {

    /**
     * Prints the calling thread's "ready" message and then waits on the given barrier.
     *
     * <p>Workers A, B and C all delegate here so the wait and error handling exist only once.
     *
     * @param barrier the barrier shared by all workers
     */
    private static void announceAndAwait(CyclicBarrier barrier) {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " 准备完毕");
        try {
            barrier.await();
        } catch (InterruptedException e) {
            // await() clears the interrupt status when it throws; restore it so that
            // whoever owns this thread can still observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println(threadName + " was interrupted while waiting at the barrier");
        } catch (BrokenBarrierException e) {
            // Another party was interrupted or failed, so this generation can never trip.
            System.err.println(threadName + " stopped waiting: the barrier is broken (" + e + ")");
        }
    }

    /** Worker thread "A": announces readiness, then waits at the barrier. */
    static class A extends Thread {
        private final CyclicBarrier cyclicBarrier;

        /**
         * @param cyclicBarrier the barrier to wait on
         * @param name the thread name used in the printed message
         */
        public A(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            setName(name);
        }

        @Override
        public void run() {
            announceAndAwait(cyclicBarrier);
        }
    }

    /** Worker thread "B": announces readiness, then waits at the barrier. */
    static class B extends Thread {
        private final CyclicBarrier cyclicBarrier;

        /**
         * @param cyclicBarrier the barrier to wait on
         * @param name the thread name used in the printed message
         */
        public B(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            setName(name);
        }

        @Override
        public void run() {
            announceAndAwait(cyclicBarrier);
        }
    }

    /** Worker thread "C": announces readiness, then waits at the barrier. */
    static class C extends Thread {
        private final CyclicBarrier cyclicBarrier;

        /**
         * @param cyclicBarrier the barrier to wait on
         * @param name the thread name used in the printed message
         */
        public C(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            setName(name);
        }

        @Override
        public void run() {
            announceAndAwait(cyclicBarrier);
        }
    }

    /**
     * Starts the three workers against a three-party barrier whose action prints "完成".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> System.out.println("完成"));
        new A(cyclicBarrier, "A").start();
        new B(cyclicBarrier, "B").start();
        new C(cyclicBarrier, "C").start();
    }
}
/**
output:
A 准备完毕
B 准备完毕
C 准备完毕
完成
*/
在看源码前,我们可以对源码的实现进行一些猜想。根据前面 CyclicBarrier 的定义,可以猜想里面有一个变量来表示参与者的数量:每次调用 await 方法的时候,参与者的数量减一,
直到参与者数量为 0;此时如果存在 barrierAction,就执行 barrierAction。由于屏障可以重复使用,所以在 barrierAction 执行之后要对参与者的数量进行恢复。
下面看一下源码实现是否与猜想的类似。
构造方法
parties 参与者的数量
barrierAction 最后执行的动作(可选)
/**
 * Creates a barrier for the given number of parties without a barrier action.
 * Delegates to the two-argument constructor, passing {@code null} as the action.
 *
 * @param parties the number of participants
 */
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
 * Creates a barrier for the given number of parties with an optional barrier action.
 *
 * @param parties the number of participants; stored as {@code parties} and also used as
 *        the initial value of the countdown field {@code count}
 * @param barrierAction the action stored as {@code barrierCommand}; may be {@code null}
 * @throws IllegalArgumentException if {@code parties} is zero or negative
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
await 方法
/**
 * Waits at the barrier without a timeout by delegating to {@code dowait(false, 0L)}.
 *
 * @return the value returned by {@code dowait}
 * @throws InterruptedException propagated from {@code dowait}
 * @throws BrokenBarrierException propagated from {@code dowait}
 */
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
// dowait only throws TimeoutException when its "timed" argument is true,
// and this call passes false.
throw new Error(toe); // cannot happen
}
}
/**
 * Core barrier logic shared by the timed and untimed waits. Runs entirely under {@code lock}.
 *
 * <p>Each caller decrements {@code count}. The caller that brings it to zero runs the
 * barrier command (if any), calls {@code nextGeneration()} and returns 0. Every other
 * caller waits on {@code trip} until the generation changes, the generation is marked
 * broken, the thread is interrupted, or the timeout elapses.
 *
 * @param timed whether the wait is bounded by {@code nanos}
 * @param nanos remaining wait time in nanoseconds; only consulted when {@code timed} is true
 * @return 0 for the caller that trips the barrier; for a waiting caller, the value of
 *         {@code count} right after its own decrement
 * @throws InterruptedException if the thread's interrupt flag is set on entry, or if it is
 *         interrupted while waiting and its generation is still current and not broken
 * @throws BrokenBarrierException if the generation is broken on entry or after waking up
 * @throws TimeoutException if {@code timed} is true and {@code nanos} has run out
 */
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// Snapshot the generation this caller belongs to; later checks compare against it.
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
// NOTE(review): breakBarrier() is not shown in this excerpt; presumably it marks the
// generation broken and wakes the waiting threads.
breakBarrier();
throw new InterruptedException();
}
// One more party has arrived: decrement the number still awaited.
int index = --count;
// When the count reaches zero: run barrierCommand if it is non-null, then call
// nextGeneration() to reset the state.
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
// ranAction is still false only if command.run() threw; break the barrier in that case.
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await(); // wait at the barrier
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
// A different generation object means the barrier tripped while this thread was waiting.
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
nextGeneration 方法
/**
 * Wakes the threads waiting on {@code trip}, restores {@code count} to {@code parties}
 * and installs a fresh {@code Generation}, making the barrier ready for reuse.
 * In the code shown here it is called from {@code dowait} while {@code lock} is held.
 */
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); // wake up the waiting parties
// set up next generation
count = parties; // restore the number of parties to wait for
generation = new Generation(); // start the next generation
}
借助AtomicInteger 来简单实现
/**
 * A minimal reusable spinning barrier built on {@link AtomicInteger}.
 *
 * <p>Each call to {@link #await()} decrements a shared counter. The caller that brings the
 * counter to zero runs the optional barrier action, resets the counter and then advances a
 * generation number. All other callers spin until the generation number changes.
 *
 * <p>Waiters spin on the generation rather than on the counter reaching zero because the
 * counter is reset immediately after the barrier trips. A waiter that was not scheduled
 * during that short window would never see zero and would spin forever. Advancing the
 * generation last also means the barrier action has finished before any waiter is released.
 *
 * <p>Limitation: the barrier assumes that at most {@code size} threads take part in a cycle.
 * A thread that arrives while the barrier is in the middle of tripping is not supported.
 */
public class SimpleBarrier {
    /** Number of parties that still have to arrive in the current cycle. */
    private final AtomicInteger count;
    /** Number of parties required to trip the barrier. */
    int size;
    /** Optional action run by the last arriving thread; {@code null} when none was given. */
    private Runnable command;
    /** Advanced once per trip, after the counter has been reset; waiters spin on it. */
    private volatile int generation;

    /**
     * Creates a barrier without a barrier action.
     *
     * @param n the number of parties
     * @throws IllegalArgumentException if {@code n} is not positive (such a barrier could
     *         never trip)
     */
    public SimpleBarrier(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("number of parties must be positive: " + n);
        }
        this.count = new AtomicInteger(n);
        this.size = n;
    }

    /**
     * Creates a barrier that runs {@code barrierAction} each time it trips.
     *
     * @param n the number of parties
     * @param barrierAction action run by the last arriving thread; may be {@code null}
     * @throws IllegalArgumentException if {@code n} is not positive
     */
    public SimpleBarrier(int n, Runnable barrierAction) {
        this(n);
        this.command = barrierAction;
    }

    /**
     * Blocks (by spinning) until {@code size} threads have called this method.
     *
     * <p>The last thread to arrive runs the barrier action, if there is one, before any
     * waiting thread is released. If the action throws, the barrier is still reset and the
     * waiters are released, and the exception propagates to the last arriving thread.
     */
    public void await() {
        // Read the generation before announcing arrival: it cannot advance until this
        // thread's own decrement has been counted.
        final int arrivalGeneration = generation;
        if (count.decrementAndGet() == 0) {
            try {
                if (command != null) {
                    command.run();
                }
            } finally {
                // Reset the counter before publishing the new generation, so that released
                // threads re-entering await() already see a full count.
                count.set(size);
                generation = arrivalGeneration + 1;
            }
        } else {
            while (generation == arrivalGeneration) {
                Thread.yield();
            }
        }
    }
}
/**
 * Runs the three-worker demo against {@code SimpleBarrier}: each worker prints that it is
 * ready and then waits on the shared barrier, whose action prints "完成".
 */
public class Test2 {

    /**
     * Prints the calling thread's "ready" line, then waits on the given barrier.
     *
     * @param barrier the barrier shared by the workers
     */
    private static void reportReadyThenWait(SimpleBarrier barrier) {
        final String who = Thread.currentThread().getName();
        System.out.println(who + " 准备完毕");
        barrier.await();
    }

    /** Worker "A": reports readiness and waits on the barrier. */
    static class A extends Thread {
        private SimpleBarrier cyclicBarrier;

        public A(SimpleBarrier cyclicBarrier, String name) {
            setName(name);
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            reportReadyThenWait(cyclicBarrier);
        }
    }

    /** Worker "B": reports readiness and waits on the barrier. */
    static class B extends Thread {
        private SimpleBarrier cyclicBarrier;

        public B(SimpleBarrier cyclicBarrier, String name) {
            setName(name);
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            reportReadyThenWait(cyclicBarrier);
        }
    }

    /** Worker "C": reports readiness and waits on the barrier. */
    static class C extends Thread {
        private SimpleBarrier cyclicBarrier;

        public C(SimpleBarrier cyclicBarrier, String name) {
            setName(name);
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            reportReadyThenWait(cyclicBarrier);
        }
    }

    /**
     * Builds a three-party barrier and starts workers A, B and C in that order.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Runnable onTrip = () -> System.out.println("完成");
        final SimpleBarrier shared = new SimpleBarrier(3, onTrip);

        Thread first = new A(shared, "A");
        first.start();
        Thread second = new B(shared, "B");
        second.start();
        Thread third = new C(shared, "C");
        third.start();
    }
}
/**
output:
A 准备完毕
B 准备完毕
C 准备完毕
完成
*/
barrierAction 由最后一个到达屏障的线程来执行。
原文:https://www.cnblogs.com/lzeffort/p/14832560.html