[c实现的队列](http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue)
下面是akka实现的一个MPSC队列。
PS: 代码里的注释把新元素被添加的一端称为链尾,而代码本身的命名正好相反(添加端叫 head,消费端叫 tail),两者相互冲突,阅读时请注意。
PPS: 由于是 single consumer(单消费者),基本不需要考虑多个消费者同时取元素时的竞争状态。
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch;
import akka.util.Unsafe;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Lock-free MPSC (multiple-producer, single-consumer) linked queue implementation based on
 * Dmitriy Vyukov's non-intrusive MPSC queue:
 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
 *
 * <p>Naming used throughout this class:
 * <ul>
 *   <li><b>head</b> - the value held by the inherited {@link AtomicReference}; this is the end
 *       that {@link #add} / {@link #addNode} append to.</li>
 *   <li><b>tail</b> - the {@code _tailDoNotCallMeDirectly} field; this is the end that
 *       {@link #poll} / {@link #pollNode} consume from. The tail node itself is a stub whose
 *       value has already been handed out (or was never set); the first real element lives in
 *       {@code tail.next()}.</li>
 * </ul>
 *
 * <p>{@code poll()} and {@code pollNode()} advance the tail with a plain ordered store (no
 * compare-and-swap), so they must only ever be invoked by one consumer thread at a time.
 *
 * <p>A {@code null} element cannot be told apart from "queue is empty" by {@link #peek},
 * {@link #poll} or {@link #isEmpty}, because all of them signal emptiness with {@code null}.
 */
@SuppressWarnings("serial")
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
    // Extends AtomicReference for the "head" slot (which is the one that is appended to) since
    // Unsafe does not expose XCHG operation intrinsically.

    // Consumer-side stub node. Never touched by name: it is only read and written through
    // Unsafe using tailOffset, hence the field name and the "unused" suppression.
    @SuppressWarnings("unused")
    private volatile Node<T> _tailDoNotCallMeDirectly;

    /**
     * Creates an empty queue: a single stub node (value {@code null}) serves as both head and
     * tail.
     */
    protected AbstractNodeQueue() {
        final Node<T> n = new Node<T>();
        _tailDoNotCallMeDirectly = n; // consumer end starts at the stub
        set(n);                       // producer end starts at the same stub
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    /**
     * Returns the node holding the first element without removing it.
     *
     * @return the node after the tail stub, or {@code null} if the queue is empty
     */
    @SuppressWarnings("unchecked")
    protected final Node<T> peekNode() {
        for (;;) {
            final Node<T> tail = ((Node<T>) Unsafe.instance.getObjectVolatile(this, tailOffset));
            final Node<T> next = tail.next();
            // next != null  -> an element is linked behind the stub: return it.
            // get() == tail -> head and tail are the same node, the queue is empty: return null.
            // Otherwise a producer has already swapped the head in add()/addNode() but has not
            // yet linked its node via setNext(); re-read both and try again.
            if (next != null || get() == tail)
                return next;
        }
    }

    /**
     * Returns the first element without removing it.
     *
     * @return the first element, or {@code null} if the queue is empty
     */
    public final T peek() {
        final Node<T> n = peekNode();
        return (n != null) ? n.value : null;
    }

    /**
     * Appends {@code value} to the queue.
     *
     * @param value the element to append
     */
    public final void add(final T value) {
        final Node<T> n = new Node<T>(value);
        // Atomically make n the new head, then link the previous head to it. Between these two
        // steps the list is momentarily disconnected; peekNode()/pollNode() retry across that gap.
        getAndSet(n).setNext(n);
    }

    /**
     * Appends an already allocated node to the queue.
     *
     * @param n the node to append; its next link is reset to {@code null} before it is linked in
     */
    public final void addNode(final Node<T> n) {
        n.setNext(null);
        getAndSet(n).setNext(n);
    }

    /**
     * Reports whether {@link #peek()} currently returns {@code null}.
     *
     * @return {@code true} if no element is visible at the tail
     */
    public final boolean isEmpty() {
        return peek() == null;
    }

    /**
     * Counts the elements that were in the queue when the call started.
     *
     * <p>The result is only a snapshot estimate: producers and the consumer may keep modifying
     * the queue while it is being traversed.
     *
     * @return the number of nodes found, never more than {@code Integer.MAX_VALUE}
     */
    public final int count() {
        int count = 0;
        // Remember where the append end was when we started. Without this bound the walk could
        // chase a head that producers keep moving forward and never terminate.
        final Node<T> head = get();
        // The upper bound on count protects the int counter from overflowing.
        for (Node<T> n = peekNode(); n != null && count < Integer.MAX_VALUE; n = n.next()) {
            ++count;
            if (n == head)
                break; // reached the snapshot of the head: anything beyond was added later
        }
        return count;
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    /**
     * Removes and returns the first element.
     *
     * <p>Must only be called from the single consumer thread.
     *
     * @return the removed element, or {@code null} if the queue is empty
     */
    public final T poll() {
        final Node<T> next = peekNode();
        if (next == null) return null;
        else {
            final T ret = next.value;
            next.value = null; // next becomes the new stub; do not keep the element reachable
            Unsafe.instance.putOrderedObject(this, tailOffset, next); // advance the tail
            return ret;
        }
    }

    /**
     * Removes the first element and returns it wrapped in a node.
     *
     * <p>The node handed back is the previous tail stub, with the removed element copied into
     * its {@code value}; the node that used to hold the element stays in the queue as the new
     * stub. The returned node is fully detached from the queue (its next link is cleared).
     *
     * <p>Must only be called from the single consumer thread.
     *
     * @return a detached node carrying the removed element, or {@code null} if the queue is empty
     */
    @SuppressWarnings("unchecked")
    public final Node<T> pollNode() {
        Node<T> tail;
        Node<T> next;
        // Same retry loop as peekNode(), duplicated here because the tail node itself is needed.
        for (;;) {
            tail = ((Node<T>) Unsafe.instance.getObjectVolatile(this, tailOffset));
            next = tail.next();
            if (next != null || get() == tail)
                break;
        }
        if (next == null) return null;
        else {
            tail.value = next.value;
            next.value = null;
            Unsafe.instance.putOrderedObject(this, tailOffset, next);
            // Detach the returned node from the live queue. Otherwise a caller that keeps (or
            // pools) the node would also keep every later node reachable through its next link.
            tail.setNext(null);
            return tail;
        }
    }

    // Memory offset of _tailDoNotCallMeDirectly, resolved once for use with Unsafe.
    private final static long tailOffset;

    static {
        try {
            tailOffset = Unsafe.instance.objectFieldOffset(AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly"));
        } catch (Throwable t) {
            throw new ExceptionInInitializerError(t);
        }
    }

    /**
     * Singly linked queue node: a mutable {@code value} plus a link to the following node.
     */
    public static class Node<T> {
        public T value;

        // Link to the following node. Only read and written through Unsafe using nextOffset,
        // hence the field name and the "unused" suppression.
        @SuppressWarnings("unused")
        private volatile Node<T> _nextDoNotCallMeDirectly;

        /** Creates a node with a {@code null} value. */
        public Node() {
            this(null);
        }

        /**
         * Creates a node carrying {@code value}.
         *
         * @param value the element stored in this node
         */
        public Node(final T value) {
            this.value = value;
        }

        /**
         * Reads the link to the following node with volatile semantics.
         *
         * @return the following node, or {@code null} if none is linked
         */
        @SuppressWarnings("unchecked")
        public final Node<T> next() {
            return (Node<T>) Unsafe.instance.getObjectVolatile(this, nextOffset);
        }

        /**
         * Writes the link to the following node using an ordered store.
         *
         * @param newNext the node to link after this one, or {@code null} to clear the link
         */
        protected final void setNext(final Node<T> newNext) {
            Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
        }

        // Memory offset of _nextDoNotCallMeDirectly, resolved once for use with Unsafe.
        private final static long nextOffset;

        static {
            try {
                nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
            } catch (Throwable t) {
                throw new ExceptionInInitializerError(t);
            }
        }
    }
}
原文:http://my.oschina.net/myprogworld/blog/381183