SynchronousQueue 没有长度,每一个入队操作必须对应一个出队操作,或者每一个出队操作必须对应一个入栈操作,否则阻塞。SynchronousQueue内部提供两种模式TransferStack非公平模式(LIFO)和TransferQueue公平模式(FIFO)。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue 继承AbstractQueue抽象类,并实现BlockingQueue接口
abstract static class Transferer<E>
Transferer是抽象类,它有两个实现TransferStack、TransferQueue
abstract E transfer(E e, boolean timed, long nanos);
此方法可以既可以执行put也可以执行take操作。
static final class TransferStack<E> extends Transferer<E>
static final class SNode
// 后面节点
volatile SNode next;
// 匹配节点
volatile SNode match;
// 等待线程
volatile Thread waiter;
// 元素
Object item;
// 节点模式
int mode;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// 匹配节点偏移量
private static final long matchOffset;
// 后续节点偏移量
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
SNode(Object item) {
this.item = item;
}
// 修改后续节点
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// 尝试匹配
boolean tryMatch(SNode s) {
// 修改匹配节点
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
// 唤醒等待线程
Thread w = waiter;
if (w != null) {
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
// 尝试取消等待
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
// 是否匹配
boolean isCancelled() {
return match == this;
}
// 消费者
static final int REQUEST = 0;
// 生产者
static final int DATA = 1;
// 生产者在等待消费者消费
static final int FULFILLING = 2;
// 头节点
volatile SNode head;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// head偏移量
private static final long headOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferStack.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
} catch (Exception e) {
throw new Error(e);
}
}
// 更新头节点
boolean casHead(SNode h, SNode nh) {
// 把头节点从h更新为nh
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}
// 判断是否为FULFILLING模式
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
// 设置节点属性,节点为空创建新节点
static SNode snode(SNode s, Object e, SNode next, int mode) {
// s 为空,创建新节点
if (s == null) s = new SNode(e);
// 设置s属性
s.mode = mode;
s.next = next;
return s;
}
// 如果节点在栈头或栈头为FULFILLING的节点,则返回true
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
入队出队
E transfer(E e, boolean timed, long nanos) {
// 根据元素判断模式
SNode s = null;
int mode = (e == null) ? REQUEST : DATA;
// 自旋
for (;;) {
SNode h = head;
// 头节点模式与当前模式一样
if (h == null || h.mode == mode) {
// 如果超时,则取消等待
if (timed && nanos <= 0) {
if (h != null && h.isCancelled())
casHead(h, h.next);
else
return null;
}
// 没有超时,入栈,head指向他
else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋等待匹配
SNode m = awaitFulfill(s, timed, nanos);
// 取消等待
if (m == s) {
// 清理取消等待的节点
clean(s);
return null;
}
// 头节点匹配成功,头后移
if ((h = head) != null && h.next == s)
casHead(h, s.next);
// REQUEST返回匹配元素,DATA返回本身
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
// 头节点不为Fulfilling模式
else if (!isFulfilling(h.mode)) {
// 头节点是否取消等待
if (h.isCancelled())
// 头节点后移
casHead(h, h.next);
// 入栈修改头节点
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
// 自旋
for (;;) {
SNode m = s.next;
// next节点为null,则出栈
if (m == null) {
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
// 尝试匹配是s节点
if (m.tryMatch(s)) {
// s出栈
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 节点后移
s.casNext(m, mn);
}
}
}
// 头节点为Fulfilling模式
else {
SNode m = h.next;
// 头节点next为空,修改头节点
if (m == null)
casHead(h, null);
else {
SNode mn = m.next;
// 试匹配,如果匹配成功,
// 栈头和匹配节点出栈,否则跳过后继节点
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}
自旋或阻塞
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 获取剩余等待时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 获取自旋次数
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋
for (;;) {
// 响应中断
if (w.isInterrupted())
s.tryCancel();
// 获取匹配节点
SNode m = s.match;
if (m != null)
return m;
// 是否需要等待
if (timed) {
nanos = deadline - System.nanoTime();
// 等待时间小于0,出队
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 自旋次数减一
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
// 数组等待线程
else if (s.waiter == null)
s.waiter = w;
// 等待
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
移除队列中取消等待的线程节点
void clean(SNode s) {
// 置空元素和等待线程
s.item = null;
s.waiter = null;
SNode past = s.next;
// past已取消节点后移
if (past != null && past.isCancelled())
past = past.next;
// 循环修改头节点
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
// 设置栈头节点的next为第一个非取消等待的节点
casHead(p, p.next);
// 遍历栈
while (p != null && p != past) {
SNode n = p.next;
// 移除取消等待的节点
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
// 节点后移
p = n;
}
}
这里说一下大概逻辑,TransferStack在执行读写时,首先判断元素是否为空,为空REQUEST模式,否则DATA模式。队列为空或当前模式与队头模式一样,自旋阻塞;队列不为空且与队头的模式不同,匹配成功,出队操作;队列不为空且队头为FULFILLING模式,从队头往后遍历找第一个非FULFILLING模式匹配,匹配成功出队。
static final class TransferQueue<E> extends Transferer<E>
static final class QNode
// 下一个节点
volatile QNode next;
// 节点元素
volatile Object item;
// 等待线程
volatile Thread waiter;
// 是否为DATA模式
final boolean isData;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// item偏移量
private static final long itemOffset;
// next偏移量
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// CAS设置next属性
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// CAS设置元素值
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 取消节点等待(方便GC)
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// 是否取消等待
boolean isCancelled() {
return item == this;
}
// 是否已出队
boolean isOffList() {
return next == this;
}
// 头节点
transient volatile QNode head;
// 尾节点
transient volatile QNode tail;
// 待取消节点
transient volatile QNode cleanMe;
// 内存操作不安全类
private static final sun.misc.Unsafe UNSAFE;
// 头节点偏移量
private static final long headOffset;
// 尾节点偏移量
private static final long tailOffset;
// 待取消节点偏移量
private static final long cleanMeOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}
TransferQueue() {
// 初始化一个空的QNode
// isData为false
QNode h = new QNode(null, false);
// 设置头节点和尾节点
head = h;
tail = h;
}
// 尝试修改新头节点
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}
// 尝试修改新尾节点
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
// 尝试修改新取消等待节点
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
入队出队
E transfer(E e, boolean timed, long nanos) {
QNode s = null;
// e不为null,则为DATA模式,否则为REQUEST模式
boolean isData = (e != null);
for (;;) {
// 获取当前头尾节点
QNode t = tail;
QNode h = head;
// 头或尾节点为空
if (t == null || h == null)
continue;
// 头尾节点一样,模式一样
if (h == t || t.isData == isData) {
QNode tn = t.next;
// 尾节点已变化
if (t != tail)
continue;
// 尾节点next不为空
if (tn != null) {
// 尝试修改尾节点
advanceTail(t, tn);
continue;
}
// 超时,并且剩余时间小于0
if (timed && nanos <= 0)
// 返回null
return null;
// 新节点为空,初始化新节点
if (s == null)
s = new QNode(e, isData);
// 尾节点的next为null,就把新节点加到后面
if (!t.casNext(null, s))
// 替换失败,开始下轮自旋
continue;
// 尝试修改尾节点
advanceTail(t, s);
// 阻塞等待
Object x = awaitFulfill(s, e, timed, nanos);
// 如果s指向自己,s出队列
if (x == s) {
// 清除队列中取消等待的线程节点
clean(t, s);
return null;
}
// 是否已出队
if (!s.isOffList()) {
// 修改头节点
advanceHead(t, s);
// 元素指向自己
if (x != null)
s.item = s;
// 等待取消
s.waiter = null;
}
// 返回元素
return (x != null) ? (E)x : e;
}
// 头尾节点不一样
else {
QNode m = h.next;
// 头尾节点变化
if (t != tail || m == null || h != head)
continue;
// 获取当前元素
Object x = m.item;
// 后续节点模式跟当前模式一样
// 或者已经尝试取消
// 或者修改后续节点元素为当前元素(交换元素)
if (isData == (x != null) || x == m ||
!m.casItem(x, e)) {
// 出队
advanceHead(h, m);
// 进入下次自旋,修改对方线程
continue;
}
// 直接修改头节点
advanceHead(h, m);
// 解除后续节点阻塞
LockSupport.unpark(m.waiter);
// 返回后续节点元素
return (x != null) ? (E)x : e;
}
}
}
自旋阻塞
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 计算超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次数
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 如果中断,则取消等待
if (w.isInterrupted())
// 把s的item从e修改为s
s.tryCancel(e);
// 获取s的元素
Object x = s.item;
// s的item不为e,直接返回x
if (x != e)
return x;
// 超时
if (timed) {
// 计算剩余超时时间
nanos = deadline - System.nanoTime();
// 超时时间小于等于0,取消节点等待
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋次数减一
if (spins > 0)
--spins;
// 等待线程不为空
else if (s.waiter == null)
// 设置等待线程为当前线程
s.waiter = w;
// 没有超时
else if (!timed)
// 开始阻塞
LockSupport.park(this);
// 超时时间大于1000
else if (nanos > spinForTimeoutThreshold)
// 阻塞
LockSupport.parkNanos(this, nanos);
}
}
移除队列中取消等待的线程节点
void clean(QNode pred, QNode s) {
s.waiter = null;
// 遍历
while (pred.next == s) {
// 获取头节点和头节点next
QNode h = head;
QNode hn = h.next;
// 头节点next不为空,并且是取消等待节点
if (hn != null && hn.isCancelled()) {
// 修改头节点
advanceHead(h, hn);
continue;
}
// 获取尾节点
QNode t = tail;
// 头尾一样时返回
if (t == h)
return;
// 尾节点有next
QNode tn = t.next;
// 非一致性读
if (t != tail)
continue;
// 尾节点next不为空,修改尾节点
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s不是尾节点
if (s != t) {
// 修改pred下级节点
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
// 获取取消节点
QNode dp = cleanMe;
// 取消节点不为空
if (dp != null) {
// 获取取消节点next
QNode d = dp.next;
QNode dn;
// 移除前一个取消等待的节点
if (d == null ||
d == dp ||
!d.isCancelled() ||
(d != t &&
(dn = d.next) != null &&
dn != d &&
dp.casNext(d, dn)))
casCleanMe(dp, null);
if (dp == pred)
return;
}
// 设置取消节点
else if (casCleanMe(null, pred))
return;
}
}
这里说一下大概逻辑,TransferQueue在执行读写时,首先判断元素是否为空,为空REQUEST模式,否则DATA模式。队列为空或当前模式与队尾模式一样,自旋阻塞;队列不为空且与队头的模式不同,匹配成功,出队操作。
// CPU的数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// 有超时的情况自旋多少次,当CPU数量小于2的时候不自旋
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
// 没有超时的情况自旋多少次
static final int maxUntimedSpins = maxTimedSpins * 16;
// 剩余时间阈值常量(没有超时时间会用到)
static final long spinForTimeoutThreshold = 1000L;
// Transferer模式
private transient volatile Transferer<E> transferer;
// 下面三个都是序列化时使用
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
// 默认初始化
public SynchronousQueue() {
// 默认非公平模式
this(false);
}
// 设置是否使用公平模式
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
public boolean isEmpty() {
return true;
}
public int size() {
return 0;
}
public int remainingCapacity() {
return 0;
}
public void clear() {
}
public boolean contains(Object o) {
return false;
}
public boolean remove(Object o) {
return false;
}
public boolean containsAll(Collection<?> c) {
return c.isEmpty();
}
public boolean removeAll(Collection<?> c) {
return false;
}
public boolean retainAll(Collection<?> c) {
return false;
}
public E peek() {
return null;
}
可以说SynchronousQueue是没有容量的(只有生成者线程或者消费者线程),所以长度可以看做0,也不能peek。
// 入队,没有出队操作一直阻塞
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 返回为null,则put失败,中断当前线程
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
// 入队,超时阻塞
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
// 返回为null,则offer失败,中断当前线程
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
// 入队
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
// 入队
return transferer.transfer(e, true, 0) != null;
}
// 出队,没有入队操作一直阻塞
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
// 出队,超时阻塞
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 出队,获取返回值
E e = transferer.transfer(null, true, unit.toNanos(timeout));
// 返回值不为空,或者线程未中断,返回结果
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
// 出队
public E poll() {
// 出队
return transferer.transfer(null, true, 0);
}
数据结构 - SynchronousQueue 线程通信阻塞队列
原文:https://www.cnblogs.com/yuanjiangnan/p/12764026.html