CountDownLatch countDownLatch = new CountDownLatch(1);
?
new Thread(()->{
try {
Thread.sleep(10000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
}).start();
基于AQS实现的功能。
设置CountDownLatch的数量,本质上就是设置state的值,state是volatile关键字修饰的字段。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
进行释放功能
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
当调用一次CountLatch(1),state值会减1。当state的值为0的时候,返回true。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
当countDownLatch的state值降为0,需要唤醒await线程,阻塞线程也是一个双向链表。该方法每一次调用都会unparkSuccessor(h),唤醒头结点后面节点的阻塞。
如果多个线程被阻塞,唤醒流程在后续进行分析。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
?
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
如果tryAcquireShared(arg) < 0,标识state 不等于0,需要进行阻塞。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//当前节点添加列表
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取当前节点的前一个节点
final Node p = node.predecessor();
if (p == head) {
//获取state值是否为0,等于0则r=1
int r = tryAcquireShared(arg);
if (r >= 0) {
//关键操作
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//前面已写。设置pre的wait_status值,当前线程进行park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
进行头结点设置,并唤醒下一个被阻塞的wait。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
唤醒机制:先唤醒一个线程(第一个阻塞的线程) 然后被唤醒的线程又会执行到这里唤醒线程,如此重复下去 最终所有线程都会被唤醒,其实这也是AQS共享锁的唤醒原理。
使用代码:
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("线程组执行结束");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new readNum(i,cyclicBarrier)).start();
}
// CyclicBarrier 可以重复利用,
// 这个是CountDownLatch做不到的
for (int i = 11; i < 16; i++) {
new Thread(new readNum(i,cyclicBarrier)).start();
}
}
static class readNum implements Runnable{
private int id;
private CyclicBarrier cyc;
public readNum(int id,CyclicBarrier cyc){
this.id = id;
this.cyc = cyc;
}
@Override
public void run() {
synchronized (this){
System.out.println("id:"+id);
try {
cyc.await();
System.out.println("线程组任务" + id + "结束,其他任务继续");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
当前功能主要就是该方法实现:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
?
if (g.broken)
throw new BrokenBarrierException();
?
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//线程运行到此会减count值;
int index = --count;
//如果所有线程已执行await(),方法会执行线程。
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //没有start,而是使用当前线程执行方法,不用新创建线程
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
?
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
//当前线程阻塞,使用的是条件锁
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We‘re about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
?
if (g.broken)
throw new BrokenBarrierException();
?
if (g != generation)
return index;
?
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
(1):使用到ReentrantLock lock来保证线程安全;
(2):nextGeneration();会唤醒所有的线程,并且重置count值和generation,可以等待下次调用。
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
原文:https://www.cnblogs.com/mayang2465/p/14653201.html