AQS类全称AbstractQueuedSynchronizer,Java在AQS类中提供了一系列的模板代码,用来实现Java中一系列的同步工具,很多同步工具都是通过内部类继承自AQS类,通过重写AQS类的方法来实现的,如ReentrantLock类、ReentrantReadWriteLock等。
AQS依赖内部的队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
队列的基本结构如图所示:
AQS包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点,AQS提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update)来保证加入队列的过程的线程安全,它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
将节点加入到队列的过程如图所示:
队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点;
设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。
public final void acquire(int arg) {
//首先交由子类实现的tryAcquire方法去尝试加锁,当加锁失败后将线程放入FIFO等待队列,并再次尝试加锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其主要逻辑是:首先调用自定义实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以自旋的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
private Node addWaiter(Node mode) {
//创建一个节点
Node node = new Node(Thread.currentThread(), mode);
//获取尾节点
Node pred = tail;
//判断是否有尾节点,即判断链表是否初始化了
if (pred != null) {
//将尾节点设置为新节点的前置节点
node.prev = pred;
//设置尾节点
if (compareAndSetTail(pred, node)) {
//将老尾节点的后置节点设置为新节点
pred.next = node;
return node;
}
}
//链表未初始化或者设置尾节点失败
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋,确保成功
for (;;) {
//尾节点
Node t = tail;
//无尾节点,即链表未初始化
if (t == null) {
//创建链表
if (compareAndSetHead(new Node()))
tail = head;
} else {
//链表已经初始化,将node节点前置设置为尾节点
node.prev = t;
//设置尾节点
if (compareAndSetTail(t, node)) {
//将老尾节点的后置节点设置为新节点
t.next = node;
return t;
}
}
}
}
简而言之,addWaiter方法是将获取锁失败的线程添加至等待链表的尾节点;
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取node前置节点
final Node p = node.predecessor();
//判断是否为头节点并且尝试加锁
if (p == head && tryAcquire(arg)) {
//获取锁成功设置头节点为当前节点
setHead(node);
//释放头节点后继指向指针
p.next = null; // help GC
//标志加锁成功
failed = false;
return interrupted;
}
//判断当前线程是否中断并且将线程阻塞放入等待队列
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//加锁失败
if (failed)
//取消竞争
cancelAcquire(node);
}
}
独占锁获取流程如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//首先判断head节点不为空,且节点状态不为初始状态
if (h != null && h.waitStatus != 0)
//唤醒head节点的后继节点
unparkSuccessor(h);
//返回成功
return true;
}
return false;
}
该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。
public final void acquireShared(int arg) {
//交由子类tryAcquireShared方法尝试加锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//首先将线程加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
//获取当前线程的上一个节点
final Node p = node.predecessor();
if (p == head) {
//当p是头节点时,再次尝试加锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//加锁成功,设置头节点为当前线程,并且唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//当p不为头节点,或者加锁失败时
//判断是否应该需要阻塞当前线程,需要的话阻塞当前线程并设置阻塞标志位为true
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
共享锁加锁流程大致与独占锁加锁流程一致,唯一不同的是,在共享锁获取锁成功后还会去唤醒当前线程的后继节点,核心代码如下:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don‘t know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// propagate大于0,代表可以继续acquire
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 判断后继节点是否存在,如果存在是否是共享模式的节点
// 然后进行共享模式的释放
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 只需要处理头节点和尾节点都存在,且队列内的节点总数超过1个的情况
if (h != null && h != tail) {
int ws = h.waitStatus;
// 两种模式下都需要SIGNAL信号来判断是否唤醒后继节点
if (ws == Node.SIGNAL) {
// 如果CAS操作失败了就继续循环处理
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
continue;
}
// CAS操作成功后,就将后继节点解除阻塞
unparkSuccessor(h);
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
continue;
}
// 当状态码是PROPAGATE的时候,就可以结束循环了
}
// 在循环过程中,为了防止在上述操作过程中新添加了节点的情况,
// 通过检查头节点是否改变了,如果改变了就继续循环
if (h == head)
break;
}
}
释放时通过AQS类的releaseShared()方法实现的,源码:
public final boolean releaseShared(int arg) {
//首先由子类的实现去释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
原文:https://www.cnblogs.com/Sirius-/p/13934598.html