本系列研究总结高并发下的几种同步锁的使用以及之间的区别,分别是:ReentrantLock、CountDownLatch、CyclicBarrier、Phaser、ReadWriteLock、StampedLock、Semaphore、Exchanger、LockSupport。由于博客园对博客字数的要求限制,会分为三个篇幅:
高并发之ReentrantLock、CountDownLatch、CyclicBarrier
高并发之Phaser、ReadWriteLock、StampedLock
高并发之Semaphore、Exchanger、LockSupport
ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次获取不会被阻塞。在java关键字synchronized隐式支持重入性(关于synchronized可以看这篇文章),synchronized通过获取自增,释放自减的方式实现重入。与此同时,ReentrantLock还支持公平锁和非公平锁两种方式。那么,要想完完全全的弄懂ReentrantLock的话,主要也就是ReentrantLock同步语义的学习:1. 重入性的实现原理;2. 公平锁和非公平锁。
要想支持重入性,就要解决两个问题:
1. 在线程获取锁的时候,如果已经获取锁的线程是当前线程的话则直接再次获取成功;
2. 由于锁会被获取n次,那么只有锁在被释放同样的n次之后,该锁才算是完全释放成功。
我们知道,同步组件主要是通过重写AQS的几个protected方法来表达自己的同步语义。针对第一个问题,我们来看看ReentrantLock是怎样实现的,以非公平锁为例,判断当前线程能否获得锁为例,核心方法为nonfairTryAcquire:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//1. 如果该锁未被任何线程占有,该锁能被当前线程获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//2.若被占有,检查占有线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 3. 再次获取,计数加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
为了支持重入性,在第二步增加了处理逻辑,如果该锁已经被线程所占有了,会继续检查占有线程是否为当前线程,如果是的话,同步状态加1返回true,表示可以再次获取成功。每次重新获取都会对同步状态进行加一的操作,那么释放的时候处理思路是怎样的了?(依然还是以非公平锁为例)核心方法为tryRelease:
protected final boolean tryRelease(int releases) {
//1. 同步状态减1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//2. 只有当同步状态为0时,锁成功被释放,返回true
free = true;
setExclusiveOwnerThread(null);
}
// 3. 锁未被完全释放,返回false
setState(c);
return free;
}
需要注意的是,重入锁的释放必须得等到同步状态为0时锁才算成功释放,否则锁仍未释放。如果锁被获取n次,释放了n-1次,该锁未完全释放返回false,只有被释放n次才算成功释放,返回true。到现在我们可以理清ReentrantLock重入性的实现了,也就是理解了同步语义的第一条。
ReentrantLock支持两种锁:公平锁和非公平锁。何谓公平性,是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求上的绝对时间顺序,满足FIFO。ReentrantLock的构造方法无参时是构造非公平锁,源码为:
public ReentrantLock() {
sync = new NonfairSync();
}
另外还提供了另外一种方式,可传入一个boolean值,true时为公平锁,false时为非公平锁,源码为:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
在上面非公平锁获取时(nonfairTryAcquire方法)只是简单的获取了一下当前状态做了一些逻辑处理,并没有考虑到当前同步队列中线程等待的情况。我们来看看公平锁的处理逻辑是怎样的,核心方法为:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
这段代码的逻辑与nonfairTryAcquire基本上一致,唯一的不同在于增加了hasQueuedPredecessors的逻辑判断,方法名就可知道该方法用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早的请求资源,根据公平性,当前线程请求资源失败。如果当前节点没有前驱节点的话,再才有做后面的逻辑判断的必要性。公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁则不一定,有可能刚释放锁的线程能再次获取到锁。
CountDownLatch,这个词语是由Count Down、Latch两部分组成,意思是倒数门闩,为了形象记忆,可以理解为一个门上有很多个门闩,只有所有门闩都打开,也就是为0时才可以通过这个门。
创建对象时需要在构造CountDownLatch中传入一个整数n,设置门闩的个数, 在这个整数“倒数”到0之前,线程需要等待在门口,而这个“倒数”过程则是由各个执行线程驱动的,每个线程执行完一个任务“倒数”一次(门闩减一)。总结来说,CountDownLatch的作用就是等待其他的线程都执行完任务,必要时可以对各个任务的执行结果进行汇总,然后线程才继续往下执行。
CountDownLatch主要有两个方法:countDown()和await()。countDown()方法用于使计数器减一,其一般是执行任务的线程调用,await()方法则使调用该方法的线程处于等待状态,其一般是主线程调用。这里需要注意的是,countDown()方法并没有规定一个线程只能调用一次,当同一个线程调用多次countDown()方法时,每次都会使计数器减一;另外,await()方法也并没有规定只能有一个线程执行该方法,如果多个线程同时执行await()方法,那么这几个线程都将处于等待状态,并且以共享模式享有同一个锁。
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
Service service = new Service(latch);
Runnable task = () -> service.exec();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(task);
thread.start();
}
System.out.println("main thread await. ");
latch.await();
System.out.println("main thread finishes await. ");
}
}
其中的Service类如下:
public class Service {
private CountDownLatch latch;
//需要将countdownLatch传递进来
public Service(CountDownLatch latch) {
this.latch = latch;
}
public void exec() {
try {
System.out.println(Thread.currentThread().getName() + " execute task. ");
sleep(2);
System.out.println(Thread.currentThread().getName() + " finished task. ");
} finally {
//放在finally中是为了异常也能执行,不至于主线程死锁!
latch.countDown();
}
}
private void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
首先声明了一个CountDownLatch对象,并且由主线程创建了5个线程,分别执行任务,在每个任务中,当前线程会休眠2秒。在启动线程之后,主线程调用了CountDownLatch.await()方法,此时,主线程将在此处等待创建的5个线程执行完任务之后才继续往下执行。
如下是执行结果:
main thread await.
Thread-1 execute task.
Thread-3 execute task.
Thread-0 execute task.
Thread-2 execute task.
Thread-4 execute task.
Thread-2 finished task.
Thread-3 finished task.
Thread-1 finished task.
Thread-0 finished task.
Thread-4 finished task.
main thread finishes await.
从输出结果可以看出,主线程先启动了五个线程,然后主线程进入等待状态,当这五个线程都执行完任务之后主线程才结束了等待。上述代码中需要注意的是,在执行任务的线程中,使用了try...finally结构,该结构可以保证创建的线程发生异常时CountDownLatch.countDown()方法也会执行,也就保证了主线程不会一直处于等待状态。
CountDownLatch非常适合于对任务进行拆分,使其并行执行,比如某个任务执行2s,其对数据的请求可以分为五个部分,那么就可以将这个任务拆分为5个子任务,分别交由五个线程执行,执行完成之后再由主线程进行汇总,此时,总的执行时间将决定于执行最慢的任务,平均来看,还是大大减少了总的执行时间。
另外一种比较合适使用CountDownLatch的地方是使用某些外部链接请求数据的时候,比如图片,因为我们使用的图片服务只提供了获取单个图片的功能,而每次获取图片的时间不等,一般都需要1.5s~2s。当我们需要批量获取图片的时候,比如列表页需要展示一系列的图片,如果使用单个线程顺序获取,那么等待时间将会极长,此时我们就可以使用CountDownLatch对获取图片的操作进行拆分,并行的获取图片,这样也就缩短了总的获取时间。
CountDownLatch是基于AbstractQueuedSynchronizer实现的,在AbstractQueuedSynchronizer中维护了一个volatile类型的整数state,volatile可以保证多线程环境下该变量的修改对每个线程都可见,并且由于该属性为整型,因而对该变量的修改也是原子的。创建一个CountDownLatch对象时,所传入的整数n就会赋值给state属性,当countDown()方法调用时,该线程就会尝试对state减一,而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。
public void countDown() {
sync.releaseShared(1);
}
这里sync也即一个继承了AbstractQueuedSynchronizer的类实例,该类是CountDownLatch的一个内部类,
其声明如下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState(); // 获取当前state属性的值
if (c == 0) // 如果state为0,则说明当前计数器已经计数完成,直接返回
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc)) // 使用CAS算法对state进行设置
return nextc == 0; // 设置成功后返回当前是否为最后一个设置state的线程
}
}
}
这里tryReleaseShared(int)方法即对state属性进行减一操作的代码。可以看到,CAS也即compare and set的缩写,jvm会保证该方法的原子性,其会比较state是否为c,如果是则将其设置为nextc(自减1),如果state不为c,则说明有另外的线程在getState()方法和compareAndSetState()方法调用之间对state进行了设置,当前线程也就没有成功设置state属性的值,其会进入下一次循环中,如此往复,直至其成功设置state属性的值,即countDown()方法调用成功。
在countDown()方法中调用的sync.releaseShared(1)调用时实际还是调用的tryReleaseShared(int)方法,
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
可以看到,在执行sync.releaseShared(1)方法时,其在调用tryReleaseShared(int)方法时会在无限for循环中设置state属性的值,设置成功之后其会根据设置的返回值(此时state已经自减了一),即当前线程是否为将state属性设置为0的线程,来判断是否执行if块中的代码。doReleaseShared()方法主要作用是唤醒调用了await()方法的线程。需要注意的是,如果有多个线程调用了await()方法,这些线程都是以共享的方式等待在await()方法处的,试想,如果以独占的方式等待,那么当计数器减少至零时,就只有一个线程会被唤醒执行await()之后的代码,这显然不符合逻辑。
private void doReleaseShared() {
for (;;) {
Node h = head; // 记录等待队列中的头结点的线程
if (h != null && h != tail) { // 头结点不为空,且头结点不等于尾节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // SIGNAL状态表示当前节点正在等待被唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 清除当前节点的等待状态
continue;
unparkSuccessor(h); // 唤醒当前节点的下一个节点
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果h还是指向头结点,说明前面这段代码执行过程中没有其他线程对头结点进行过处理
break;
}
}
在doReleaseShared()方法中(始终注意当前方法是最后一个执行countDown()方法的线程执行的),首先判断头结点不为空,且不为尾节点,说明等待队列中有等待唤醒的线程,这里需要说明的是,在等待队列中,头节点中并没有保存正在等待的线程,其只是一个空的Node对象,真正等待的线程是从头节点的下一个节点开始存放的,因而会有对头结点是否等于尾节点的判断。在判断等待队列中有正在等待的线程之后,其会清除头结点的状态信息,并且调用unparkSuccessor(Node)方法唤醒头结点的下一个节点,使其继续往下执行。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // 清除当前节点的等待状态
Node s = node.next;
if (s == null || s.waitStatus > 0) { // s的等待状态大于0说明该节点中的线程已经被外部取消等待了
s = null;
// 从队列尾部往前遍历,找到最后一个处于等待状态的节点,用s记录下来
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒离传入节点最近的处于等待状态的节点线程
}
可以看到,unparkSuccessor(Node)方法的作用是唤醒离传入节点最近的一个处于等待状态的线程,使其继续往下执行。前面我们讲到过,等待队列中的线程可能有多个,而调用countDown()方法的线程只唤醒了一个处于等待状态的线程,这里剩下的等待线程是如何被唤醒的呢?其实这些线程是被当前唤醒的线程唤醒的。具体的我们可以看看await()方法的具体执行过程。
如下是await()方法的代码:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await()方法实际还是调用了Sync对象的方法acquireSharedInterruptibly(int)方法,如下是该方法的具体实现:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
可以看到acquireSharedInterruptibly(int)方法判断当前线程是否需要以共享状态获取执行权限,这里tryAcquireShared(int)方法是AbstractQueuedSynchronizer中的一个模板方法,其具体实现在前面的Sync类中,可以看到,其主要是判断state是否为零,如果为零则返回1,表示当前线程不需要进行权限获取,可直接执行后续代码,返回-1则表示当前线程需要进行共享权限。具体的获取执行权限的代码在doAcquireSharedInterruptibly(int)方法中。
如下是该方法的具体实现:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED); // 使用当前线程创建一个共享模式的节点
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); // 获取当前节点的前一个节点
if (p == head) { // 判断前一个节点是否为头结点
int r = tryAcquireShared(arg); // 查看当前线程是否获取到了执行权限
if (r >= 0) { // 大于0表示获取了执行权限
setHeadAndPropagate(node, r); // 将当前节点设置为头结点,并且唤醒后面处于等待状态的节点
p.next = null; // help GC
failed = false;
return;
}
}
// 走到这一步说明没有获取到执行权限,就使当前线程进入“搁置”状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在doAcquireSharedInterruptibly(int)方法中,首先使用当前线程创建一个共享模式的节点。然后在一个for循环中判断当前线程是否获取到执行权限,如果有(r >= 0判断)则将当前节点设置为头节点,并且唤醒后续处于共享模式的节点;如果没有,则对调用shouldParkAfterFailedAcquire(Node, Node)和parkAndCheckInterrupt()方法使当前线程处于“搁置”状态,该“搁置”状态是由操作系统进行的,这样可以避免该线程无限循环而获取不到执行权限,造成资源浪费,这里也就是线程处于等待状态的位置,也就是说当线程被阻塞的时候就是阻塞在这个位置。当有多个线程调用await()方法而进入等待状态时,这几个线程都将等待在此处。这里回过头来看前面将的countDown()方法,其会唤醒处于等待队列中离头节点最近的一个处于等待状态的线程,也就是说该线程被唤醒之后会继续从这个位置开始往下执行,此时执行到tryAcquireShared(int)方法时,发现r大于0(因为state已经被置为0了),该线程就会调用setHeadAndPropagate(Node, int)方法,并且退出当前循环,也就开始执行awat()方法之后的代码。
这里我们看看setHeadAndPropagate(Node, int)方法的具体实现:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node); // 将当前节点设置为头节点
// 检查唤醒过程是否需要往下传递,并且检查头结点的等待状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 如果下一个节点是尝试以共享状态获取获取执行权限的节点,则将其唤醒
doReleaseShared();
}
}
setHeadAndPropagate(Node, int)方法主要作用是设置当前节点为头结点,并且将唤醒工作往下传递,在传递的过程中,其会判断被传递的节点是否是以共享模式尝试获取执行权限的,如果不是,则传递到该节点处为止(一般情况下,等待队列中都只会都是处于共享模式或者处于独占模式的节点)。也就是说,头结点会依次唤醒后续处于共享状态的节点,这也就是共享锁与独占锁的实现方式。这里doReleaseShared()方法也就是我们前面讲到的会将离头结点最近的一个处于等待状态的节点唤醒的方法。
CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。 CyclicBarrier好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。
使用方法
可以想象飞虎队(American Volunteer Group,AVG)要执行某项任务,需要等所有人到齐之后才能开始行动
class AVG implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
public AVG(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
System.out.println(name + "就位");
try {
cyclicBarrier.await();
Random random =new Random();
double time = random.nextDouble() + 9;
System.out.println(name + ": "+ time);
} catch (Exception e) {
}
}
}
就位:
class Ready {
private CyclicBarrier cyclicBarrier = new CyclicBarrier(8);
public void start() {
List<Athlete> avgList = new ArrayList<>();
athleteList.add(new AVG(cyclicBarrier,"何永道"));
athleteList.add(new AVG(cyclicBarrier,"约翰·理查德·罗西"));
athleteList.add(new AVG(cyclicBarrier,"查尔斯·庞德"));
athleteList.add(new AVG(cyclicBarrier,"罗伯特·尼尔"));
athleteList.add(new AVG(cyclicBarrier,"罗伯特·桑德尔"));
athleteList.add(new AVG(cyclicBarrier,"法兰克·史基尔"));
athleteList.add(new AVG(cyclicBarrier,"约翰·牛柯克"));
athleteList.add(new AVG(cyclicBarrier,"大卫·李·希尔"));
Executor executor = Executors.newFixedThreadPool(8);
for (AVG avgmember : avgList) {
executor.execute(avgmember);
}
}
}
CyclicBarrier实现主要基于ReentrantLock
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
...省略后面代码
}
其中Generation用来控制屏障的循环使用,如果generation.broken为true的话,说明这个屏障已经损坏,当某个线程await的时候,直接抛出异常。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//利用ReentrantLock加锁
lock.lock();
try {
final Generation g = generation;
//如果已经损坏,则抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//即使没有被中断,也将完成等待,因此该中断被视为“属于”后续执行。
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
解释
重新生成Generation对象
恢复count值
举例子:CD:司机在等人坐满了才开车,阻塞主体是外部线程。 CB:人在等其他人来了再上车,阻塞主体是多个线程。
高并发之ReentrantLock、CountDownLatch、CyclicBarrier
原文:https://www.cnblogs.com/Courage129/p/14406756.html