文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多。还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油!
private transient volatile Node head; private transient volatile Node tail; // 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁 // 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1 private volatile int state; // 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入 // reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread;
static final class Node { // 标识节点当前在共享模式下 static final Node SHARED = new Node(); // 标识节点当前在独占模式下 static final Node EXCLUSIVE = null; // ================================================ 下面的几个int常量是给waitStatus用的 =============================================== // 代码此线程取消了争抢这个锁 static final int CANCELLED = 1; // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒 static final int SIGNAL = -1; // 本文不分析condition,所以略过吧,下一篇文章会介绍这个 static final int CONDITION = -2; // 同样的不分析,略过吧 static final int PROPAGATE = -3; // ================================================================================================================================ // 取值为上面的1、-1、-2、-3,或者0(以后会讲到) // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待, // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的. volatile int waitStatus; // ================================================================================================================================ // 用于阻塞队列 volatile Node prev; volatile Node next; // 这个就是线程本尊 volatile Thread thread; // 用于条件队列 Node nextWaiter; }
public void lock() { sync.lock(); }
// 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。 // 否则,acquireQueued方法会将线程压到队列中 public final void acquire(int arg) { // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试 // 因为有可能直接就成功了呢,也就不需要进队列排队了, // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的) if (!tryAcquire(arg) && // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } }
// 尝试直接获取锁,返回值是boolean,代表是否获取到锁 // 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state == 0 此时此刻没有线程持有锁 if (c == 0) { // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到, // 看看有没有别人在队列中等了半天了 if (!hasQueuedPredecessors() && // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了, // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_= // 因为刚刚还没人的,我判断过了 compareAndSetState(0, acquires)) { // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁 setExclusiveOwnerThread(current); return true; } } // 会进入这个else if分支,说明是重入了,需要操作:state=state+1 // 这里不存在并发问题 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁 // 回到上面一个外层调用方法继续看: // if (!tryAcquire(arg) // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // selfInterrupt(); return false; }
tryAcquire(arg) 如果返回false,那么代码将执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这个方法,首先需要执行: addWaiter(Node.EXCLUSIVE)
// 此方法的作用是把线程包装成node,同时进入到队列中 // 参数mode此时是Node.EXCLUSIVE,代表独占模式 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后 Node pred = tail; // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧) if (pred != null) { // 将当前的队尾节点,设置为自己的前驱 node.prev = pred; // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴 if (compareAndSetTail(pred, node)) { // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连, // 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了 pred.next = node; // 线程入队了,可以返回了 return node; } } // 仔细看看上面的代码,如果会到这里, // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队) enq(node); return node; }
// 采用自旋的方式入队 // 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队, // 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的 private Node enq(final Node node) { for (;;) { Node t = tail; // 之前说过,队列为空也会进来这里 if (t == null) { // 初始化head节点 // 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的 // 还是一步CAS,你懂的,现在可能是很多线程同时进来呢 if (compareAndSetHead(new Node())) // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了 // 这个时候有了head,但是tail还是null,设置一下, // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了 // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return // 所以,设置完了以后,继续for循环,下次就到下面的else分支了 tail = head; } else { // 下面几行,和上一个方法 addWaiter 是一样的, // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
tryAcquire(arg) 如果返回false,那么代码将执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这个方法
// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列 // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话, // 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false // 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head // 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列 // 所以当前节点可以去试抢一下锁 // 这里我们说一下,为什么可以去试试: // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node, // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程 // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试, // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头, // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 什么时候 failed 会为 true??? // tryAcquire() 方法抛异常的情况 if (failed) cancelAcquire(node); } }
// 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?" // 第一个参数是前驱节点,第二个参数才是代表当前线程的节点 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true if (ws == Node.SIGNAL) return true; // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。 // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。 // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点, // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队, // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 仔细想想,如果进入到这个分支意味着什么 // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3 // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0 // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0 // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 这个方法返回 false,那么会再走一次 for 循序, // 然后再次进来此方法,此时会从第一个分支返回 true return false; }
// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的 // 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
下面我画了张图帮大家理一下思路,转载注明出处
public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否完全释放锁 boolean free = false; // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
// 唤醒后继节点 // 从上面调用处知道,参数node是head头结点 private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 如果head节点当前waitStatus<0, 将其修改为0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1) // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒线程 LockSupport.unpark(s.thread); }
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 刚刚线程被挂起在这里了 // interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态 return Thread.interrupted(); }
我们借Reentrantlock来看一下条件锁
Lock lock = new ReentrantLock(); Condition notFull = lock.newCondition(); Condition notEmpty = lock.newCondition();
final ConditionObject newCondition() { // 实例化一个 ConditionObject return new ConditionObject(); }
ConditionObject是AQS中的一个内部类,类似于之前提到的Node
public class ConditionObject implements Condition, java.io.Serializable { // 条件队列的第一个节点 private transient Node firstWaiter; // 条件队列的最后一个节点 private transient Node lastWaiter;
回顾一下Node的属性
// 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) volatile int waitStatus; // 用于阻塞队列 volatile Node prev; volatile Node next; volatile Thread thread; // 用于条件队列 Node nextWaiter;
基本上,把上面那张图看懂,你也就知道 condition 的处理流程了。所以,我先简单解释下这图,然后再具体地解释代码实现。
condition1.await()
方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;condition1.signal()
触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。
上面的 2->3->4 描述了一个最简单的流程,没有考虑中断、signalAll、还有带有超时参数的 await 方法等,不过把这里弄懂是这节的主要目的。
// 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly() // 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断 public final void await() throws InterruptedException { // 老规矩,既然该方法要响应中断,那么在最开始就判断中断状态,
// interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态 if (Thread.interrupted()) throw new InterruptedException(); // 添加到 condition 的条件队列中 Node node = addConditionWaiter(); // 释放锁,返回值是释放锁之前的 state 值 // await() 之前,当前线程是必须持有锁的,这里肯定要释放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 这里退出循环有两种情况,之后再仔细分析 // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了 // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被唤醒后,将进入阻塞队列,等待获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
将节点加入到条件队列
// 将当前线程对应的节点入队,插入队尾 private Node addConditionWaiter() { Node t = lastWaiter; // 如果条件队列的最后一个节点取消了,将其清除出去 // 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队? if (t != null && t.waitStatus != Node.CONDITION) { // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列 unlinkCancelledWaiters(); t = lastWaiter; } // node 在初始化的时候,指定 waitStatus 为 Node.CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); // t 此时是 lastWaiter,队尾 // 如果队列为空 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
回到 wait 方法,节点入队了以后,会调用 int savedState = fullyRelease(node); 方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。
考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。
// 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值 // 对于最简单的操作:先 lock.lock(),然后 condition1.await()。 // 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1 // 相应的,如果 lock 重入了 n 次,savedState == n // 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED
,这个已经入队的节点之后会被后继的节点”请出去“。
下面我们再回到await() 中,从 int savedState = fullyRelease(node); 后继续:
// 如果不在阻塞队列中,注意了,是阻塞队列 while (!isOnSyncQueue(node)) { // 线程挂起 LockSupport.park(this); // 这里可以先不用看了,等看到它什么时候被 unpark 再说 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION // 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列, // 这个方法就是判断 node 是否已经移动到阻塞队列了 final boolean isOnSyncQueue(Node node) { // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到 // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中 // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的) if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 如果 node 已经有后继节点 next 的时候(注意是next,阻塞队列独有的,不是条件队列的),那肯定是在阻塞队列了 if (node.next != null) return true; // 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列 // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。 // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail, // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。 return findNodeFromTail(node); } // 从阻塞队列的队尾往前遍历,如果找到,返回 true private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
// 唤醒等待了最久的线程 // 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列 public final void signal() { // 调用 signal 方法的线程必须持有当前的独占锁 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
// 从条件队列队头往后遍历,找出第一个需要转移的 node // 因为前面我们说过,有些线程会取消排队,但是可能还在队列中 private void doSignal(Node first) { do { // 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了 // 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推 }
// 将节点从条件队列转移到阻塞队列 // true 代表成功转移 // false 代表在 signal 之前,节点已经取消了 final boolean transferForSignal(Node node) { // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消, // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点 // 否则,将 waitStatus 置为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // enq(node): 自旋进入阻塞队列的队尾 // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点 Node p = enq(node); int ws = p.waitStatus; // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释 // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1) if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节 LockSupport.unpark(node.thread); return true; }
signal 之后,回到刚刚await () 挂起的地方继续
while (!isOnSyncQueue(node)) { // 线程挂起 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
先解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0
有以下几种情况会让 LockSupport.park(this); 这句返回继续往下执行:
线程唤醒后第一步是调用 checkInterruptWhileWaiting(node) 这个方法,此方法用于判断是否在线程挂起期间发生了中断,如果发生了中断,是 signal 调用之前中断的,还是 signal 之后发生的中断。
下面是await () 的中断处理部分(不了解中断的请去复习中断,不然可能会有点懵逼)
// 1. 如果在 signal 之前已经中断,返回 THROW_IE // 2. 如果是 signal 之后中断,返回 REINTERRUPT // 3. 没有发生中断,返回 0 private int checkInterruptWhileWaiting(Node node) { // Thread.interrupted():如果当前线程已经处于中断状态,那么该方法返回 true, // 同时将中断状态重置为 false,所以,才有后续的 重新中断(REINTERRUPT) 的使用。 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
判断是 signal 调用之前中断的,还是 signal 之后发生的中断。
// 只有线程处于中断状态,才会调用此方法 // 如果需要的话,将这个已经取消等待的节点转移到阻塞队列 // 返回 true:如果此线程在 signal 之前被取消, final boolean transferAfterCancelledWait(Node node) { // 用 CAS 将节点状态设置为 0 // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 将节点放入阻塞队列 // 这里我们看到,即使中断了,依然会转移到阻塞队列 enq(node); return true; } // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0 // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成 // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断 while (!isOnSyncQueue(node)) Thread.yield(); return false; }
从刚刚await () 的 while (!isOnSyncQueue(node)) 循环后的地方继续
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); // 处理中断状态 if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
acquireQueued(node, savedState) 的返回值就是代表线程是否被中断。如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。
处理中断状态
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
public final boolean await(long time, TimeUnit unit) throws InterruptedException { // 等待这么多纳秒 long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); // 当前时间 + 等待时长 = 过期时间 final long deadline = System.nanoTime() + nanosTimeout; // 用于返回 await 是否超时 boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { // 时间到啦 if (nanosTimeout <= 0L) { // 这里因为要 break 取消等待了。取消等待的话一定要调用 transferAfterCancelledWait(node) 这个方法 // 如果这个方法返回 true,在这个方法内,将节点转移到阻塞队列成功 // 返回 false 的话,说明 signal 已经发生,signal 方法将节点转移了。也就是说没有超时嘛 timedout = transferAfterCancelledWait(node); break; } // spinForTimeoutThreshold 的值是 1000 纳秒,也就是 1 毫秒 // 也就是说,如果不到 1 毫秒了,那就不要选择 parkNanos 了,自旋的性能反而更好 if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; // 得到剩余时间 nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; }
超时的思路还是很简单的,不带超时参数的 await 是 park,然后等待别人唤醒。而现在就是调用 parkNanos 方法来休眠指定的时间,醒来后判断是否 signal 调用了,调用了就是没有超时,否则就是超时了。超时的话,自己来进行转移到阻塞队列,然后抢锁。
AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。
对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。
countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 这也是老套路了,中断那一节说过了 if (Thread.interrupted()) throw new InterruptedException(); // state 为初始化的值。 // 也就是说,这个 if 返回 true,然后往里看 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
// 只有当 state == 0 的时候,这个方法才会返回 1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 1. 入队 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { // 同上,只要 state 不等于 0,那么这个方法返回 -1 int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 2 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
经过第 1 步 addWaiter 入队以后
由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,将 head 的 waitStatus 值设置为 -1
public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true // 否则只是简单的 state = state - 1 那么 countDown() 方法就结束了 // 将 state 减到 0 的那个操作才是最复杂的,继续往下吧 if (tryReleaseShared(arg)) { // 唤醒 await 的线程 doReleaseShared(); return true; } return false; }
// 调用这个方法的时候,state == 0 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了 if (ws == Node.SIGNAL) { // 将 head 的 waitStatue 设置为 0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
之后被唤醒的线程会回到await() 的阻塞的地方继续
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); // 2. 这里是下一步 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && // 1. 唤醒后这个方法返回 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
接下来,线程会进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4 // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了 doReleaseShared(); } }
就这样,醒来的线程唤醒下一个线程,下一个线程唤醒下下个线程
原文:https://www.cnblogs.com/fatmanhappycode/p/12269340.html