首页 > 其他 > 详细

J.U.C之AbstractQueuedSynchronizer 抽象队列同步器

时间:2020-09-15 23:47:01      阅读:61      评论:0      收藏:0      [点我收藏+]

在AQS源码中我们会看到这样一段注释:

* The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks.

注释说AQS 是CLH 锁的一个变种,CLH 锁通常用于自旋锁.

AQS 是J.U.C中用来构建锁和其他同步组件的基础框架类,在java se5之前,我们主要依靠synchronized关键字实现锁功能,在se5之后加入了更多类型的同步组件,这些组件虽然失去了像synchronize关键字隐式加锁解锁的便捷性,但是却拥有了锁获取和释放的可操作性,可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性,需要注意的是synchronized同步块执行完成或者遇到异常是锁会自动释放,而同步组件必须调用类似unlock()方法释放锁,因此在finally块中释放锁

Node{
    static final Node SHARED = new Node();//共享模式下的等待
    static final Node EXCLUSIVE = null;   //独占模式下的等待
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;//等待状态的线程引用
    Node nextWaiter;
}

Node是构建AQS的基础,通过Node的数据结构来看AQS中包含一包含前驱和后继,并且引用当前处于等待的线程的一个双向链表,是一个FIFO队列。而AQS就是通过头尾节点指针来管理这个双向链表,同时实现在获取锁失败时的线程入链尾,在释放锁时对同步队列中的线程进行通知等核心动作。锁的获取和释放其实就对应着节点的入队和出队操作。其中waitStatus状态值如下:

属性 备注
waitStatus 1. CANCELLED 值为1 indicate thread has cancelled 表示线程已被取消
2. SIGNAL 值为-1 indicate successor‘s thread needs unparking 后继节点包含的线程需要运行,
也就是unpark
3. CONDITION 值为-2 indicate thread is waiting on condition 当前节点在等待condition,
也就是在condition队列中
4. PROPAGATE 值为-3 the next acquireShared should unconditionally propagate
后续的acquireShared 能够得以执行
5. 0 当前节点在sync队列中,等待着获取锁
nextWaiter 存储condition队列中的后继节点
SHARED/EXCLUSIVE 用于标记一个节点在共享模式下等待/独占模式下等待

接下来看AQS独占锁时的获取和释放源码:

独占锁时获取锁:

/**
 * 尝试获取独占锁,不响应中断.  首先通过tryAcquire尝试获取1次锁,如果成功则返回,否则,
 * 将线程包装成node入队列
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire方法流程

首先通过子类判断是否获取了锁,如果获取了就什么也不干。

如果没有获取锁、通过线程创建节点加入同步队列的队尾。

当线程在同步队列中不断的通过自旋去获取同步状态,如果获取了锁,就把其设为同步队列中的头节点,否则在同步队列中不停的自旋等待获取同步状态。

如果在获取同步状态的过程中被中断过最后自行调用interrupted方法进行中断操作。

/**
 * 在队列中获取此节点的锁,不响应中断. 
 * 在队列中会不断检测是否为head的直接后继,并尝试获取锁,
 * 如果获取失败,则会通过LockSupport阻塞当前线程,直至被释放锁的线程唤醒或者被中断,随后再次尝试获取锁,如此反复.
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
/* 阻塞并检测当前线程是否中断
*/
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
/**
 * 检测更新获锁失败的节点的状态,如果需要阻塞,则返回true.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)//当前驱节点释放后,这个节点就会去尝试获取状态
        /*
         * 前驱节点设置为SIGNAL状态,在释放锁的时候会唤醒后继节点,
         * 所以后继节点(也就是当前节点)现在可以阻塞自己.
         */
        return true;
    if (ws > 0) {//前驱节点被取消
        /*
         * 循环尝试找到没有被取消的前驱作为当前node的前驱
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 等待状态为0或者PROPAGATE(-3),设置前驱的等待状态为SIGNAL,
         * 并且之后会回到循环再次重试获取锁.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
/**
 * 唤醒后继节点.
 */
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;//获取头节点的状态
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);//通过CAS将头节点的状态设置为初始状态

    /*
     * 通常要唤醒的线程所在节点为当前节点的直接后继,如果后继被取消或为null
     * 则会反向从尾部节点回溯到第一个没有被cancel的节点作为唤醒线程所在节点
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}
/**
 * 新增1个给定模式的节点.
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
     /**
     * 通过循环+CAS在队列中成功插入一个节点后返回
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

独占锁时释放锁:

首先子类自定义的方法如果释放了同步状态,如果头节点不为空并且头节点的等待状态不为0就唤醒其后继节点。主要依赖的就是子类自定义实现的释放操作。

/**
 * 排他锁释放. 
 * This method can be used to implement method {@link Lock#unlock}.
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

AQS共享锁获取和释放:

共享模式下获取锁

/**
 * 获取共享锁,不响应中断. 尝试获取1次共享锁,如果成功则返回.
 * 否则包装成节点入队列, 然后不断的阻塞循环尝试获取锁,直到tryAccquireShared获取成功.
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) //交给子类实现
        doAcquireShared(arg);
}
/**
 * 获取共享锁.将共享节点加入队列Acquires in shared uninterruptible mode.
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                //一旦共享获取成功,设置新的头结点,并且唤醒后继线程
                int r = tryAcquireShared(arg);
                if (r >= 0) {//大于0代表获取到了
                    setHeadAndPropagate(node, r);//设置为头节点并且如果有多余资源一并唤醒
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //判断线程是否可以进行休息如果可以休息就调用park方法
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
/**
 * 设置队列头节点, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 备份老的头节点
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don‘t know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||//大于0代表还有其他资源一并可以唤醒
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
/**
 * 共享模式下释放锁
 */
private void doReleaseShared() {
    /*
     * 此外,我们必须循环以防新节点加入
     * 同样不像其他地方使用
     * unparkSuccessor的方式,我们需要先知道是否cas操作失败,如是,则重新检测
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //等待获取锁
                    continue;            // loop to recheck cases
                unparkSuccessor(h);  //唤醒后继
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//-3
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

共享模式下释放锁

/**
 * 共享模式释放锁. 
 */
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

J.U.C之AbstractQueuedSynchronizer 抽象队列同步器

原文:https://www.cnblogs.com/oxf5deb3/p/13676243.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!