Condition是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件,只有满足条件时,线程才会被唤醒。
结论:
? 阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在AQS等待队列,则自旋等待尝试获取锁。
? 释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程。
public class ConditionAwaitDemo implements Runnable {
private ReentrantLock lock;
private Condition condition;
public ConditionAwaitDemo(ReentrantLock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin---await");
try {
lock.lock();
condition.await();
System.out.println("end---await");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ConditionSignalDemo implements Runnable {
private ReentrantLock lock;
private Condition condition;
public ConditionSignalDemo(ReentrantLock lock, Condition condition) {
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin---signal");
try {
lock.lock();
condition.signal();
System.out.println("end---await");
} finally {
lock.unlock();
}
}
}
public class Test {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(new ConditionAwaitDemo(lock, condition)).start();
new Thread(new ConditionSignalDemo(lock, condition)).start();
}
}
? 调用Condition,需要获得Lock锁,所以会意味着存在一个AQS同步队列,先看Condition.await()方法。它会使线程进入等待队列并释放锁,同时将线程状态变为等待状态。
1,AQS.await(),其实阻塞调用的还是LockSupport.park()方法。
public final void await() throws InterruptedException {
//表示await允许被中断
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断这个节点是否在AQS队列上,第一次判断肯定是false,因为前面已经释放锁了
while (!isOnSyncQueue(node)) {
LockSupport.park(this);// 第一次总是park自己,开始阻塞等待
// 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在 isOnSyncQueue 中判断自己是否在队列上
// isOnSyncQueue 判断当前node还在队列上且不是CONDITION状态了,就结束循环和阻塞.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
// 如果是 null ,就没有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
2,AQS.addConditionWaiter()
//把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
3,AQS.fullyRelease(Node )
//彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState(); //获取当前重入次数
//释放锁并且唤醒下一个同步队列中的线程
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
4,AQS.isOnSyncQueue(Node )
? condition队列中的节点会重新加入到aqs队列去重新竞争锁,也就是当调用signal的时候,会把当前节点从condition队列转移到aqs队列。所以需要判断节点是否在aqs队列中。
a,如果不在aqs队列,则说明当前节点没有被唤醒去争抢锁,所以需要阻塞等待其它线程调用signal唤醒。
b,如果在aqs队列,则说明当前节点需要去竞争锁获得执行权
node.waitStatus == Node.CONDITION || node.prev == null 判断不在aqs队列?
? aqs队列中的节点一定没有状态为condition的节点。在aqs队列中只有head节点的prev==null,head节点又是获得锁的节点,所以不可能。
? node.next不为空一定存在condition的队列,因为aqs是双向列表
c,findNodeFromTail:从tail节点往前扫描aqs队列,一但发现aqs队列有节点和当前节点相等,那么该节点也一定存在于aqs队列。
//判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
5,ConditionObject.checkInterruptWhileWaiting(Node )
? 判断线程在condition队列被阻塞的过程中,有没有被其他线程触发过中断请求
private int checkInterruptWhileWaiting(Node node) {
//如果被中断,则调用transferAfterCancelledWait判断后续的处理应该是抛出异常还是重新中断
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
6,transferAfterCancelledWait(Node )
final boolean transferAfterCancelledWait(Node node) {
//如果设置成功,证明线程在signal之前已经被中断过,所以重新假如aqs队列
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//其实如果调用signal唤醒,则线程已经在aqs队列,该步循环判断是否在aqs队列上,如果不在让出CPU执行权
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
7,AQS.acquireQueued(Node , int )
? 该方法,aqs分析过,被唤醒的线程去抢占同步锁,并且回复到原来的重入次数。
8,reportInterruptAfterWait(int )
根据 checkInterruptWhileWaiting 方法返回的中断标识来进行中断上报。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException(); //抛出中断异常
else if (interruptMode == REINTERRUPT) //则重新响应中断
selfInterrupt();
}
1,AQS.signal()
public final void signal() {
//先判断当前线程是否获得了锁,逻辑是:判断当前线程和获得锁的线程是否相等
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 拿到 Condition 队列上第一个节点
if (first != null)
doSignal(first);
}
2,ConditionObject.doSignal(Node )
? 对condition队列从首部开始第一个节点执行transferForSignal操作,将将node从condition队移动到aqs队列中,同时修改aqs队列中原先尾节点的状态。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
3,AQS.transferForSignal(Node )
final boolean transferForSignal(Node node) {
//更新节点的状态为0,如果更新失败只有一种可能就是这个节点被CANCELLED了
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//把当前节点添加到AQS队列。并且返回返回按当前节点的上一个节点,也就是原tail节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
//如果 node 的 prev 节点已经是signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
return true;
}
原文:https://www.cnblogs.com/gaojf/p/12774133.html