首页 > 其他 > 详细

AQS源码解析

时间:2020-02-11 01:05:49      阅读:87      评论:0      收藏:0      [点我收藏+]

文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多。还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油!

 

AQS 结构

 

属性

private transient volatile Node head;

private transient volatile Node tail;

// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int state;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread;

技术分享图片

 


 

内部类

Node

static final class Node {
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ================================================ 下面的几个int常量是给waitStatus用的 ===============================================
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
    static final int SIGNAL    = -1;
    // 本文不分析condition,所以略过吧,下一篇文章会介绍这个
    static final int CONDITION = -2;
    // 同样的不分析,略过吧
    static final int PROPAGATE = -3;
    // ================================================================================================================================

    // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
    // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
    // ps: 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的.
    volatile int waitStatus;
    // ================================================================================================================================
    
    // 用于阻塞队列
    volatile Node prev;
    volatile Node next;

    // 这个就是线程本尊
    volatile Thread thread;
    
    // 用于条件队列
    Node nextWaiter;
}

 


 

获取独占锁

 

lock () (摘自Reentrantlock)

public void lock() {        
    sync.lock();     
}

 

acquire (int arg)

// 我们看到,这个方法,如果tryAcquire(arg) 返回true, 也就结束了。
// 否则,acquireQueued方法会将线程压到队列中
public final void acquire(int arg) { // 首先调用tryAcquire(1)一下,名字上就知道,这个只是试一试
    // 因为有可能直接就成功了呢,也就不需要进队列排队了,
    // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待(又是挂起,又是等待被唤醒的)
    if (!tryAcquire(arg) &&
        // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}        

 

tryAcquire (int acquires) (实现来自ReentrantLock)

// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    // state == 0 此时此刻没有线程持有锁
    if (c == 0) {
        // 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
        // 看看有没有别人在队列中等了半天了
        if (!hasQueuedPredecessors() &&
            // 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
            // 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
            // 因为刚刚还没人的,我判断过了
        compareAndSetState(0, acquires)) {

            // 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 会进入这个else if分支,说明是重入了,需要操作:state=state+1
    // 这里不存在并发问题
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
    // 回到上面一个外层调用方法继续看:
    // if (!tryAcquire(arg) 
    //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
    //     selfInterrupt();
    return false;
}                

 

tryAcquire(arg) 如果返回false,那么代码将执行   acquireQueued(addWaiter(Node.EXCLUSIVE), arg)   这个方法,首先需要执行: addWaiter(Node.EXCLUSIVE) 

 

addWaiter(Node.EXCLUSIVE) 

// 此方法的作用是把线程包装成node,同时进入到队列中
// 参数mode此时是Node.EXCLUSIVE,代表独占模式
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);// 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
    Node pred = tail;

    // tail!=null => 队列不为空(tail==head的时候,其实队列是空的,不过不管这个吧)
    if (pred != null) {
        // 将当前的队尾节点,设置为自己的前驱 
        node.prev = pred;
        // 用CAS把自己设置为队尾, 如果成功后,tail == node 了,这个节点成为阻塞队列新的尾巴
        if (compareAndSetTail(pred, node)) {
            // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
            // 上面已经有 node.prev = pred,加上下面这句,也就实现了和之前的尾节点双向连接了
            pred.next = node;
            // 线程入队了,可以返回了
            return node;
        }
    }
    // 仔细看看上面的代码,如果会到这里,
    // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
    enq(node);
    return node;
}

 

enq (final Node node)

// 采用自旋的方式入队
// 之前说过,到这个方法只有两种可能:等待队列为空,或者有线程竞争入队,
// 自旋在这边的语义是:CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 之前说过,队列为空也会进来这里
        if (t == null) { // 初始化head节点
            // 细心的读者会知道原来 head 和 tail 初始化的时候都是 null 的
            // 还是一步CAS,你懂的,现在可能是很多线程同时进来呢
            if (compareAndSetHead(new Node()))
                // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了

                // 这个时候有了head,但是tail还是null,设置一下,
                // 把tail指向head,放心,马上就有线程要来了,到时候tail就要被抢了
                // 注意:这里只是设置了tail=head,这里可没return哦,没有return,没有return
                // 所以,设置完了以后,继续for循环,下次就到下面的else分支了
                tail = head;
        } else {
            // 下面几行,和上一个方法 addWaiter 是一样的,
            // 只是这个套在无限循环里,反正就是将当前线程排到队尾,有线程竞争的话排不上重复排
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

 

tryAcquire(arg) 如果返回false,那么代码将执行   acquireQueued(addWaiter(Node.EXCLUSIVE), arg)   这个方法

 

acquireQueued(final Node node, int arg)

// 下面这个方法,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,
// 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
// 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
            // 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
            // 所以当前节点可以去试抢一下锁
            // 这里我们说一下,为什么可以去试试:
            // 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
            // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
            // 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
            // tryAcquire已经分析过了, 忘记了请往前看一下,就是简单用CAS试操作一下state
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
            // 要么就是tryAcquire(arg)没有抢赢别人,继续往下看
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 什么时候 failed 会为 true???
        // tryAcquire() 方法抛异常的情况
        if (failed)
            cancelAcquire(node);
    }
}

 

shouldParkAfterFailedAcquire(Node pred, Node node)

// 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
    if (ws == Node.SIGNAL)
        return true;

    // 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
    // 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
    // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
    // 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
    // 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 仔细想想,如果进入到这个分支意味着什么
        // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
        // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
        // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
        // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 这个方法返回 false,那么会再走一次 for 循序,
    //     然后再次进来此方法,此时会从第一个分支返回 true
    return false;
}

 

parkAndCheckInterrupt

// 这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
// 这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

 

下面我画了张图帮大家理一下思路,转载注明出处

 附录

技术分享图片

 


 

 

释放独占锁

 

unlock () (摘自Reentrantlock)

public void unlock() {     
    sync.release(1); 
}

 

release (int arg)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

 

tryRelease (int arg)(实现来自Reentrantlock)

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否完全释放锁
    boolean free = false;
    // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

 

unparkSuccessor (Node node)

// 唤醒后继节点
// 从上面调用处知道,参数node是head头结点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果head节点当前waitStatus<0, 将其修改为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
    // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

 

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

 

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 刚刚线程被挂起在这里了
    // interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态
    return Thread.interrupted();
}

 


 

条件锁

 

我们借Reentrantlock来看一下条件锁

 

Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition();
Condition notEmpty = lock.newCondition();

 

final ConditionObject newCondition() {
    // 实例化一个 ConditionObject
    return new ConditionObject();
}

 

ConditionObject是AQS中的一个内部类,类似于之前提到的Node

 

ConditionObject 

public class ConditionObject implements Condition, java.io.Serializable {
        // 条件队列的第一个节点
        private transient Node firstWaiter;
        // 条件队列的最后一个节点
        private transient Node lastWaiter;

 技术分享图片

 

 回顾一下Node的属性

// 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile int waitStatus; 
// 用于阻塞队列
volatile Node prev;
volatile Node next;
volatile Thread thread;
// 用于条件队列
Node nextWaiter;

 

先捋一下简单流程

基本上,把上面那张图看懂,你也就知道 condition 的处理流程了。所以,我先简单解释下这图,然后再具体地解释代码实现。

 

  1. 条件队列和阻塞队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到阻塞队列中去的;
  2. 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;
  3. 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;
  4. 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到阻塞队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。

 

上面的 2->3->4 描述了一个最简单的流程,没有考虑中断、signalAll、还有带有超时参数的 await 方法等,不过把这里弄懂是这节的主要目的。

 


 

 

条件锁的await()

 

await ()(实现来自AQS中的ConditionObject内部类中)

技术分享图片

// 首先,这个方法是可被中断的,不可被中断的是另一个方法 awaitUninterruptibly()
// 这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断
public final void await() throws InterruptedException {
    // 老规矩,既然该方法要响应中断,那么在最开始就判断中断状态,
// interrupted()的内部实现是调用的当前线程的isInterrupted(),并且会重置当前线程的中断状态
if (Thread.interrupted()) throw new InterruptedException(); // 添加到 condition 的条件队列中 Node node = addConditionWaiter(); // 释放锁,返回值是释放锁之前的 state 值 // await() 之前,当前线程是必须持有锁的,这里肯定要释放掉 int savedState = fullyRelease(node); int interruptMode = 0; // 这里退出循环有两种情况,之后再仔细分析 // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了 // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被唤醒后,将进入阻塞队列,等待获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

 

addConditionWaiter() 

将节点加入到条件队列

// 将当前线程对应的节点入队,插入队尾
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果条件队列的最后一个节点取消了,将其清除出去
    // 为什么这里把 waitStatus 不等于 Node.CONDITION,就判定为该节点发生了取消排队?
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // node 在初始化的时候,指定 waitStatus 为 Node.CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    // t 此时是 lastWaiter,队尾
    // 如果队列为空
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

 

回到 wait 方法,节点入队了以后,会调用  int savedState = fullyRelease(node);  方法释放锁,注意,这里是完全释放独占锁(fully release),因为 ReentrantLock 是可以重入的。

考虑一下这里的 savedState。如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。

 

fullyRelease (Node node)

// 首先,我们要先观察到返回值 savedState 代表 release 之前的 state 值
// 对于最简单的操作:先 lock.lock(),然后 condition1.await()。
// 那么 state 经过这个方法由 1 变为 0,锁释放,此方法返回 1
// 相应的,如果 lock 重入了 n 次,savedState == n
// 如果这个方法失败,会将节点设置为"取消"状态,并抛出异常 IllegalMonitorStateException
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

如果一个线程在不持有 lock 的基础上,就去调用 condition1.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED,这个已经入队的节点之后会被后继的节点”请出去“。

 

下面我们再回到await() 中,从   int savedState = fullyRelease(node);  后继续:

// 如果不在阻塞队列中,注意了,是阻塞队列
while (!isOnSyncQueue(node)) {
    // 线程挂起
    LockSupport.park(this);

    // 这里可以先不用看了,等看到它什么时候被 unpark 再说
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

 

isOnSyncQueue(Node node)

// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
// 前面我提到,signal 的时候需要将节点从条件队列移到阻塞队列,
// 这个方法就是判断 node 是否已经移动到阻塞队列了
final boolean isOnSyncQueue(Node node) {

    // 移动过去的时候,node 的 waitStatus 会置为 0,这个之后在说 signal 方法的时候会说到
    // 如果 waitStatus 还是 Node.CONDITION,也就是 -2,那肯定就是还在条件队列中
    // 如果 node 的前驱 prev 指向还是 null,说明肯定没有在 阻塞队列(prev是阻塞队列链表中使用的)
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果 node 已经有后继节点 next 的时候(注意是next,阻塞队列独有的,不是条件队列的),那肯定是在阻塞队列了
    if (node.next != null) 
        return true;

    // 下面这个方法从阻塞队列的队尾开始从后往前遍历找,如果找到相等的,说明在阻塞队列,否则就是不在阻塞队列

    // 可以通过判断 node.prev() != null 来推断出 node 在阻塞队列吗?答案是:不能。
    // 这个可以看上篇 AQS 的入队方法,首先设置的是 node.prev 指向 tail,
    // 然后是 CAS 操作将自己设置为新的 tail,可是这次的 CAS 是可能失败的。

    return findNodeFromTail(node);
}

// 从阻塞队列的队尾往前遍历,如果找到,返回 true
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

 


 

 

条件锁的signal

 

signal ()(实现来自AQS中的ConditionObject内部类中)

// 唤醒等待了最久的线程
// 其实就是,将这个线程对应的 node 从条件队列转移到阻塞队列
public final void signal() {
    // 调用 signal 方法的线程必须持有当前的独占锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

 

doSignal(Node first)

// 从条件队列队头往后遍历,找出第一个需要转移的 node
// 因为前面我们说过,有些线程会取消排队,但是可能还在队列中
private void doSignal(Node first) {
    do {
          // 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
        // 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
      // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
}

 

transferForSignal(Node node)

// 将节点从条件队列转移到阻塞队列
// true 代表成功转移
// false 代表在 signal 之前,节点已经取消了
final boolean transferForSignal(Node node) {

    // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,
    // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点
    // 否则,将 waitStatus 置为 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // enq(node): 自旋进入阻塞队列的队尾
    // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。唤醒之后会怎么样,后面再解释
    // 如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节
        LockSupport.unpark(node.thread);
    return true;
}

 

signal 之后,回到刚刚await () 挂起的地方继续

 

while (!isOnSyncQueue(node)) {
    // 线程挂起
    LockSupport.park(this);

    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

 

先解释下 interruptMode。interruptMode 可以取值为 REINTERRUPT(1),THROW_IE(-1),0

  1. REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
  2. THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
  3. 0 :说明在 await 期间,没有发生中断

 

有以下几种情况会让 LockSupport.park(this); 这句返回继续往下执行:

  1. 常规路径。signal -> 转移节点到阻塞队列 -> 获取了锁(unpark)
  2. 线程中断。在 park 的时候,另外一个线程对这个线程进行了中断
  3. signal 的时候我们说过,转移以后的前驱节点取消了,或者对前驱节点的CAS操作失败了
  4. 假唤醒。这个也是存在的,和 Object.wait() 类似,都有这个问题

 

线程唤醒后第一步是调用 checkInterruptWhileWaiting(node) 这个方法,此方法用于判断是否在线程挂起期间发生了中断,如果发生了中断,是 signal 调用之前中断的,还是 signal 之后发生的中断。

 

下面是await () 的中断处理部分(不了解中断的请去复习中断,不然可能会有点懵逼)

 

checkInterruptWhileWaiting (node)

// 1. 如果在 signal 之前已经中断,返回 THROW_IE
// 2. 如果是 signal 之后中断,返回 REINTERRUPT
// 3. 没有发生中断,返回 0
private int checkInterruptWhileWaiting(Node node) {
    // Thread.interrupted():如果当前线程已经处于中断状态,那么该方法返回 true,
    // 同时将中断状态重置为 false,所以,才有后续的 重新中断(REINTERRUPT) 的使用。
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

 

transferAfterCancelledWait(Node node)

判断是 signal 调用之前中断的,还是 signal 之后发生的中断。

// 只有线程处于中断状态,才会调用此方法
// 如果需要的话,将这个已经取消等待的节点转移到阻塞队列
// 返回 true:如果此线程在 signal 之前被取消,
final boolean transferAfterCancelledWait(Node node) {
    // 用 CAS 将节点状态设置为 0 
    // 如果这步 CAS 成功,说明是 signal 方法之前发生的中断,因为如果 signal 先发生的话,signal 中会将 waitStatus 设置为 0
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 将节点放入阻塞队列
        // 这里我们看到,即使中断了,依然会转移到阻塞队列
        enq(node);
        return true;
    }

    // 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
    // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
    // 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

 

从刚刚await () 的 while (!isOnSyncQueue(node))  循环后的地方继续

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
if (node.nextWaiter != null) 
    unlinkCancelledWaiters();
// 处理中断状态
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

 

acquireQueued(node, savedState) 的返回值就是代表线程是否被中断。如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。

 

reportInterruptAfterWait(int interruptMode)

处理中断状态

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

 


 

带超时机制的await ()

public final boolean await(long time, TimeUnit unit)
        throws InterruptedException {
    // 等待这么多纳秒
    long nanosTimeout = unit.toNanos(time);
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 当前时间 + 等待时长 = 过期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    // 用于返回 await 是否超时
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 时间到啦
        if (nanosTimeout <= 0L) {
            // 这里因为要 break 取消等待了。取消等待的话一定要调用 transferAfterCancelledWait(node) 这个方法
            // 如果这个方法返回 true,在这个方法内,将节点转移到阻塞队列成功
            // 返回 false 的话,说明 signal 已经发生,signal 方法将节点转移了。也就是说没有超时嘛
            timedout = transferAfterCancelledWait(node);
            break;
        }
        // spinForTimeoutThreshold 的值是 1000 纳秒,也就是 1 毫秒
        // 也就是说,如果不到 1 毫秒了,那就不要选择 parkNanos 了,自旋的性能反而更好
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 得到剩余时间
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

超时的思路还是很简单的,不带超时参数的 await 是 park,然后等待别人唤醒。而现在就是调用 parkNanos 方法来休眠指定的时间,醒来后判断是否 signal 调用了,调用了就是没有超时,否则就是超时了。超时的话,自己来进行转移到阻塞队列,然后抢锁。

 


 

 

获取共享锁

 

捋一下简单流程

AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。

countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。

 

await ()(摘自CountDownLatch)

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

 

acquireSharedInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 这也是老套路了,中断那一节说过了
    if (Thread.interrupted())
        throw new InterruptedException();

    // state 为初始化的值。
    // 也就是说,这个 if 返回 true,然后往里看
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
// 只有当 state == 0 的时候,这个方法才会返回 1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }

 

doAcquireSharedInterruptibly (int arg)

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 1. 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 同上,只要 state 不等于 0,那么这个方法返回 -1
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 2
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

经过第 1 步 addWaiter 入队以后

技术分享图片

 

 由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,将 head 的 waitStatus 值设置为 -1

技术分享图片

 

 技术分享图片

 

 

countDown ()

public void countDown() {
    sync.releaseShared(1);
}

 

releaseShared (int arg)

public final boolean releaseShared(int arg) {
    // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
    // 否则只是简单的 state = state - 1 那么 countDown() 方法就结束了
    // 将 state 减到 0 的那个操作才是最复杂的,继续往下吧
    if (tryReleaseShared(arg)) {
        // 唤醒 await 的线程
        doReleaseShared();
        return true;
    }
    return false;
}

 

doReleaseShared()

// 调用这个方法的时候,state == 0
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
            if (ws == Node.SIGNAL) {
                // 将 head 的 waitStatue 设置为 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
                continue;                
        }
        if (h == head)                   
            break;
    }
}

之后被唤醒的线程会回到await() 的阻塞的地方继续

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 2. 这里是下一步
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 1. 唤醒后这个方法返回
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

接下来,线程会进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
    // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
            doReleaseShared();
    }
}

就这样,醒来的线程唤醒下一个线程,下一个线程唤醒下下个线程

 

如果你能看到这里,那么恭喜你,我要送你一句话:

技术分享图片

 

 


 

AQS源码解析

原文:https://www.cnblogs.com/fatmanhappycode/p/12269340.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!