队列同步器AQS(AbstractQueuedSynchronizer)是用来构建锁或者其它同步组件的基础框架,使用int成员变量state表示同步状态,通过内部的FIFO双向等待队列来完成线程的排队工作。同步器中的等待队列可以简单的理解为"等待锁的线程队列"。
队列同步器AQS可以简单的理解为"同步状态的管理者",为了保证对同步状态的原子操作,使用CAS;当线程获取同步状态时,需要将获取同步状态失败的线程以及等待状态等信息构成一个节点(Node)放入等待队列,同时阻塞该线程,因此需要同步队列。
同步器采用了模板方法的设计模式,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法会调用使用者重写的方法。在重写同步器指定的方法时,需要对同步状态进行更改时,使用同步器提供的如下3个方法:getState()、setState(int newState)、compareAndSetState(int expect, int update)来进行操作,因为CAS操作保证同步状态的改变是线程安全的。下面是AQS源码中state的声明和三个方法的定义:
1 /** 同步状态 */ 2 private volatile int state; 3 4 /** 5 * 返回当前的同步状态。此操作的内存语义为 {@code volatile} read. 6 */ 7 protected final int getState() { 8 return state; 9 } 10 11 /** 12 * 设置新的同步状态。此操作的内存语义为 {@code volatile} write. 13 */ 14 protected final void setState(int newState) { 15 state = newState; 16 } 17 18 /** 19 * 如果当前的同步状态与期望值相同,通过原子操作更新状态值。 20 * 此操作的内存语义为{@code volatile} read and write. 21 * 如果当前的同步状态与期望值不同,返回false. 22 */ 23 protected final boolean compareAndSetState(int expect, int update) { 24 // See below for intrinsics setup to support this 25 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); 26 }
子类需要重写部分或全部一下方法。如果是独占锁就重写 tryAcquire() 和 tryRelease()方法,在独占模式下,每次只能有一个线程持有锁,RenentrantLock就是以独占方式实现的互斥锁;如果是共享锁就重写 tryAcquireShared() 和 tryReleaseShared() 方法,在共享模式下,允许多个线程同时获取锁,并发访问,共享资源,如:ReentrantReadWriteLock里的读锁,它的读锁是可以被共享的,但是它的写锁是独占的。AQS的内部类Node定义了两个常量SHARED和EXCLUSIVE,用来表示AQS队列中等待线程的锁获取模式。
1 /** 2 * 以独占模式获取同步状态,实现该方法需要查询当前状态并判断状态是否符合预期,然 3 * 后再进行CAS设置同步状态。 4 */ 5 protected boolean tryAcquire(int arg) { 6 throw new UnsupportedOperationException(); 7 } 8 9 /** 10 * 以独占模式释放同步锁,等待获取同步状态的线程有机会获取同步状态。 11 */ 12 protected boolean tryRelease(int arg) { 13 throw new UnsupportedOperationException(); 14 } 15 16 /** 以共享模式获取同步状态。*/ 17 protected int tryAcquireShared(int arg) { 18 throw new UnsupportedOperationException(); 19 } 20 21 /** 以共享模式释放同步状态,该方法总是由执行释放的线程调用。*/ 22 protected boolean tryReleaseShared(int arg) { 23 throw new UnsupportedOperationException(); 24 } 25 26 /** 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占。*/ 27 protected boolean isHeldExclusively() { 28 throw new UnsupportedOperationException(); 29 }
Java并发包根据是否允许多个线程同时获取锁,将加锁模式分为共享锁和独占锁。对于synchronized来说,如果一个线程在等待锁,那么结果只有两种,获得这把锁继续执行,或者线程保持等待;而AQS提供了另一种可能,这就是线程可以被中断,并且优先响应中断,也就是在等待锁的过程中,程序可以根据需要取消对锁的需求,可以避免死锁。除了等待外部通知(中断操作interrupt)之外,限时等待也可以避免死锁,给定一个时间,如果线程没有在给定的时间内获取到锁,让线程自动放弃。
不管是等待方式,中断方式 还是 定时方式,它们的主要功能获取锁、释放锁的逻辑方法是一样的,中断方式只是在获取锁之前增加了中断判断,定时方式只是增加了定时设计。因此,上面的重写方法是它们的共有逻辑,但实现方式可以不同,将它们放在子类中具体实现。
1 /** 2 * 以独占模式获取,忽略中断。 通过调用至少一次tryAcquire(int)实现,成功返回。 3 * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int)。 4 */ 5 public final void acquire(int arg) { 6 if (!tryAcquire(arg) && 7 acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) 8 selfInterrupt(); 9 } 10 11 /** 12 * 以独占模式获取,优先中断。首先检查中断状态,然后调用至少一次tryAcquire(int)实现,成功返回。 13 * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int)。 14 */ 15 public final void acquireInterruptibly(int arg) 16 throws InterruptedException { 17 if (Thread.interrupted()) 18 throw new InterruptedException(); 19 if (!tryAcquire(arg)) 20 doAcquireInterruptibly(arg); 21 } 22 23 /** 24 * 以独占模式获取,优先中断,如果超时则失败。 25 * 首先检查中断状态,然后调用至少一次tryAcquire(int)实现,成功返回。 26 * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquire(int)。 27 */ 28 public final boolean tryAcquireNanos(int arg, long nanosTimeout) 29 throws InterruptedException { 30 if (Thread.interrupted()) 31 throw new InterruptedException(); 32 return tryAcquire(arg) || 33 doAcquireNanos(arg, nanosTimeout); 34 } 35 36 /** 37 * 以独占模式释放同步状态,调用tryRelease(int)实现。 38 * 该方法会在释放同步状态成功后,将同步队列中的队首节点(这时已更新队首节点)包含的线程唤醒。 39 */ 40 public final boolean release(int arg) { 41 if (tryRelease(arg)) { 42 AbstractQueuedSynchronizer.Node h = head; 43 if (h != null && h.waitStatus != 0) 44 unparkSuccessor(h); 45 return true; 46 } 47 return false; 48 } 49 50 /** 51 * 以共享模式获取,忽略中断。 通过调用至少一次tryAcquireShared(int)实现,成功返回。 52 * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquireShared(int)。 53 */ 54 public final void acquireShared(int arg) { 55 if (tryAcquireShared(arg) < 0) 56 doAcquireShared(arg); 57 } 58 59 /** 60 * 以共享模式获取,优先中断。 通过调用至少一次tryAcquireShared(int)实现,成功返回。 61 * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquireShared(int)。 62 */ 63 public final void acquireSharedInterruptibly(int arg) 64 throws InterruptedException { 65 if (Thread.interrupted()) 66 throw new InterruptedException(); 67 if (tryAcquireShared(arg) < 0) 68 doAcquireSharedInterruptibly(arg); 69 } 70 71 /** 72 * 以共享模式获取,优先中断,如果超时则失败。 73 * 通过调用至少一次tryAcquireShared(int)实现,成功返回。 74 * 否则线程排队,可能会重复阻塞和解除阻塞,直到成功才调用tryAcquireShared(int)。 75 */ 76 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) 77 throws InterruptedException { 78 if (Thread.interrupted()) 79 throw new InterruptedException(); 80 return tryAcquireShared(arg) >= 0 || 81 doAcquireSharedNanos(arg, nanosTimeout); 82 } 83 84 /** 以共享模式释放同步状态。*/ 85 public final boolean releaseShared(int arg) { 86 if (tryReleaseShared(arg)) { 87 doReleaseShared(); 88 return true; 89 } 90 return false; 91 } 92 93 /** 获取在等待队列上的线程集合。*/ 94 public final Collection<Thread> getQueuedThreads() { 95 ArrayList<Thread> list = new ArrayList<Thread>(); 96 for (AbstractQueuedSynchronizer.Node p = tail; p != null; p = p.prev) { 97 Thread t = p.thread; 98 if (t != null) 99 list.add(t); 100 } 101 return list; 102 }
不管是共享锁还是独占锁,或者公平锁还是非公平锁,锁的逻辑都是一样的:线程获取锁 --> 成功继续执行,不成功阻塞等待 --> 释放锁 --> 后续线程获取锁。所以AQS采用模板方法的设计模式是非常适合的,让子类重写tryAcquire(),tryRelease(),tryAcquireShared(),tryReleaseShared()方法,不同的锁可以有不同的实现方式,比如ReentrantLock的内部类NonfairSync和FairSync对 tryAcquire()的实现方式就不同,一个是非公平方式,一个是公平方式;然后AQS中的模板方法 ,如果是获取锁的方法,就调用相应的子类中重写的获取锁的方法,如果是 释放锁的方法,就调用相应的子类中重写的释放锁的方法。这样子类重写的方法就被"隐藏"了,只需要调用AQS中的几个重要的模板方法就可以了。
当线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时阻塞当前线程;当同步状态释放时,会把队首节点中的线程唤醒,使其再次获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,AQS的内部内Node的定义如下:
1 static final class Node { 2 /** 表示节点使用共享模式等待 */ 3 static final Node SHARED = new Node(); 4 /** 表示节点使用独占模式等待 */ 5 static final Node EXCLUSIVE = null; 6 7 /** 等待状态可能值,由于在同步队列中等待的线程等待超时或者被中断,该节点不会参与 8 同步状态的竞争,需要从同步队列中取消等待,节点进入该状态后将不会再变化; */ 9 static final int CANCELLED = 1; 10 /** 等待状态可能值,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态 11 或者被取消,将会通知后继节点,使后继节点的线程得以运行 */ 12 static final int SIGNAL = -1; 13 /** 等待状态可能值,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition 14 * 调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中;*/ 15 static final int CONDITION = -2; 16 /** 17 * 等待状态可能值,表示下一次共享式同步状态获取将会无条件地传播下去; 18 */ 19 static final int PROPAGATE = -3; 20 21 /** 等待状态值 */ 22 volatile int waitStatus; 23 24 /** 25 * 前驱节点 26 */ 27 volatile Node prev; 28 29 /** 30 * 后继节点 31 */ 32 volatile Node next; 33 34 /** 35 * 获取同步状态的线程,使用后置null. 36 */ 37 volatile Thread thread; 38 39 /** 40 * 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量, 41 * 也就是说节点类型(独占和共享)和等待队列中的后继节点公用同一个字段 42 */ 43 Node nextWaiter; 44 45 /** 46 * Returns true if node is waiting in shared mode. 47 */ 48 final boolean isShared() { 49 return nextWaiter == SHARED; 50 } 51 52 /** 53 * 返回前驱节点 54 */ 55 final Node predecessor() throws NullPointerException { 56 Node p = prev; 57 if (p == null) 58 throw new NullPointerException(); 59 else 60 return p; 61 } 62 63 Node() { // Used to establish initial head or SHARED marker 64 } 65 66 Node(Thread thread, Node mode) { // Used by addWaiter 67 this.nextWaiter = mode; 68 this.thread = thread; 69 } 70 71 Node(Thread thread, int waitStatus) { // Used by Condition 72 this.waitStatus = waitStatus; 73 this.thread = thread; 74 } 75 }
节点是构成同步队列的基础,同步器用有首节点head和尾节点tail。同步队列的基本结构入下图所示:
获取同步状态失败的线程要被构造成节点,并加入到同步队列尾部,加入队列的过程要保证线程安全,同步器提供了基于CAS的设置节点的方法:
1 /** 2 * 仅用于入队。 3 */ 4 private final boolean compareAndSetTail(Node expect, Node update) { 5 return unsafe.compareAndSwapObject(this, tailOffset, expect, update); 6 }
添加节点的操作通过addWaiter(Node)方法完成,源码如下:
1 /** 2 * 为当前线程创建一个节点并入列,然后返回这个节点 3 * 使用CAS入队并设置尾节点 4 */ 5 private Node addWaiter(Node mode) { 6 Node node = new Node(Thread.currentThread(), mode); 7 // Try the fast path of enq; backup to full enq on failure 8 Node pred = tail; 9 //如果队尾不为null,则尝试插入队列 10 if (pred != null) { 11 node.prev = pred; 12 if (compareAndSetTail(pred, node)) { 13 pred.next = node; 14 return node; 15 } 16 } 17 //如果队尾为null,则调用enq(Node)方法插入 18 enq(node); 19 return node; 20 }
当同步队列存在时,addWaiter(Node)方法使用快速入队,即将构造好的node的前驱指针指向当前尾节点,然后通过CAS操作将刚构造的node作为新的尾节点,
再把原尾节点的后继指针指向新尾节点;否则,采用enq(Node)入队。enq(Node)源码如下:
1 private Node enq(final Node node) { 2 for (;;) { 3 Node t = tail; 4 if (t == null) { // Must initialize 5 // 队列必须初始化,若有多个线程并发执行此操作, 6 // 通过CAS能保证只有一个线程执行成功 7 if (compareAndSetHead(new Node())) 8 tail = head; 9 } else { 10 // 采用快速入队的方式入队 11 node.prev = t; 12 if (compareAndSetTail(t, node)) { 13 t.next = node; 14 return t; 15 } 16 } 17 } 18 }
enq(Node)方法使用死循环和CAS的方式来保证节点正确添加到同步队列中。同步器将节点加入到同步队列的过程如下图:
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头结点的方法不需要使用CAS操作来保证线程安全,它只需要将首节点设置成原首节点的后继节点并断开原首节点的next引用即可。设置首节点的操作通过setHead(Node)方法来完成,源码如下:
1 private void setHead(Node node) { 2 head = node; 3 node.thread = null; //处于 GC 考虑 4 node.prev = null; 5 }
设置首节点的过程如下图所示:
原文:https://www.cnblogs.com/yocapl/p/12397056.html