首页 > 其他 > 详细


时间:2015-06-06 23:31:57      阅读:214      评论:0      收藏:0      [点我收藏+]




连续看了好几天的JUC相关的源码,现在脑袋真有点晕乎乎的.加上这个类的代码确实有点多,这个就不在整理这个类的分析结果,就直接把代码copy了,并附上一张图.(这里只贴出分析过的那部分代码) 如果有朋友需要一起探讨的,留言就好了.

public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() {

    static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;

        static final int CANCELLED = 1;

        static final int SIGNAL = -1;

        static final int CONDITION = -2;

        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

         * Returns true if node is waiting in shared mode.
        final boolean isShared() {
            return nextWaiter == SHARED;

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
                return p;

        Node() {    // Used to establish initial head or SHARED marker

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;

     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node‘s predecessor 返回插入节点的前继节点
    // (也就是所有的线程都把各自要插入的节点的前继节点设置为了队列的尾节点,初看上去不应该这样,但是莫慌)
    private Node enq(final Node node) {
        for (; ; ) {                                          //
            Node t = tail;                                  //
            if (t == null) { // Must initialize             //
                if (compareAndSetHead(new Node()))          //a
                    tail = head;                            //b
            } else {
                node.prev = t;                              //c
                if (compareAndSetTail(t, node)) {           //d
                    t.next = node;                          //e
                    return t;

     * Creates and enqueues node for current thread and given mode.
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
    //参数的值那么是 Node.EXCLUSIVE 要么是 Node.SHARED
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure
        // 下面的代码会先尝试以最快的方式入队列,同时如果失败就会以常规方式入队列作为后备计划
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
        enq(node);                              //a
        return node;

     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     * @param node the node
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;

     * Wakes up node‘s successor, if one exists.
     * @param node the node
    private void unparkSuccessor(Node node) {
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
        int ws = node.waitStatus;
        //如果当且节点对应的线程正处于 Node.SIGNAL OR Node.CONDITION OR Node.PROPAGATE 三个状态之一
        //则通过CAS操作把状态更改为0 如果更改失败也没有关系
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
        Node s = node.next;

        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;

        if (s != null)

     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
    //  多线程,开始状态为0的情况                      :有一个线程把状态更改为了PROPAGATE,然后退出方法.其他线程不做任何改变然后退出方法
    //  多线程,开始状态既不是SIGNAL也不是0的情况        :所有线程不做什么改变然后退出方法
    //  单线程,开始状态为SIGNAL的情况                 :头节点状态被更改为0,且执行了unparkSuccessor操作
    //  单线程,开始状态为0的情况;                     :头节点状态被更改为PROPAGATE
    //  单线程,开始状态既不是SIGNAL也不是0的情况        :线程不做什么改变然后退出方法
    //  如果头节点的状态是SIGNAL,则把状态更改为0,同时执行unparkSuccessor操作
    //  如果头节点状态是0,则把状态更改为PROPAGATE
    //  如果是其他状态,什么也不改变
    private void doReleaseShared() {
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
        for (; ; ) {
            Node h = head;

            if (h != null && h != tail) {

                int ws = h.waitStatus;

                if (ws == Node.SIGNAL) {

                    //把h节点的状态更改为0  如果有多个线程同时执行到这里,且获取到的头节点都是同一个节点,那么只会有一个线程执行成功.
                    //成功更改状态的那个线程就会执行 取消阻塞h节点的后继节点 的操作,然后执行a行代码.失败的线程则又会进入循环
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases

                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS

            if (h == head)                   // loop if head changed         //a


     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     * @param node      the node
     * @param propagate the return value from a tryAcquireShared
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below

        if (propagate > 0
                || h == null
                || h.waitStatus < 0
                || (h = head) == null
                || h.waitStatus < 0
                ) {

            Node s = node.next;

            if (s == null || s.isShared())

    // Utilities for various versions of acquire

     * Cancels an ongoing attempt to acquire.
     * @param node the node
    private void cancelAcquire(Node node) {
        // Ignore if node doesn‘t exist
        if (node == null)

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;

        // 图A  
        //循环条件是 状态为CANCELLED.
        // pred 指向的是 从输入节点的前继节点开始向头节点遍历,遇到的第一个非CANCELLED状态的节点
        // 输入节点的前继节点指向 pred 节点
        // 如果多线程同时执行这个循环,会导致node.prev的值不一致,
        // 所以AQS框架应该会在外部进行限制,使得不会有多个线程同时执行这个方法.呆会会看到这个限制
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        // 上面的分析得出不会有多个线程同时执行这个方法,但是有可能多个线程同时在执行compareAndSetTail操作.
        // 所以如果执行该方法的线程在执行compareAndSetTail时有可能返回false.而如果返回false,则进入到else
        if (node == tail && compareAndSetTail(node, pred)) {   //a
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred‘s next-link
            // so it will get one. Otherwise wake it up to propagate.
            //  输入节点不是尾节点
            //  输入节点是尾节点,但更新尾节点的值时失败.(这种情况的话,node节点又变为了不是尾节点)
            //  所以无论是上述哪一种情况,执行到这里时,node节点肯定不是尾节点了(而此时的尾节点到底是在node节点前还是后呢?)
            int ws;


            if (pred != head
                    && ( (ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                    && pred.thread != null
                    ) {
                //  pred不是头节点 且 pred节点的状态是SIGNAL(如果不是SIGNAL,则要求成功把状态更改为SIGNAL) 且 pred节点的线程不是null
                Node next = node.next;

                if (next != null && next.waitStatus <= 0)
                    //执行条件是 此时的node节点有后继节点 且 后继节点的状态是非CANCELLED状态
                    compareAndSetNext(pred, predNext, next);
            } else {


            node.next = node; // help GC





评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有