首先,需要考虑锁的基本组成元素。
AbstractOwnableSynchronizer是一个同步器,可能被单个线程排他占用。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
// 持有同步器的线程,解决了上面说的第二个问题,锁被哪个线程持有了
@Getter@Setter
private transient Thread exclusiveOwnerThread;
}
AQS
提供了一个实现阻塞锁以及相关同步器的框架,是大多数同步器的基础。
注意:该框架提供了锁的两个核心元素,即上文所述的1和3
FIFO
的线程等待队列state
子类必须定义改变state
的方法,以及在锁被释放的时候state
代表了什么。一般情况下:
那么最后,锁利用什么机制来阻塞和唤醒线程呢?JDK中的Unsafe类提供了阻塞和唤醒线程的一对操作原语,LockSupport
则提供了对于Unsafe
的简单封装、
某个线程获取锁失败,需要思考后续流程怎么处理2
// 阻塞的是当前调用的线程
public static void park() {
UNSAFE.park(false, 0L);
}
// 当前线程可以精准唤醒一个线程
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}
AQS
使用一个volatile的int类型的成员变量来表示锁状态,通过内置的双向链表队列来支持线程等待,并通过CAS
来完成对state
值的修改。
首先看下AQS中定义的最基本的数据结构,Node
static final class Node {
????/**
?????*?waitStatus:?节点代表的线程当前等待锁的状态
?????*?0???????????? 初始化后的默认值
?????*?CANCELLED?1??线程的请求已经取消了
?????*?CONDITION?-2?当前节点正在一个条件队列中
?????*?PROPAGATE?-3?当前线程处于SHARED情况下
?????*?SIGNAL????-1? 当前线程已经准备好,等待资源释放
?????*/
????volatile?int?waitStatus;
????/**
?????*?前驱节点
?????*/
????volatile?Node?prev;
????/**
?????*?后继节点
?????*/
????volatile?Node?next;
????/**
?????*?该节点代表的线程
?????*/
????volatile?Thread?thread;
????/**
?????*?用于Condition或者共享锁,这里暂不讨论
?????*/
????Node?nextWaiter;
}
/**
* AQS中维护了一个state的字段,代表同步状态,用于展示当前临界资源的获锁的情况。
*/
private volatile int state;
/**
* 头节点,懒初始化,除了初始化以外,只可以通过setHead来修改,状态一定不是CANCELLED。
* 值得注意的是,头节点中没有线程,dummy node
*/
private transient volatile Node head;
/**
* 尾节点,懒初始化,只可以通过enq添加新的等待节点的时候修改
*/
private transient volatile Node tail;
Reentrantlock
支持公平锁
和非公平锁
,默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
/**
* 非公平锁直接CAS获取锁,获取不到再去排队,调用的是父类AQS的acquire方法
*/
final void lock() {
// 直接获取锁,如果成功,则设置独占锁,获取锁成功
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 没成功,排队
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires)
}
}
// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
/**
* 非公平获锁
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前同步资源的状态
int c = getState();
// 如果可用,则CAS
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 将持锁线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}
}
// 如果被占用,如果当前线程已经拥有了该锁
else if (current == getExclusiveOwnerThread()) {
// 线程重入数 + acquires
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新锁状态,由于持锁的当前线程,所以可以直接setState
setState(nextc);
return true;
}
// 其他线程持有锁,获锁失败
return false;
}
}
/**
* 分析代码可以得出结论,公平锁是直接去队列中去等待,但是非公平锁会先去尝试直接获取锁
*/
static final class FairSync extends Sync {
final void lock() {
// 调用的是AQS中的acquire方法
acquire(1);
}
/**
* 公平锁tryAcquire
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁状态
int c = getState();
// 没有被持有
if (c == 0) {
// 检查是否有前置的等待线程,没有的话获锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果锁被持有了,判断是不是当前线程可重入的
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 判断公平锁是否还需要继续排队
// 0. h == t 头节点等于尾节点,说明未插入新节点,即没有线程获取到锁,返回false
// 1. h != t 头节点不等于尾节点,说明队列中已经有其他线程了,进行下一步判断
// s = h.next 获取头节点的下一个节点
// 假设线程一获取到了锁,头节点的下个节点变成A节点,但是操作是非原子性的
// 2.1 头节点的下一个节点为空,CAS设置tail成功了,可是head.next = node还没有执行完成
// 2.2 头节点的下一个节点不为空,但是不是当前线程,
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
}
这里加一张图解释下hasQueuedPredecessors里面(s = h.next) == null
的逻辑
// 这个方法是Sync的父类AQS的核心方法 final 模板方法,流程
// 如果当前线程没获取到 tryAcquire(arg) = false
// 那么执行addWaiter(Node.EXCLUSIVE) -> acquireQueued()
public final void acquire(int arg) {
// 这里的获锁方法其实是个模板方法,tryAcquire的具体逻辑在子类实现
if (!tryAcquire(arg) &&
// static final Node EXCLUSIVE = null;
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 如果第一步tryAquire失败了,会执行addWaiter,将当前线程封装成一个node,插入到队尾
// 并将当前的尾节点设置为当前线程节点,并返回当前节点
private Node addWaiter(Node mode) {
// 先利用当前线程和锁模式新建一个节点
Node node = new Node(Thread.currentThread(), mode);
// 先快速CAS插入,不行就一直循环CAS
// 先获取旧的尾部节点
Node pred = tail;
if (pred != null) {
// 旧尾部节点不为空的话,将当前节点的前置节点设置为旧尾节点
node.prev = pred;
// 利用CAS将当前节点为尾部节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 走到这里有两种场景:
// ① pred == null 代表还未初始化
// ② 发生了并发竞争,其他线程设置了新的尾结点,当前线程CAS失败鸟
// 自旋 初始化+CAS设置尾节点
enq(node);
return node;
}
/**
* 返回的是节点的前驱节点
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果尾节点为空,则需要进行dummy node初始化 head = tail = new Node()
if (t == null) {
if (compareAndSetHead(new Node()))
// 注意:这里不是原子操作,会出现短暂的tail != head
tail = head;
} else {
// 如果不为空,说明已经初始化过了,自旋设置尾部节点为当前线程节点
node.prev = t;
// 这里不是原子的
if (compareAndSetTail(t, node)) {
// 这里也不是原子的,可能出现旧的tail的next指针还是指向null的场景
t.next = node;
return t;
}
}
}
}
// 执行到这,代表
// ①没有直接获取到锁 ②线程在等待队列中排队获取锁
// acquireQueued会把队列中的线程不断获取锁,判断是否需要挂起
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否中断过
boolean interrupted = false;
// 自旋,要么获取锁,要么中断
for (;;) {
// 这里尝试获取锁是需要满足条件的
final Node p = node.predecessor();
// 必须前驱节点是头节点,代表当前节点位于队列首位,才会去拿锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,头指针移动到当前node,清空线程和prev指针
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 这里也比较关键,获锁失败后需要判断是否要将线程挂起,如果需要的话则挂起并记录中断状态
// ① 不是第一个节点
// ② 获锁失败了
// 这里如果返回false,代表前驱节点的状态发生了变化,重新自旋判断下
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {???
head = node;???
node.thread = null;???
node.prev = null;
}
/**
* 判断当前线程是否应该阻塞 需要判断前置节点的状态
* 如果是独占锁,只需要考虑CANCELLED和SIGNAL
*
* CANCELLED:由于超时或者中断导致取消,节点不会流转到其他状态了。
* SIGNAL:当前节点的下个节点正在或将要阻塞,因此当前线程在释放或者取消的时候需要唤醒后继节点
* 值得注意的是,节点的这个状态时后继节点设置的
*
* 举个例子:排队,和前面的哥们打个招呼,兄弟,你进去了或者不想等了,都叫醒我。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取头节点的节点状态
int ws = pred.waitStatus;
// 说明头结点处于唤醒状态
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
// 删除掉所有的取消节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这里只能是初始状态0了,给前面哥们提个醒
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private void cancelAcquire(Node node) {
if (node == null)
return;
// 设置该节点不关联任何线程,虚节点
node.thread = null;
// 跳过取消状态的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 将当前节点的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将尾节点从当前节点设置为当前节点的前驱结点
// 如果tail更新成功,更新tail的next为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 更新失败的话,
// 1.如果当前节点的前置节点不是head节点
// 2.1 前置节点的ws为Node.SIGNAL
// 2.2 ws<=0 且 将前置节点的ws设置为Node.SIGNAL成功
// 3 前置节点的线程不为空
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 如果当前节点的下个节点设置为前驱节点的后继节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点 或者 上述条件均不满足,唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
原文:https://www.cnblogs.com/zerodsLearnJava/p/13123509.html