exchange()
方法交换数据, 如果第一个线程先执行 exchange()
方法,会一直等待第二个线程也执行 exchange()
,当两个线程都到达同步点时,两个线程交换数据,将本线程生产出来的数据传递给对方。exchange()
方法。exchange()
方法。public V exchange(V x) throws InterruptedException public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
exchanger()
时,对于其伙伴结点的情况分为三种。
exchanger()
方法,则唤醒该伙伴结点然后进行数据交换,得到各自数据返回。数据结构
@sun.misc.Contended static final class Node { int index; // arena的下标,多个槽位的时候利用 int bound; // 上一次记录的Exchanger.bound; int collides; // 在当前bound下CAS失败的次数; int hash; // 用于自旋; Object item; // 这个线程的当前项,也就是需要交换的数据; volatile Object match; // 交换的数据 volatile Thread parked; // 线程 } /** * Value representing null arguments/returns from public * methods. Needed because the API originally didn‘t disallow null * arguments, which it should have. * 如果交换的数据为 null,则用NULL_ITEM 代替 */ private static final Object NULL_ITEM = new Object();
/** The number of CPUs, for sizing and spin control */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * The bound for spins while waiting for a match. The actual * number of iterations will on average be about twice this value * due to randomization. Note: Spinning is disabled when NCPU==1. */ private static final int SPINS = 1 << 10; // 自旋次数 /** * Slot used until contention detected. */ private volatile Node slot; // 用于交换数据的槽位 /** * Per-thread state 每个线程的数据,ThreadLocal 子类 */ private final Participant participant; /** The corresponding thread local class */ static final class Participant extends ThreadLocal<Node> { // 初始值返回Node public Node initialValue() { return new Node(); } }
exchange 方法
没有设定超时时间的 exchange 方法
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // 转换成空对象 // arena == null, 路由到slotExchange(单槽交换), 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange(多槽交换),返回null,则抛出中断异常 if ((arena != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V) v; }
slotExchange()
单槽方法,否则判断线程是否中断,如果中断值抛出 InterruptedException 异常,没有中断则执行 arenaExchange()
多槽方法,如果该方法返回 null,抛出中断异常,最后返回结果。具有超时功能的 exchange 方法
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x;// 转换成空对象 long ns = unit.toNanos(timeout); // arena == null, 路由到slotExchange(单槽交换), 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange(多槽交换),返回null,则抛出中断异常 if ((arena != null || (v = slotExchange(item, true, ns)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null))) throw new InterruptedException(); if (v == TIMED_OUT)// 超时 throw new TimeoutException(); return (v == NULL_ITEM) ? null : (V) v; }
slotExchange 方法
private final Object slotExchange(Object item, boolean timed, long ns) { Node p = participant.get(); // 获取当前线程携带的Node Thread t = Thread.currentThread(); // 当前线程 if (t.isInterrupted()) // 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记 return null; for (Node q;;) { if ((q = slot) != null) { // slot不为null, 说明已经有线程在这里等待了 if (U.compareAndSwapObject(this, SLOT, q, null)) { // 将slot重新设置为null, CAS操作 Object v = q.item; // 取出等待线程携带的数据 q.match = item; // 将当前线程的携带的数据交给等待线程 Thread w = q.parked; // 可能存在的等待线程(可能中断,不等了) if (w != null) U.unpark(w); // 唤醒等待线程 return v; // 返回结果,交易成功 } // CPU的个数多于1个,并且bound为0时创建 arena,并将bound设置为SEQ大小 if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ)) arena = new Node[(FULL + 2) << ASHIFT]; // 根据CPU的个数估计Node的数量 } else if (arena != null) return null; // 如果slot为null, 但arena不为null, 则转而路由到arenaExchange方法 else { // 最后一种情况,说明当前线程先到,则占用此slot p.item = item; // 将携带的数据卸下,等待别的线程来交易 if (U.compareAndSwapObject(this, SLOT, null, p)) // 将slot的设为当前线程携带的Node break; // 成功则跳出循环 p.item = null; // 失败,将数据清除,继续循环 } } // 当前线程等待被释放, spin -> yield -> block/cancel int h = p.hash; // 伪随机,用于自旋 long end = timed ? System.nanoTime() + ns : 0L; // 如果timed为true,等待超时的时间点; 0表示没有设置超时 int spins = (NCPU > 1) ? SPINS : 1; // 自旋次数 Object v; while ((v = p.match) == null) { // 一直循环,直到有线程来交易 if (spins > 0) { // 自旋,直至spins不大于0 h ^= h << 1; // 伪随机算法, 目的是等h小于0(随机的) h ^= h >>> 3; h ^= h << 10; if (h == 0) // 初始值 h = SPINS | (int) t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 等到h < 0, 而spins的低9位也为0(防止spins过大,CPU空转过久),让出CPU时间片,每一次等待有两次让出CPU的时机(SPINS >>> 1) } else if (slot != p) // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好 spins = SPINS; // 如果线程没被中断,且arena还没被创建,并且没有超时 else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上 p.parked = t; // 挂在此结点上的阻塞着的线程 if (slot == p) U.park(false, ns); // 阻塞, 等着被唤醒或中断 p.parked = null; // 醒来后,解除与结点的联系 U.putObject(t, BLOCKER, null); // 解除阻塞对象 } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超时或其它(取消),给其它线程腾出slot v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null; break; } } // 归位 U.putOrderedObject(p, MATCH, null); p.item = null; p.hash = h; return v; }
private static final int ASHIFT = 7; // 两个有效槽(slot -> Node)之间的字节地址长度(内存地址,以字节为单位),1 << 7至少为缓存行的大小,防止伪共享 private static final int MMASK = 0xff; // 场地(一排槽,arena -> Node[])的可支持的最大索引,可分配的大小为 MMASK + 1 private static final int SEQ = MMASK + 1; // bound的递增单元,确立其唯一性 private static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的个数,用于场地大小和自旋控制 static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 最大的arena索引 private static final int SPINS = 1 << 10; // 自旋次数,NCPU = 1时,禁用 private static final Object NULL_ITEM = new Object();// 空对象,对应null private static final Object TIMED_OUT = new Object();// 超时对象,对应timeout // 多个线程交换/多槽位 private volatile Node[] arena;
arenaExchange 方法
private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; // 交换场地,一排slot Node p = participant.get(); // 获取当前线程携带的Node p.index 初始值为 0 for (int i = p.index;;) { // arena的索引,数组下标 int b, m, c; long j; // 原数组偏移量,包括填充值 // 从场地中选出偏移地址为(i << ASHIFT) + ABASE的内存值,也即真正可用的Node //如果i为0,j相当于是 "第一个"槽位 Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE); if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 此槽位不为null, 说明已经有线程在这里等了,重新将其设置为null, CAS操作 Object v = q.item; // 取出等待线程携带的数据 q.match = item; // 将当前线程携带的数据交给等待线程 Thread w = q.parked; // 可能存在的等待线程 if (w != null) U.unpark(w); // 唤醒等待线程 return v; // 返回结果, 交易成功 } else if (i <= (m = (b = bound) & MMASK) && q == null) { // 有效交换位置,且槽位为空 p.item = item; // 将携带的数据卸下,等待别的线程来交易 if (U.compareAndSwapObject(a, j, null, p)) { // 槽位占领成功 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; // 计算出超时结束时间点 Thread t = Thread.currentThread(); // 当前线程 for (int h = p.hash, spins = SPINS;;) { // 一直循环,直到有别的线程来交易,或超时,或中断 Object v = p.match; // 检查是否有别的线程来交换数据 if (v != null) { // 有则返回 U.putOrderedObject(p, MATCH, null); // match重置,等着下次使用 p.item = null; // 清空,下次接着使用 p.hash = h; return v; // 返回结果,交易结束 } else if (spins > 0) { // 自旋 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // 移位加异或,伪随机 if (h == 0) // 初始值 h = SPINS | (int) t.getId(); else if (h < 0 && // SPINS >>> 1, 一半的概率 (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // 每一次等待有两次让出CPU的时机 } else if (U.getObjectVolatile(a, j) != p) spins = SPINS; // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好 else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上 p.parked = t; // 挂在此结点上的阻塞着的线程 if (U.getObjectVolatile(a, j) == p) U.park(false, ns); // 阻塞, 等着被唤醒或中断 p.parked = null; // 醒来后,解除与结点的联系 U.putObject(t, BLOCKER, null); // 解除阻塞对象 } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { if (m != 0) // 尝试缩减 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 更新bound, 高位递增,低位 -1 p.item = null; // 重置 p.hash = h; i = p.index >>>= 1; // 索引减半,为的是快速找到汇合点(最左侧) if (Thread.interrupted())// 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记 return null; if (timed && m == 0 && ns <= 0L) // 超时 return TIMED_OUT; break; // 重新开始 } } } else p.item = null; // 重置 } else { if (p.bound != b) { // 别的线程更改了bound,重置collides为0, i的情况如下:当i != m, 或者m = 0时,i = m; 否则,i = m-1; 从右往左遍历 p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; // index 左移 } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 更新bound, 高位递增,低位 +1 p.collides = c + 1; i = (i == 0) ? m : i - 1; // 左移,遍历槽位,m == FULL时,i == 0(最左侧),重置i = m, 重新从右往左循环遍历 } else i = m + 1; // 槽位增长 p.index = i; } } }
static final class Participant extends ThreadLocal<Node> {
public Node initialValue() { return new Node(); }
}
伪随机
h ^= h << 1;
h ^= h >>> 3;
h ^= h << 10;
max(2^31-1) - min(-2^31) = 2^32 - 1 = 4294967295
。为什么选用 1,3,10
2^32 - 1 = 4294967295
。[4294967295] (1, 3, 10) (2, 7, 7) (2, 7, 9) (5, 9, 7) (7, 1, 9) (7, 7, 2) (7, 9, 5)
为什么要有两次左移和一次右移
自旋等待
private static final int SPINS = 1 << 10;
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // h < 0,一半的概率
Thread.yield(); // 每一次等待有两次让出CPU的时机
((1024 >>>1) -1) = 511 = 0111111111
,spins 默认为 1024 循环递减。arena 的创建
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
private static final int ASHIFT = 7;
private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MMASK = 0xff; // 255
......
if (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
slotExchange()
方法中存在竞争时,会构建 arena。
SEQ(SEQ=MMASK + 1)
,255 + 1 = 256。Runtime.getRuntime().availableProcessors()
。private static final sun.misc.Unsafe U;
private static final int ABASE;
U = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Node[].class;
s = U.arrayIndexScale(ak);
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
FULL 和 ASHIFT 的定义
arrayBaseOffset(ak)
方法可以返回 arena 数组中第一个元素的偏移地址。arrayIndexScale(ak)
方法可以返回 arena 数组中每一个元素占用的大小,也就是元素与元素之间的间隔,即 1 << ASHIFT
为 128。
ABASE = arrayBaseOffset + (1 << ASHIFT)
是 arena 的起始位置加上 128 位这个偏移量。arrayBaseOffset + N * arrayIndexScale
。@sun.misc.Contended
注解 和 1 << ASHIFT
主要是用于避免 伪共享。1 << ASHIFT
可以避免两个 Node 在同一个共享区(缓存行)。
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
Object v = q.item; // 获取槽位中结点 q 的数据
q.match = item; // 把当前线程的数据交换给它
Thread w = q.parked; // 获得槽位中结点 q 对应的线程对象
if (w != null)
U.unpark(w); //唤醒该线程
return v;
}
bound 和 collides
private static final int MMASK = 0xff;
private static final int SEQ = MMASK + 1;
......
// MASK: 00000000000000000000000011111111
// SEQ: 00000000000000000000000100000000(MASK + 1)
// 1: 00000000000000000000000000000001
if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
// 当 bound 为 0 时,bound 被更新为 SEQ
//第一次更新
//b0: 00000000000000000000000100000000
U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)
//SEQ+1: 00000000000000000000000100000001
//b0+SEQ+1=b1: 00000000000000000000000200000001
//第二次更新
//b1+SEQ: 00000000000000000000000300000001
//第二次是 -1 的情况
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1)
//b1+SEQ-1=b2: 00000000000000000000000300000000
b + SEQ + 1
后再 b + SEQ - 1
,实际经历了两个版本,并且会将 collides 重置。https://blog.csdn.net/carson0408/article/details/79477280
https://blog.csdn.net/u014634338/article/details/78385521
https://blog.csdn.net/chenssy/article/details/72550933
https://www.cnblogs.com/d-homme/p/9387948.html
https://www.cnblogs.com/aniao/p/aniao_exchanger.html
原文:https://www.cnblogs.com/youngao/p/12573935.html