Queue 接口
add()
方法在队列已满时将会抛出异常,而 offer()
在队列已满时会返回 false。入队 | 出队 | 检索 | 处理方式 |
---|---|---|---|
add() | remove() | element() | 在执行方法失败时不返回值,抛出异常。 |
offer() | poll() | peek() | 在执行方法时,给出返回值,比如 false、null。 |
BlockingQueue 接口
put()
和 take()
方法会一直阻塞。阻塞入队 | 阻塞出队 | 定时入队 | 定时出队 |
---|---|---|---|
put(E e) | E take() | offer(E e,long timeout,TimeUnit unit) | E poll(long timeout,TimeUnit unit) |
阻塞队列 | 说明 |
---|---|
ArrayBlockingQueue | 一个由数组结构组成的有界阻塞队列。 |
LinkedBlockingQueue | 一个由链表结构组成的有界阻塞队列。 |
PriorityBlockingQueue | 一个支持优先级排序的无界阻塞队列。 |
DelayQueue | 一个使用优先级队列实现的无界阻塞队列。 |
SynchronousQueue | 一个不存储元素的阻塞队列。 |
LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列。 |
LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。 |
ArrayBlockingQueue
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true); ...... public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
PriorityBlockingQueue
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
DelayQueue
Deque 接口
队头入队 | 队头出队 | 队尾入队 | 队尾出队 | 队头检索 | 队尾检索 | 处理方式 |
---|---|---|---|---|---|---|
addFirst() | removeFirst() | addLast() | removeLast() | getFirst() | getLast() | 在方法执行失败时会抛出异常 |
offerFirst() | pollFirst() | offerLast() | pollLast() | peekFirst() | peekLast() | 在方法执行失败时会返回 false 或者 null。 |
SynchronousQueue
LinkedTransferQueue
take()
方法或带时间限制的 poll()
方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。
LinkedBlockingDeque
BlockingDeque 接口
阻塞队头入队 | 阻塞队头出队 | 阻塞队尾入队 | 阻塞队尾出队 | 处理方式 |
---|---|---|---|---|
putFirst(E e) | E takeFirst() | putLast(E e) | E takeLast() | 没有超时设置 |
offerFirst(E e,long timeout,TimeUnit unit) | E pollFirst(long timeout,TimeUnit unit) | offerLast(E e,long timeout,TimeUnit unit) | E pollLast(long timeout,TimeUnit unit) | 在超时之后,返回 false 或者 null。 |
add()
,offer()
,put()
时如果参数是 null,会抛出空指针。null 是用来有异常情况时做返回值的。public interface BlockingQueue<E> extends Queue<E> { //添加失败时会抛出异常 boolean add(E e); //添加失败时会返回 false boolean offer(E e); //添加元素时,如果没有空间,会阻塞等待;可以响应中断 void put(E e) throws InterruptedException; //添加元素到队列中,如果没有空间会等待参数中的时间,超时返回,会响应中断 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //获取并移除队首元素,如果没有元素就会阻塞等待 E take() throws InterruptedException; //获取并移除队首元素,如果没有就会阻塞等待参数的时间,超时返回 E poll(long timeout, TimeUnit unit) throws InterruptedException; //返回队列中剩余的空间 int remainingCapacity(); //移除队列中某个元素,如果存在的话返回 true,否则返回 false boolean remove(Object o); //检查队列中是否包含某个元素,至少包含一个就返回 true public boolean contains(Object o); //将当前队列所有元素移动到给定的集合中,这个方法比反复地获取元素更高效 //返回移动的元素个数 int drainTo(Collection<? super E> c); //移动队列中至多 maxElements 个元素到指定的集合中 int drainTo(Collection<? super E> c, int maxElements); }
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { //使用数组保存的元素 final Object[] items; //下一次取元素的索引 int takeIndex; //下一次添加元素的索引 int putIndex; //当前队列中元素的个数 int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ //全部操作的锁 final ReentrantLock lock; //等待获取元素的锁 private final Condition notEmpty; //等待添加元素的锁 private final Condition notFull; //... }
构造函数
//指定队列的容量,使用非公平锁 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //允许使用一个 Collection 来作为队列的默认元素 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { //遍历添加指定集合的元素 if (e == null) throw new NullPointerException(); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; //修改 putIndex 为 c 的容量 +1 } finally { lock.unlock(); } }
add 方法
add(E)
调用了父类的方法,而父类里调用的是 offer(E)
,如果返回 false 就抛出异常。public boolean add(E e) { return super.add(e); } //super.add() 的实现 public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
offer 方法
offer(E)
方法先拿到锁,如果当前队列中元素已满,就立即返回 false。
enqueue(E)
入队。public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
enqueue(E)
方法会将元素添加到数组队列尾部。
notEmpty.signal()
通知唤醒阻塞在获取元素的线程。private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
put 方法
put()
方法可以响应中断(lockInterruptibly
),当队列满了,就调用 notFull.await()
阻塞等待,等有消费者获取元素后继续执行, 可以添加时还是调用 enqueue(E)
。public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
offer(E,long,TimeUnit) 方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
offer()
和 put()
方法很相似,不同之处在于允许设置等待超时时间,超过这么久如果还不能有位置,就返回 false;否则调用 enqueue(E)
,然后返回 true。poll 方法
poll()
如果在队列中没有元素时会立即返回 null,如果有元素调用 dequeue()
。public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
dequeue()
方法会从队首移除元素(即 takeIndex 位置)。private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
itrs. elementDequeued()
,这个 itrs 是 ArrayBlockingQueue 的内部类 Itrs 的对象,是个迭代器,它的作用是保证循环数组迭代时的正确性。take 方法
take()
方法可以响应中断,与 poll()
不同的是,如果队列中没有数据会一直阻塞等待,直到中断或者有元素,有元素时还是调用 dequeue()
方法。public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
poll(long,TimeUnit) 方法
poll()
方法相当于无参 poll()
和 take()
的中和版,允许阻塞一段时间,如果在阻塞一段时间还没有元素进来,就返回 null。public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
peek 方法
peel()
直接返回数组中队尾的元素,并不删除元素。
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; }
总结
put
和 take
方法分别会在队列满了和队列空了之后被阻塞操作。offer
和 poll
方法来插入和提取元素,而不会在队列满了或者队列为空时阻塞操作。//链表结点 static class Node<E> { E item; //对当前结点的后一个结点,有三种情况: //1.真正的结点 //2.当前结点本身,说明当前结点是头结点 //3.null,说明这个结点是尾结点 Node<E> next; Node(E x) { item = x; } } //当前容量,默认是 Integer.MAX_VALUE private final int capacity; //当前队列中的元素数量 private final AtomicInteger count = new AtomicInteger(); //队列中的头结点,头结点的.item 永远为 null transient Node<E> head; //队列的尾结点,尾结点的 next 永远为 null private transient Node<E> last; //获取元素的锁 private final ReentrantLock takeLock = new ReentrantLock(); //等待取元素的等待队列 private final Condition notEmpty = takeLock.newCondition(); //添加元素的锁 private final ReentrantLock putLock = new ReentrantLock(); //等待添加元素的等待队列 private final Condition notFull = putLock.newCondition();
构造函数
//使用 Integer.MAX_VALUE 作为容量 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } //指定最大容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } //使用 Integer.MAX_VALUE 作为容量,同时将指定集合添加到队列中 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { //遍历添加到队列 if (e == null) //需要注意待添加集合中不能有空值 throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put 方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 不允许空元素 int c = -1; Node<E> node = new Node(e); // 以新元素构造结点 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 放锁加锁,保证调用put方法的时候只有1个线程 try { while (count.get() == capacity) { // 如果容量满了 notFull.await(); // 阻塞并挂起当前线程 } enqueue(node); // 结点添加到链表尾部 c = count.getAndIncrement(); // 元素个数+1 if (c + 1 < capacity) // 如果容量还没满 notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 } finally { putLock.unlock(); // 释放放锁,让其他线程可以调用put方法 } if (c == 0) // 由于存在放锁和拿锁,这里可能拿锁一直在消费数据,count会变化。这里的if条件表示如果队列中还有1条数据 signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 }
enqueue()
链表尾部添加结点。private void enqueue(Node<E> node) { last = last.next = node; }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
offer 方法
public boolean offer(E e) { if (e == null) throw new NullPointerException(); // 不允许空元素 final AtomicInteger count = this.count; if (count.get() == capacity) // 如果容量满了,返回false return false; int c = -1; Node<E> node = new Node(e); // 容量没满,以新元素构造结点 final ReentrantLock putLock = this.putLock; putLock.lock(); // 放锁加锁,保证调用offer方法的时候只有1个线程 try { if (count.get() < capacity) { // 再次判断容量是否已满,因为可能拿锁在进行消费数据,没满的话继续执行 enqueue(node); // 结点添加到链表尾部 c = count.getAndIncrement(); // 元素个数+1,并返回旧值 if (c + 1 < capacity) // 如果容量还没满 notFull.signal(); // 在放锁的条件对象notFull上唤醒正在等待的线程,表示可以再次往队列里面加数据了,队列还没满 } } finally { putLock.unlock(); // 释放放锁,让其他线程可以调用offer方法 } if (c == 0) // 如果队列中还有1条数据 signalNotEmpty(); // 在拿锁的条件对象notEmpty上唤醒正在等待的1个线程,表示队列里还有1条数据,可以进行消费 return c >= 0; // 添加成功返回true,否则返回false }
offer(E,long,TimeUnit) 方法
offer()
方法会阻塞一段时间,然后没结果就返回。take 方法
AtomicInteger.getAndDecrement()
方法,这个方法先返回当前值,然后加 1 ,所以后面判断是判断之前的情况。public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 拿锁加锁,保证调用take方法的时候只有1个线程 try { while (count.get() == 0) { // 如果队列里已经没有元素了 notEmpty.await(); // 阻塞并挂起当前线程 } x = dequeue(); // 删除头结点 c = count.getAndDecrement(); // 元素个数-1 if (c > 1) // 如果队列里还有元素 notEmpty.signal(); // 在拿锁的条件对象notEmpty上唤醒正在等待的线程,表示队列里还有数据,可以再次消费 } finally { takeLock.unlock(); // 释放拿锁,让其他线程可以调用take方法 } if (c == capacity) // 如果队列中还可以再插入数据 signalNotFull(); // 在放锁的条件对象notFull上唤醒正在等待的1个线程,表示队列里还能再次添加数据 return x; }
public boolean remove(Object o) { if (o == null) return false; fullyLock(); // remove操作要移动的位置不固定,2个锁都需要加锁 try { for (Node<E> trail = head, p = trail.next; // 从链表头结点开始遍历 p != null; trail = p, p = p.next) { if (o.equals(p.item)) { // 判断是否找到对象 unlink(p, trail); // 修改结点的链接信息,同时调用notFull的signal方法 return true; } } return false; } finally { fullyUnlock(); // 2个锁解锁 } }
private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; //指向队首的结点后移 E x = first.item; first.item = null; return x; }
compareTo()
指定元素的排序规则,或者在初始化它时在构造函数中传递 Comparator 排序规则。构造函数
private static final int DEFAULT_INITIAL_CAPACITY = 11; private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; private transient Object[] queue; private transient int size; public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
offer 方法
tryGrow()
扩容。private void tryGrow(Object[] array, int oldCap) { lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; //扩容数组 } finally { allocationSpinLock = 0; } } if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); //拷贝原有数据 } }
保证优先级
siftUpComparable()
。
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { // 循环比较 // 寻找k的父元素下标,固定规则 int parent = (k - 1) >>> 1; Object e = array[parent]; // 自下而上一般出现在插入元素时调用,插入元素是插入到队列的最后,则需要将该元素调整到合适的位置 // 即从队列的最后往上调整堆,直到不小于其父结点为止,相当于冒泡 if (key.compareTo((T) e) >= 0) //比较 break; // 如果当前结点<其父结点,则将其与父结点进行交换,并继续往上访问父结点 array[k] = e; k = parent; } array[k] = key; }
poll 方法
public E poll() { // size==0队列为0,直接返回null if (size == 0) return null; int s = --size; modCount++; // 出队总是将数组的第一个元素进行出队, E result = (E) queue[0]; E x = (E) queue[s]; queue[s] = null; if (s != 0) // 同时将队列的最后一个元素放到第一个位置,然后自上而下调整堆 siftDown(0, x); return result; }
private void siftDownUsingComparator(int k, E x) { // 由于堆是一个二叉树,所以size/2是树中的最后一个非叶子结点 // 如果k是叶子结点,那么其无子结点,则不需要再往下调整堆 int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = queue[child]; // 右结点 int right = child + 1; // 找出两个子结点以及父结点中较小的一个 if (right < size && comparator.compare((E) c, (E) queue[right]) > 0) c = queue[child = right]; // 如果父结点最小,则无需继续往下调整堆 if (comparator.compare(x, (E) c) <= 0) break; // 否则将父结点与两个子结点中较小的一个交换,然后往下继续调整 queue[k] = c; k = child; } queue[k] = x; }
compareTo()
方法,保证集合中元素的顺序和 getDelay()
一致。public interface Delayed extends Comparable<Delayed> { //返回当前对象的剩余执行时间 long getDelay(TimeUnit unit); }
private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private Thread leader; /** * Condition signalled when a newer element becomes available * at the head of the queue or a new thread may need to * become leader. */ private final Condition available = lock.newCondition();
属性 | 说明 |
---|---|
ReentrantLock lock | 重入锁。 |
PriorityQueue q | 无界的、优先级队列。 |
Thread leader | Leader-Follower 模型中的 leader |
Condition available | 队首有新元素可用或者有新线程成为 leader 时触发的 condition。 |
PriorityQueue
public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) grow(i + 1); size = i + 1; if (i == 0) queue[0] = e; else siftUp(i, e); return true; }
Leader-Follower 模型
await()
方法让当前线程等待信号。awaitNanos()
方法让当前线程等待接收信号或等待 delay 时间。实现 Delayed 接口
getDelay()
方法,返回当前元素还需要延时多久执行。compareTo()
方法,指定不同元素如何比较谁先执行。延时阻塞队列的实现
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //先获取队首元素,不删除 if (first == null) //如果为空就阻塞等待 available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0L) //比较元素延时时间是否到达 return q.poll(); //如果是就移除并返回 first = null; // don‘t retain ref while waiting if (leader != null) //如果有 leader 线程,依然阻塞等待 available.await(); else { //如果没有 leader 线程,指定当前线程,然后等待任务的待执行时间 Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { //最后等待时间到了后,就通知阻塞的线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } //PriorityQueue.peek() public E peek() { return (size == 0) ? null : (E) queue[0]; }
private transient volatile Transferer<E> transferer; public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
put 方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
transferer.transfer()
,如果返回 null 就调用 Thread.interrupted()
将中断标志位复位(设为 false),然后抛出异常。/** * Puts or takes an item. */ @SuppressWarnings("unchecked") E transfer(E e, boolean timed, long nanos) { SNode s = null; int mode = (e == null) ? REQUEST : DATA; //判断是添加还是获取 for (;;) { SNode h = head; //获取栈顶结点 if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can‘t wait if (h != null && h.isCancelled()) //如果头结点无法获取,就去获取下一个 casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { //设置头结点 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s‘s fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s‘s match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h‘s match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } }
结论
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable {...}
TransferQueue
public interface TransferQueue<E> extends BlockingQueue<E> { //尽可能快地转移元素给一个等待的消费者 //如果在这之前有其他线程调用了 taked() 或者 poll(long,TimeUnit) 方法,就返回 true //否则返回 false boolean tryTransfer(E e); //转移元素给一个消费者,在有的情况下会等待直到被取走 void transfer(E e) throws InterruptedException; //在 timeout 时间内将元素转移给一个消费者,如果这段时间内传递出去了就返回 true //否则返回 false boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; //如果至少有一个等待的消费者,就返回 true boolean hasWaitingConsumer(); //返回等待获取元素的消费者个数 //这个值用于监控 int getWaitingConsumerCount(); }
tryTransfer()
和 transfer()
。transfer 方法
transfer()
方法的作用是如果有等待接收元素的消费者线程,直接把生产者传入的元素 transfer 给消费者。transfer()
会将元素存放到队列尾部,并等待元素被消费者取走才返回。Node pred = tryAppend(s, haveData); return awaitMatch(s, pred, e, (how == TIMED), nanos);
awaitMatch()
方法的作用是,CPU 自旋等待消费者取走元素,为了避免长时间消耗 CPU,在自旋一定次数后会调用 Thread.yield() 暂停当前正在执行的线程,改为执行其他线程。tryTransfer 方法
tryTransfer()
的作用是试探生产者传入的元素是否能直接传递给消费者。
public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); }
transfer()
必须等到消费者取出元素才返回不同的是,tryTransfer()
无论是否有消费者接收都会立即返回。tryTransfer(E,long,TimeUnit)
方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回 false,如果在超时时间内消费了元素,则返回 true。关键属性
static final class Node<E> { E item; Node<E> prev; Node<E> next; Node(E x) { item = x; } } transient Node<E> first; transient Node<E> last; private transient int count; private final int capacity; final ReentrantLock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition();
remove(Object)
等移除操作,LinkedBlockingDeque 的大多数操作的时间复杂度都是 O(n)。ArrayBlockingQueue
LinkedBlockingQueue
Executors.newFixedThreadPool()
使用了这个队列。PriorityBlockingQueue
DelayQueue
SynchronousQueue
LinkedTransferQueue
LinkedBlockingDeque
https://blog.csdn.net/u011240877/article/details/73612930#1arrayblockingqueue
https://blog.csdn.net/fuyuwei2015/article/details/72716753
https://blog.csdn.net/tonywu1992/article/details/83419448
原文:https://www.cnblogs.com/youngao/p/12574029.html