并发编程之美系列目录:https://www.cnblogs.com/binarylei/p/9569428.html
在前两篇文章中,我们主要是分析在并发编程问题上,计算机硬件、操作系统和编程语言分别提供了那些支持:
有了这些基础,我们继续分析一下 AbstractQueuedSynchronizer(简称 AQS) 的源码实现。AQS 涉及以下主要几个知识点:
性能是否可以成为“重复造轮子”的理由呢?Java1.5 中 synchronized 性能不如 AQS,但 1.6 之后,synchronized 做了很多优化,将性能追了上来。显然性能不能重复造轮子的理由,因为性能问题优化一下就可以了,完全没必要“重复造轮子”。
在前面在介绍死锁问题的时候,我们知道可以通过破坏死锁产生的条件从而避免死锁,但这个方案 synchronized 没有办法解决。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态,也释放不了线程已经占有的资源。我们需要新的方案解决这问题。
如果我们重新设计一把互斥锁去解决这个问题,那该怎么设计呢?AQS 提供了以下方案:
这三种方案可以全面弥补 synchronized 的问题。这三个方案就是“重复造轮子”的主要原因,体现在 API 上,就是 Lock 接口的三个方法。详情如下:
// 支持中断的API
void lockInterruptibly() throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();
推荐文章:锁原理 - 信号量 vs 管程:JDK 为什么选择管程?
并发编程的两大核心问题:一是互斥,即同一时刻只允许一个线程访问共享资源;二是同步,即线程之间的 "wait-notify" 机制。管程能够解决这两大问题。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
你可能认为,锁的可见性不是显而易见的问题吗?还真没这么简单,这涉及到 JMM 提供的 happens-before 原则。
synchronized 之所以能够保证有序性,是因为满足了 synchronized 规则,那 AQS 又满足了那条规则呢?如果我们仔细分析一下 JUC 并发工具类,可以发现一个通用化的实现模式:
说明: AQS 就是根据上述模式实现的,在 lock.lock() 获取锁时,会读取 volatile 变量,同时 lock.unlock() 释放锁时,会修改 volatile 变量。这样满足了 volatile 规则,所以 AQS 能够保证可见性。
volatile 最开始只能保证可见性,禁止指定重排这个语义是在 JSR-133 加强的。禁止指定重排指,对一个 volatile 变量的读,总是能看到(任意线程)对这个 volatile 变量最后的写入。这样,volatile 读其实和 lock.lock() 具有相同的语义,volatile 写具有 lock.unlock() 语义。
CLH 队列锁是一种利用 CAS 实现的无锁队列。
AQS 中的同步队列在 CLH 的基础上做了改进。CLH 是单身链表,但 AQS 使用双向链表,但要注意的是目前并不存在双向链表的原子性算法,AQS 也保证 node.prev 域的原子性,并不能保证 node.next 域的原子性。如果通过 tail 反向遍历可以查找到所有的结点,但从 head 正向遍历则不一定了,node.next 域只是起辅助作用。
AQS 包含 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题。
AQS 使用共享变量 state 来管理锁的状态,很显然 state 必须使用 volatile 修辞,AQS 提供的如下三个方法来访问或修改同步状态:
思考1:state 为什么要提供 setState 和 compareAndSetState 两种修改状态的方法?
这个问题,关键是修改状态时是否存在数据竞争,如果有则必须使用 compareAndSetState。
说明: 首先,需要注意的是:如果没有锁竞争,线程可以直接获取到锁,就不会进入同步队列。也就说,没有锁竞争时,同步队列(syncQueue)是空的,当存在锁竞争时,线程会进入到同步队列中。一旦进入到同步队列中,就会有线程切换。
同步队列特点:
标准的 CHL 无锁队列是单向链表,同步队列(syncQueue) 在 CHL 基础上做了改进:
volatile int waitStatus; // 结点状态
volatile Node prev; // 同步队列:互斥等待队列 Lock
volatile Node next; // 同步队列
volatile Thread thread; // 阻塞的线程
Node nextWaiter; // 等待队列:条件等待 Condition
Node 结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。变量 waitStatus 则表示当前 Node 结点的等待状态,共有 5 种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、INITIAL。
说明: 大致可以分为两种场景,Lock 和 Condition 两种。
AQS 支持阻塞、响应中断、锁超时、非阻塞获取锁四种场景,我们就以最常用的阻塞方式获取锁为例。
与获取锁有关的方法如下:
与释放锁有关的方法如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
说明: tryAcquire 尝试获取锁,如果成功,就不用进入同步队列。否则,就需要通过 acquireQueued 进入等待队列。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋修改前驱结点的状态为SIGNAL,然后挂起线程,直到被唤醒和抢占锁成功
final Node p = node.predecessor(); // p表示前驱结点
if (p == head && tryAcquire(arg)) { // 1. 抢占锁成功,唤醒该线程
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 2. 自旋修改前驱结点的状态为SIGNAL
parkAndCheckInterrupt()) // 3. 挂起当前线程,并判断是否被中断
interrupted = true;
}
} finally {
if (failed) // 异常则将结点状态设置为CANCELLED
cancelAcquire(node);
}
}
说明: acquireQueued 进入同步队列中,直到线程被唤醒。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
说明: 释放锁相比获取锁要简单一些,因为此时线程已经获取到锁,可以不使用 CAS 原子性操作。
private void unparkSuccessor(Node node) {
// 1. node表示需要删除的结点,将其状态重新设置为INITIAL。允许失败?
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 2. 这一段比较难理解,涉及到同步队列的线程安全问题,目前就记住一点就可以:
// node.prev是线程安全的,而node.next则不是线程安全的
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 3. 唤醒同步线程,当然这个线程不一定能抢占到锁。比如非公平锁
if (s != null)
LockSupport.unpark(s.thread);
}
说明: unparkSuccessor 方法用于唤醒 node 的后继结点,有几个小细节需要关注一下:
管程的两个主要功能,我们已经分析了 Lock 是如何解决互斥问题,下面再看一下 Condition 是如何解决同步问题。条件同步 Condition 的典型用法如下:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
// 等待
public void conditionWait() throws InterruptedException {
lock.lock();
try {
while(条件不满足)
condition.await();
} finally {
lock.unlock();
}
}
// 通知
public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
Condition 等待队列(waitQueue)要比 Lock 同步队列(syncQueue)简单很多,最重要的原因是 waitQueue 的操作都是在获取锁的线程中执行,不存在数据竞争的问题。
ConditionObject 重要的方法说明:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 1. 添加到等待队列中
int savedState = fullyRelease(node); // 2. 释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 3. 判断线程是否在同步队列中
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 重新竞争锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
说明: 条件同步调用 await 方法后主要完成以下几件事。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 如果唤醒失败,就一直向下唤醒
(first = firstWaiter) != null);
}
说明: doSignal 唤醒等待的线程,transferForSignal 都是真正唤醒等待线程的方法。如果该线程已经被唤醒或取消,则继续唤醒下一个线程。
final boolean transferForSignal(Node node) {
// 1. 结点的状态必须是CONDITION。如果是其它状态,则要么已经唤醒,或已经取消
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 2. 重新加入到同步队列中,竞争锁
Node p = enq(node);
int ws = p.waitStatus;
// 3. 设置前驱结点的状态为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
说明: transferForSignal 唤醒等待的线程,重新加入到 syncQueue 同步队列来竞争锁。
AQS 中同步队列是双向链表,node.prev 和 node.next 不可能同时通过 CAS 保证其原子性。AQS 中选择了 node.prev 前驱结点的原子性,而 node.next 后继结点则是辅助结点。
思考1:AQS 为什么选择 node.prev 前驱结点的原子性,而 node.next 后继结点则是辅助结点?
说明: 通过上图,前驱结点只需要一次原子性操作就可以,而后继结点则需要二次原子性操作,复杂性就会大提升,这就是 AQS 选择前驱结点进行原子性操作的原因。
思考2:AQS 明知道 node.next 有可见性问题,为什么还要设计成双向链表?
唤醒同步线程时,如果有后继结点,那么时间复杂为 O(1)。否则只能只反向遍历,时间复杂度为 O(n)。
以下两种情况,则认为 node.next 不可靠,需要从 tail 反向遍历。
思考3:AQS 同步队列什么时候删除结点?
总结: 物理删除,acquireQueued 自旋时会修改 node.prev。而逻辑删除,会先标记为 CANCELLED 状态,并修改 node.next。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; // ① 不存在锁竞争(因为还没有添加到链表中)
if (compareAndSetTail(t, node)) { // ② node.prev一定是对其它线程可见的
t.next = node; // ③ 可能存在并发操作,此时t.next=null
return t;
}
}
}
}
说明: 线程进入等待队列时,node.prev 是绝对线程安全的,但 node.next 就不一定了。如果线程只好在此时被唤醒,unparkSuccessor 通过 prev.next 就无法查找到该结点,只能反向遍历。
删除结点,这里指的是物理删除,修改 node.prev 域,只有在 prev 链中断开才能真正的删除。
(1)acquireQueued 获取锁时,需要重新设置头结点,会将 node.prev=null:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
说明: head 头结点只有在获取锁才会更新,所以不需要 CAS 原子性操作。
(2)shouldParkAfterFailedAcquire(cancelAcquire 方法类似) 清除取消结点代码如下:
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
说明: node 结点所在的线程自旋时,修改自身的属性 node.prev 不存在锁竞争,但如果修改其它结点的属性(eg pred.next)则会存在锁竞争。
取消结点,这里指的是逻辑删除,将结点的状态标记为 CANCELLED,同时修改 node.next 域。取消结点操作比较复杂,因为要考虑取消的结点可能为尾结点、中间结点、头结点三种情况。
private void cancelAcquire(Node node) {
node.thread = null;
// 1. 查找前驱结点:忽略CANCELLED结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 2. 逻辑删除:此时已经无法从同步队列中查找到该结点,直到其它线程获取锁时会真正物理删除
node.waitStatus = Node.CANCELLED;
// 3.1 被取消的结点是尾结点:直接将 tail.next=null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 3.2 被取消的结点是中间结点:前驱结点必须改成SIGNAL状态,否则直接唤醒线程
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
// 3.3 被取消的结点是头结点:直接唤醒后继结点
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
说明: cancelAcquire 唯一可以确定的是将 node.waitStatus 修改成 CANCELLED。如果被取消的是头结点时,需要唤醒后继结点。至于取消的结点是尾结点或中间结点,并不能保证操作成功与否。
从上图可以看到:
总结: cancelAcquire 方法只是逻辑删除,将结点状态标记为 CANCELLED,同时可以修改 node.next 域。从这我们也可以看到为什么 unparkSuccessor 方法唤醒后继结点时,如果后继结点已经 CANCELLED,就需要从 tail 反向遍历结点,因为 next 域可能已经被修改。
唤醒结点时,需要查找后继有效结点。如果 next=null 或 next.waitStatus>0 则需要反向遍历。
private void unparkSuccessor(Node node) {
...
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
}
说明: node.next 是辅助结点,存在可见性问题,但 node.prev 一定可以遍历所有的结点。
参考:
每天用心记录一点点。内容也许不重要,但习惯很重要!
锁原理 - AQS 源码分析:AQS 为什么可以使用 volatile 实现锁的语义
原文:https://www.cnblogs.com/binarylei/p/12555166.html