public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) { Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; Thread.interrupted(); // Clear interrupt status on IE throw } throw new InterruptedException(); }函数首先判断当前线程是否已经被中断,如果是则抛出IE异常,否则调用doExchange函数,调用函数之前,为了防止传入交换对象的参数x为null,因此会当null时会传入NULL_ITEM,一个预定义的作为标识的Object作为参数,另外,根据doExchange返回的对象来判断槽中的对象为null或者当前操作被中断,如果被中断则doExchange返回CANCEL对象,这样exchange就会抛出IE异常。
private static final Object CANCEL = new Object(); private static final Object NULL_ITEM = new Object();我们再来看看doExchange方法的实现。
private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // Create in case occupying int index = hashIndex(); // Index of current slot int fails = 0; // Number of CAS failures for (;;) { Object y; // Contents of current slot Slot slot = arena[index]; if (slot == null) // Lazily initialize slots createSlot(index); // Continue loop to reread else if ((y = slot.get()) != null && // Try to fulfill slot.compareAndSet(y, null)) { Node you = (Node)y; // Transfer item if (you.compareAndSet(null, item)) { LockSupport.unpark(you.waiter); return you.item; } // Else cancelled; continue } else if (y == null && // Try to occupy slot.compareAndSet(null, me)) { if (index == 0) // Blocking wait for slot 0 return timed ? awaitNanos(me, slot, nanos) : await(me, slot); Object v = spinWait(me, slot); // Spin wait for non-0 if (v != CANCEL) return v; me = new Node(item); // Throw away cancelled node int m = max.get(); if (m > (index >>>= 1)) // Decrease index max.compareAndSet(m, m - 1); // Maybe shrink table } else if (++fails > 1) { // Allow 2 fails on 1st slot int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; // Grow on 3rd failed slot else if (--index < 0) index = m; // Circularly traverse } } }函数首先利用当前要交换对象作为参数构造Node变量me,类Node定义如下
private static final class Node extends AtomicReference<Object> { public final Object item; public volatile Thread waiter; public Node(Object item) { this.item = item; } }内部类Node继承于AtomicReference,并且内部拥有两个成员对象item,waiter。假设线程1和线程2需要进行对象交换,类Node把线程1中需要交换的对象作为参数传递给Node构造函数,然后线程2如果在槽中发现此Node,则会利用CAS把当前原子引用从null变为需要交换的item对象,然后返回Node的成员变量item对象,构造Node的线程1调用get()方法发现原子引用非null的时候,就返回此对象。这样线程1和线程2就顺利交换对象。类Node的成员变量waiter一般在线程1如果需要阻塞和唤醒的情况下使用。
private static final int CAPACITY = 32; private static final class Slot extends AtomicReference<Object> { // Improve likelihood of isolation on <= 128 byte cache lines. // We used to target 64 byte cache lines, but some x86s (including // i7 under some BIOSes) actually use 128 byte cache lines. long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; } private volatile Slot[] arena = new Slot[CAPACITY]; private final AtomicInteger max = new AtomicInteger();内部类Slot也是继承于AtomicReference,其内部变量一共定义了15个long型成员变量,这15个long成员变量的作用就是缓存填充(cache padding),这样可以避免在大量CAS的时候减轻cache的影响。arena定义为大小为CAPACITY的数组,而max就是arena实际使用的数组大小,一般max会根据情况进行增长或者缩减,这样避免同时对一个槽进行CAS带来的性能下降影响。
private final int hashIndex() { long id = Thread.currentThread().getId(); int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193; int m = max.get(); int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1)) ((0x000001f8 >>> m) & 2) | // The constants hold ((0xffff00f2 >>> m) & 1)); // a lookup table int index; while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m return index; }hashIndex主要根据当前线程的id根据one-step FNV-1a的算出对应的哈希值,并且利用一个快速的模数估算来把哈希值限制在[0, max)之间(max是槽实际使用大小),具体实现涉及各种运算,有兴趣可以自行研究,此处略去。
private void createSlot(int index) { Slot newSlot = new Slot(); Slot[] a = arena; synchronized (a) { if (a[index] == null) a[index] = newSlot; } }createSlot的实现很简单,只是根据index参数把数组中的对应位置添加引用。但要注意并发问题,因此在给数组赋值的时候还要利用synchronized关键字进行同步。
private static final int NCPU = Runtime.getRuntime().availableProcessors(); private static final int SPINS = (NCPU == 1) ? 0 : 2000; private static Object await(Node node, Slot slot) { Thread w = Thread.currentThread(); int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) // Spin-wait phase --spins; else if (node.waiter == null) // Set up to block next node.waiter = w; else if (w.isInterrupted()) // Abort on interrupt tryCancel(node, slot); else // Block LockSupport.park(node); } }首先看看SPINS变量的定义,SPINS表示的是在阻塞或者等待匹配中超时放弃前需要自旋轮询变量的次数,在当只有单个CPU时为0,否则为2000。SPINS在多核CPU上能够在交换中,如果其中一条线程由于GC或者被抢占等原因暂停时,能够只等待短暂的轮询后即可重新进行交换操作。来看看await的实现,同样在循环里有四个判断:
private static boolean tryCancel(Node node, Slot slot) { if (!node.compareAndSet(null, CANCEL)) return false; if (slot.get() == node) // pre-check to minimize contention slot.compareAndSet(node, null); return true; }tryCancel的实现很简单,首先需要CAS把当前结点的原子引用从null变为CANCEL对象,如果CAS失败,则有可能已经有线程顺利与当前结点进行匹配,并且调用CAS进行了交换。否则的话,再调用CAS把node所在的slot修改为null。如果这里CAS成功,则CANCEL对象会被返回到exchange方法里,让exchange方法判断后,抛出InterruptedException异常。
private static Object spinWait(Node node, Slot slot) { int spins = SPINS; for (;;) { Object v = node.get(); if (v != null) return v; else if (spins > 0) --spins; else tryCancel(node, slot); } }spinWait的实现与await类似,但稍有不同,主要逻辑是如果经过SPINS次自旋以后,仍然无法被匹配,则会调用tryCancel把当前结点调用tryCancel取消,这样返回doExchange的时候,如果发现当前结点已经被取消,则重新构造一个新结点Node,并且把index的值右移一位(即整除2),另外此处还需要考虑把槽的数量减少,于是判断如果max的值比整除后的index要大,则通过CAS把max值减去一。
private Object awaitNanos(Node node, Slot slot, long nanos) { int spins = TIMED_SPINS; long lastTime = 0; Thread w = null; for (;;) { Object v = node.get(); if (v != null) return v; long now = System.nanoTime(); if (w == null) w = Thread.currentThread(); else nanos -= now - lastTime; lastTime = now; if (nanos > 0) { if (spins > 0) --spins; else if (node.waiter == null) node.waiter = w; else if (w.isInterrupted()) tryCancel(node, slot); else LockSupport.parkNanos(node, nanos); } else if (tryCancel(node, slot) && !w.isInterrupted()) return scanOnTimeout(node); } }awaitNanos大体逻辑基本与await相同,但添加了一些关于超时判断的逻辑。其中最主要的是在超时之后,会尝试调用scanOnTimeout函数。
private Object scanOnTimeout(Node node) { Object y; for (int j = arena.length - 1; j >= 0; --j) { Slot slot = arena[j]; if (slot != null) { while ((y = slot.get()) != null) { if (slot.compareAndSet(y, null)) { Node you = (Node)y; if (you.compareAndSet(null, node.item)) { LockSupport.unpark(you.waiter); return you.item; } } } } } return CANCEL; }scanOnTimeout把整个槽表都扫描一次,如果发现有线程在另外的槽位中,则进行CAS交换。这样就可以减少超时的可能性。注意CAS替换的是node.item,并不是get()方法返回的先前在tryCancel中被CAS掉的原子引用。
Exchanger源码Android版剖析,布布扣,bubuko.com
原文:http://blog.csdn.net/pun_c/article/details/37829035