读多写少。读写锁,读读不互斥,写写、写读互斥。读多写少的话可以避免读读的互斥。
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
* 默认创建一个非公平锁的读写锁实例
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
//默认的构造函数传的是false,sync为NonfairSync也就是非公平锁的实现
sync = fair ? new FairSync() : new NonfairSync();
//初始化读锁和写锁
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
ReadLock和WriteLock是ReentrantReadWriteLock的内部类,构造函数将ReentrantReadWriteLock的sync传递给它们,读写锁持有同一个sync实例
/**
* Acquires the read lock.
* 获取读锁
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
* 获取毒素哦如果写锁没有被其他线程持有然后马上返回
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
* 如果写锁被其他线程持有那么当前线程将休眠直到读锁
*/
public void lock() {
sync.acquireShared(1);
}
说明读锁和写锁是互斥的,接着看sync#acquireShared,sync是继承与AQS的,实际调用的是AQS#acquireShared
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
//① 尝试获取共享锁
if (tryAcquireShared(arg) < 0)
//② 获取失败 如果符合条件可再次尝试获取锁 如果还是失败可能会进入休眠状态
doAcquireShared(arg);
}
tryAcquireShared需要子类实现,这里为Sync的实现
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
/*
* 参考走读
* 1. 如果读锁被其他线程获取,失败
* 2. 否则,这个线程是符合条件的对于锁状态,所以是否需要被阻塞是由队列策略觉得的。如果不,尝试cas跟新state。
* 注意那个步骤不会检查重入获取,延期全版本去避免检查count持有在一些不可重入的情况
* 3. 如果步骤2失败了,不是由于线程不符合条件,或者cas失败,或者count溢出,链式去版本全循环重试
*/
//获取当前线程
Thread current = Thread.currentThread();
//获取state
int c = getState();
//STEP① exclusiveCount来判断当前state是否含有写锁,如果写锁数量不等于0并且当前写锁持有线程不是当前线程则返回-1表示未获取到锁,这里可以看出写锁是可以降级为读锁的
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取读锁计数
int r = sharedCount(c);
//STEP② readerShouldBlock意思为读锁是否需要阻塞 r < MAX_COUNT表示读节点数量小于65535
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//读计数+1
compareAndSetState(c, c + SHARED_UNIT)) {
//如果读节点为0
if (r == 0) {
//第一个读线程设置为当前线程
firstReader = current;
//第一个读线程持有计数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) {
//如果第一个读线程是当前线程,则第一个线程的持有计数+1
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
//读引用计数+1
rh.count++;
}
//获取读锁成功
return 1;
}
//如果获取读锁失败 则调用fullTryAcquireShared
return fullTryAcquireShared(current);
}
STEP① Sync对于读锁和写锁的计数的存储格式,是将state的高16位作为读锁计数,低16位作为写锁计数
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
Sync#fullTryAcquireShared
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
//获取状态值
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we‘re not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//读锁计数达到了最大值则抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//cas设置读锁计数+1
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
回到doAcquireShared
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//新增AQS的共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果该节点的前继节点是头节点则再次尝试获取读锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//读锁获取到后,将当前节点设置为头节点,并尝试传播唤醒后继读节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//判断是否需要休眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Attempts to release this lock.
*
* <p>If the number of readers is now zero then the lock
* is made available for write lock attempts.
*/
public void unlock() {
sync.releaseShared(1);
}
AQS#releaseShared
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
//一个读线程是当前线程 firstReaderHoldCount如果为1则将firstReader置为null
if (firstReaderHoldCount == 1)
firstReader = null;
else
//否则将第一个读线程的引用计数-1
firstReaderHoldCount--;
} else {
//当前线程不是第一个读线程
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
//缓存的HoldCounter为空或者当前缓存的HoldCounter不属于当前线程则从本地线程缓存中获取HoldCounter
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
//移除掉计数小于等于1的HoldCounter
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
//HoldCounter计数-1
--rh.count;
}
//cas自旋减少state高16位读锁计数 减少到0才算释放读锁
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
AQS#doReleaseShared 该方法释放共享锁,之前的文章已经解读过,再次不做赘述
/**
* Acquires the write lock.
* 获取写锁
* <p>Acquires the write lock if neither the read nor write lock
* are held by another thread
* and returns immediately, setting the write lock hold count to
* one.
* 获取写锁在读和写锁都没有被其他线程持有的情况下,设置写锁计数为1
* <p>If the current thread already holds the write lock then the
* hold count is incremented by one and the method returns
* immediately.
* 如果当前线程已经持有写锁则持有计数将递增1
* <p>If the lock is held by another thread then the current
* thread becomes disabled for thread scheduling purposes and
* lies dormant until the write lock has been acquired, at which
* time the write lock hold count is set to one.
* 如果锁被另外一个线程持有,则当前线程休眠至写锁获取,在写锁计数被设置为0的时候
*/
public void lock() {
sync.acquire(1);
}
AQS#acquire
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Sync#tryAcquire
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 如果读锁计数不为0或者写锁计数不为0,并且锁的持有者不是当前线程,则失败
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 如果计数溢出,失败
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
* 其他,这个线程符合获取锁,如果它不是一个自旋获取或者队列允许。如果是这样,跟新state并且设置锁的持有者为自己
*/
Thread current = Thread.currentThread();
int c = getState();
//写锁计数
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//state计数不为0,但是w写计数为0,或者写锁持有者不是自己,说明有读锁,返回false 这里说明了不支持锁升级
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//判断写锁计数溢出,不能超过65535
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//重入获取
setState(c + acquires);
return true;
}
//writerShouldBlock 写入是否需要阻塞,非公平锁的实现总是返回false
//compareAndSetState cas设置state
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//设置独占多线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
AQS#acquireQueued 增加排队节点,(如果该节点的前继节点是头节点则再尝试一次获取锁),判断是否需要休眠。该方法之前已经讲过,不再赘述。
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then
* the hold count is decremented. If the hold count is now
* zero then the lock is released. If the current thread is
* not the holder of this lock then {@link
* IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
AQS#release
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒同步队列的节点
unparkSuccessor(h);
return true;
}
return false;
}
Sync#tryRelease
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
protected final boolean tryRelease(int releases) {
//如果当前线程不是锁的独占线程抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//判断写锁计数是否变为0了
if (free)
//将独占锁持有线程设置为0
setExclusiveOwnerThread(null);
//设置state
setState(nextc);
return free;
}
原文:https://www.cnblogs.com/lomoye/p/12745208.html