终结(Termination)
phaser可能进入一个终结状态,可以通过isTerminated来检查。当终结的时候,所有的同步方法都不会在等待下一个阶段而直接返回,返回一个负值来表示该状态。类似地,在终结的时候尝试注册没有任何效果。当onAdvance调用返回true的时候就会触发终结。onAdvance默认实现为当一个反注册导致注册parties数降为0的时候返回true。当phser要控制操作在一个固定得迭代次数时,就可以很方便地重写这个方法,当当前阶段号到达阀值得时候就返回true导致终结。forceTermination方法也时另一个可以突然释放等待线程并且允许它们终结。
堆叠(Tiering)
Phaser可以被堆叠在一起(也就是说,以树形结构构造)来降低竞争。Phaser的parties数很大的时候,以一组子phasers共享一个公共父亲能够减轻严重的同步竞争的成本。这样做可以大大提供吞吐量,但同时也会导致每个操作的更高的成本。
在一棵堆叠的phaser树中,子phaser在父亲上的注册和反注册都会被自动管理。当子phaser的注册parties树为非0的时候,子phaser就会注册到父亲上。当由于arriveAndDeregister的调用使注册的parties数变为0时,子phaser就会从父亲中反注册。这样就算父phaser的所有parties都到达了阶段,也必须等待子phaser的所有parties都到达了阶段并显式调用父phaser的awaitAdvance才算到达新的阶段。反之亦然。这样父phaser或者子phaser里注册过的所有parties就可以一起互相等待到新的阶段。另外,在这个堆叠结构的实现里,可以确保root结点必然是最先更新阶段号,然后才到其子结点,逐渐传递下去。
+------+ +------+ +------+
| root | <-- |parent| <-- | this |
+------+ +------+ +------+
parties:3+1 parties:3+1 parties:3
如上图所示,如果parties数多的时候,可以根据堆叠成为一颗树,这里假设root和parent和this都各初始化3个parties数,然后如果当前结点this有注册parties数,则会在parent上注册一个parties,因此事实上root和parent都注册了4个parties数。这样,如果this结点的3个parties数都到达了,就会调用parent的arrive,把parties数减去一,然后parent等待自己3个parties数都到达,就会调用root来减去一,这样root的3个parties数都到达就会一同释放所有等待结点,就实现了整棵树parties之间同步等待的功能。另外这个结构也很容易看到root结点是最快进行阶段增长的。这样做最大的好处就是减少对同一个state变量的CAS竞争带来的性能下降,不过同时每个同步操作也会增加相应的负担(每次获取状态都要和root进行阶段同步),所以一般在高并发下造成的性能下降才考虑。监控(Monitoring)
同步方法只能被注册的parties调用时,phaser的当前状态可以被任何调用者监控。在任何时刻,有getRegisteredParties总共的parties,其中,有getArrivedParties个parites到达getPhase的当前阶段。当剩下getUnarrivedParties个parties到达,phase增加。这些方法的返回值可能反映短暂的状态,因此一般在同步控制中不太有用。toString方法以一种可以方便信息监控的格式返回这些状态的快照。
Phaser的具体用法可以参考例子:这里 private volatile long state;
private static final int MAX_PARTIES = 0xffff;
private static final int MAX_PHASE = Integer.MAX_VALUE;
private static final int PARTIES_SHIFT = 16;
private static final int PHASE_SHIFT = 32;
private static final int UNARRIVED_MASK = 0xffff; // to mask ints
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs
private static final long COUNTS_MASK = 0xffffffffL;
private static final long TERMINATION_BIT = 1L << 63;
// some special values
private static final int ONE_ARRIVAL = 1;
private static final int ONE_PARTY = 1 << PARTIES_SHIFT;
private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY;
private static final int EMPTY = 1;
//内部状态辅助方法
private static int unarrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}
private static int partiesOf(long s) {
return (int)s >>> PARTIES_SHIFT;
}
private static int phaseOf(long s) {
return (int)(s >>> PHASE_SHIFT);
}
private static int arrivedOf(long s) {
int counts = (int)s;
return (counts == EMPTY) ? 0 :
(counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}
state变量为long类型,长度为64位,其中: public int register() {
return doRegister(1);
}
public int bulkRegister(int parties) {
if (parties < 0)
throw new IllegalArgumentException();
if (parties == 0)
return getPhase();
return doRegister(parties);
} 两者实现都很简单,bulkRegister方法中添加了对parties数的检查。两个方法都调用了doRegister方法实现。 private int doRegister(int registrations) {
// adjustment to state
long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
final Phaser parent = this.parent;
int phase;
for (;;) {
long s = (parent == null) ? state : reconcileState();
int counts = (int)s;
int parties = counts >>> PARTIES_SHIFT;
int unarrived = counts & UNARRIVED_MASK;
if (registrations > MAX_PARTIES - parties)
throw new IllegalStateException(badRegister(s));
phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
break;
if (counts != EMPTY) { // not 1st registration
if (parent == null || reconcileState() == s) {
if (unarrived == 0) // wait out advance
root.internalAwaitAdvance(phase, null);
else if (UNSAFE.compareAndSwapLong(this, stateOffset,
s, s + adjust))
break;
}
}
else if (parent == null) { // 1st root registration
long next = ((long)phase << PHASE_SHIFT) | adjust;
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
break;
}
else {
synchronized (this) { // 1st sub registration
if (state == s) { // recheck under lock
phase = parent.doRegister(1);
if (phase < 0)
break;
// finish registration whenever parent registration
// succeeded, even when racing with termination,
// since these are part of the same "transaction".
while (!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
((long)phase << PHASE_SHIFT) | adjust)) {
s = state;
phase = (int)(root.state >>> PHASE_SHIFT);
// assert (int)s == EMPTY;
}
break;
}
}
}
}
return phase;
}
doRegister方法做了以下事情: private long reconcileState() {
final Phaser root = this.root;
long s = state;
if (root != this) {
int phase, p;
// CAS to root phase with current parties, tripping unarrived
while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase << PHASE_SHIFT) |
((phase < 0) ? (s & COUNTS_MASK) :
(((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
s = state;
}
return s;
}
reconcileState主要目的是和根结点保持阶段号同步。前面说过,如果出现堆叠情况,根结点是最先进行阶段号增加,虽然阶段号增加的操作会逐渐传递到子phaser,但某些同步操作,如动态注册等,需要马上获悉整棵树的阶段号状态避免多余的CAS,因此就需要显式和根结点保持同步。reconcileState实现就是如此,如果root!=this,即发生堆叠,就利用自旋CAS把当前修改状态值,要注意的是由于阶段号增加,会同时会把未到达的parties数设置为原来的注册parties数。主要实现都是移位和掩位操作,就不再赘述。 int awaitAdvance(int phase)
int awaitAdvanceInterruptibly(int phase) throws InterruptedException
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException 这三个方法的实现其实都大同小异,主要是增加来对中断和超时的控制,具体实现如下: public int awaitAdvance(int phase) {
final Phaser root = this.root;
long s = (root == this) ? state : reconcileState();
int p = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
if (p == phase)
return root.internalAwaitAdvance(phase, null);
return p;
}
public int awaitAdvanceInterruptibly(int phase)
throws InterruptedException {
//省略一样的代码
if (p == phase) {
QNode node = new QNode(this, phase, true, false, 0L);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
}
return p;
}
public int awaitAdvanceInterruptibly(int phase,
long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
long nanos = unit.toNanos(timeout);
//省略一样的代码
if (p == phase) {
QNode node = new QNode(this, phase, true, true, nanos);
p = root.internalAwaitAdvance(phase, node);
if (node.wasInterrupted)
throw new InterruptedException();
else if (p == phase)
throw new TimeoutException();
}
return p;
}
三者实现大致结构都一样,首先获取当前状态值,如果堆叠则调用reconcileState获取根结点同步后的状态值。然后如果当前阶段号与请求等待的阶段号相等,则调用根结点的internalAwaitAdvance方法(根结点是最先进行阶段号增长)。 static final class QNode implements ForkJoinPool.ManagedBlocker {
//省略其它成员变量以及构造函数
QNode next;
public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
if (nanos <= 0L) {
thread = null;
return true;
}
}
return false;
}
public boolean block() {
if (isReleasable())
return true;
else if (!timed)
LockSupport.park(this);
else if (nanos > 0)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
Phaser的等待队列使用的是Treiber无锁算法的栈操作。例子实现可以参考这里。首先可以注意到QNode类是实现了ForkJoinPool.ManagedBlocker接口,这个接口可以确保如果使用ForkJoinWorkerThread的时候就可以保持并发执行任务。 //NCPU是当前CPU数量
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // need node to record intr
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}
函数做了以下事情: private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
private int abortWait(int phase) {
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
for (;;) {
Thread t;
QNode q = head.get();
int p = (int)(root.state >>> PHASE_SHIFT);
if (q == null || ((t = q.thread) != null && q.phase == p))
return p;
if (head.compareAndSet(q, q.next) && t != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
releaseWaiters方法主要利用自旋从head结点起把队列里的结点出队,如果结点的thread引用为非null,则顺便唤醒。另外注意的是,每次出队前都会判断当前结点的阶段号是否与状态的阶段号相等,这里的状态阶段号用的是root.state,这是考虑到堆叠的情况。 int arrive() //一个party到达
int arriveAndDeregister() //一个party到达并且反注册这个party 这两个函数的实现都很简单: public int arrive() {
return doArrive(ONE_ARRIVAL);
}
public int arriveAndDeregister() {
return doArrive(ONE_DEREGISTER);
} 主要是调用doArrive实现,doArrive实现如下 private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
releaseWaiters(phase);
}
else if (nextUnarrived == 0) { // propagate deregistration
phase = parent.doArrive(ONE_DEREGISTER);
UNSAFE.compareAndSwapLong(this, stateOffset,
s, s | EMPTY);
}
else
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
doArrive看上去很复杂,但其实逻辑并不算太复杂。 public int arriveAndAwaitAdvance() {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState();
int phase = (int)(s >>> PHASE_SHIFT);
if (phase < 0)
return phase;
int counts = (int)s;
int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
if (unarrived <= 0)
throw new IllegalStateException(badArrive(s));
if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
s -= ONE_ARRIVAL)) {
if (unarrived > 1)
return root.internalAwaitAdvance(phase, null);
if (root != this)
return parent.arriveAndAwaitAdvance();
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase << PHASE_SHIFT;
if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
return (int)(state >>> PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
}
}
}
函数的大致结构和doArrive差不多,在CAS之后如果unarrived大于1,则需要调用根结点的internalAwaitAdvance进行阻塞等待直到阶段号增长,如果unarrived小于等于1,则如果有堆叠发生(root != this)则调用父phaser的arriveAndAwaitAdvance,否则的话调用onAdvance,并且调用CAS把状态更新,然后调用releaseWaiters把之前的阶段等待队列释放。该函数对比起先调用arrive和awaitAdvance,更加方便并且由于减少了一些多余的变量读取和逻辑,速度更加快。原文:http://blog.csdn.net/pun_c/article/details/37965813