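I have recently been reading the BlockingQueue source code. Starting today, I will share some notes on what I learned from it.
(1) LinkedBlockingQueue source code analysis
(2) ArrayBlockingQueue source code analysis

LinkedBlockingQueue implements the BlockingQueue and Serializable interfaces and is a FIFO-ordered queue. Its constructor accepts a maximum capacity; if none is passed, the capacity defaults to Integer.MAX_VALUE.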
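As a quick check of the capacity and ordering claims, here is a minimal sketch. The class name CapacityDemo and the sample values are made up for illustration; only the LinkedBlockingQueue calls themselves come from the JDK.

```java
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {
    /** Remaining capacity of an empty queue built with the no-argument constructor. */
    public static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    /** Fills a queue bounded at 2, then reports whether a third non-blocking offer is accepted. */
    public static boolean thirdOfferAccepted() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(2);
        q.offer("a");
        q.offer("b");
        return q.offer("c"); // non-blocking offer: rejected once the bound is reached
    }

    /** Elements leave the queue in insertion (FIFO) order. */
    public static String drainOrder() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.offer("first");
        q.offer("second");
        return q.poll() + "," + q.poll();
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(thirdOfferAccepted());
        System.out.println(drainOrder());
    }
}
```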
I. First, a look at the key fields of the class:

/** Number of elements currently in the queue */
private final AtomicInteger count = new AtomicInteger(0);

/**
 * Head node
 * Invariant: head.item == null
 */
private transient Node<E> head;

/**
 * Tail node
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Consumer lock. Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Makes consumer threads wait until they are signalled or interrupted */
private final Condition notEmpty = takeLock.newCondition();

/** Producer lock. Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Makes producer threads wait until they are signalled or interrupted */
private final Condition notFull = putLock.newCondition();

II. The put method

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

Execution proceeds as follows:
   1 If the element passed in is null, a NullPointerException is thrown
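The behaviour of put can be observed from the outside with a small sketch. The class name PutDemo, the capacity of 1 and the string values are assumptions chosen for illustration. The second method relies on a thread parked in Condition.await reporting the state WAITING.

```java
import java.util.concurrent.LinkedBlockingQueue;

class PutDemo {
    /** put(null) is rejected with a NullPointerException. */
    public static boolean putNullThrows() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        try {
            q.put(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** A producer calling put on a full queue waits until a consumer frees a slot. */
    public static String putBlocksUntilSpace() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        q.offer("a"); // the queue is now full
        Thread producer = new Thread(() -> {
            try {
                q.put("b"); // waits on notFull while count == capacity
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            // Wait until the producer is parked inside put.
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            String first = q.take(); // frees a slot and wakes the producer
            producer.join();
            return first + "," + q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(putNullThrows());
        System.out.println(putBlocksUntilSpace());
    }
}
```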
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}
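Unlike put, the timed offer gives up once the timeout has elapsed and returns false. A minimal sketch of that difference, assuming a hypothetical class TimedOfferDemo, a capacity of 1 and a 50 ms timeout:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    /** Returns true if the first offer succeeds and the second one times out. */
    public static boolean offerToFullQueue() {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<>(1);
        try {
            // Space is available, so this returns true without waiting.
            boolean first = q.offer(1, 50, TimeUnit.MILLISECONDS);
            // The queue is full and nobody consumes, so this waits about 50 ms and returns false.
            boolean second = q.offer(2, 50, TimeUnit.MILLISECONDS);
            return first && !second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue());
    }
}
```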
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
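The hand-off between a waiting consumer and a late producer might look like the sketch below. The class name TakeDemo, the 50 ms delay and the value "job" are illustrative assumptions.

```java
import java.util.concurrent.LinkedBlockingQueue;

class TakeDemo {
    /** The caller waits in take until the producer thread delivers an element. */
    public static String takeWaitsForProducer() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50); // let the consumer reach take first (not guaranteed)
                q.put("job");     // count goes from 0 to 1, so put signals notEmpty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = q.take(); // waits while the queue is empty
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(takeWaitsForProducer());
    }
}
```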

V. The poll method

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
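A short sketch of the two outcomes of the timed poll: an element when one is available, and null once the timeout runs out on an empty queue. The class name TimedPollDemo and the 50 ms timeout are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedPollDemo {
    /** Polls a one-element queue twice and joins both results with '/'. */
    public static String pollResults() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        q.offer("x");
        try {
            String hit = q.poll(50, TimeUnit.MILLISECONDS);  // element present: returned at once
            String miss = q.poll(50, TimeUnit.MILLISECONDS); // queue empty: null after the timeout
            return hit + "/" + miss;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollResults());
    }
}
```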

Execution proceeds as follows:

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}
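Because peek only reads head.next.item and never dequeues, repeated calls should return the same element and leave the size unchanged. A minimal sketch, with a hypothetical class name PeekDemo and made-up values:

```java
import java.util.concurrent.LinkedBlockingQueue;

class PeekDemo {
    /** Peeks at an empty queue, then twice at a queue holding "a" and "b". */
    public static String peekTwice() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        String onEmpty = q.peek(); // null: peek never blocks
        q.offer("a");
        q.offer("b");
        String p1 = q.peek(); // head element, not removed
        String p2 = q.peek(); // still the same head element
        return onEmpty + "," + p1 + "," + p2 + "," + q.size();
    }

    public static void main(String[] args) {
        System.out.println(peekTwice());
    }
}
```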
(4) take blocks indefinitely while the queue is empty
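Since take acquires its lock with lockInterruptibly and waits with notEmpty.await, a consumer stuck on an empty queue can still be released by interrupting it. The sketch below illustrates this under the assumption that a thread parked in await reports the state WAITING; the class name BlockedTakeDemo is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class BlockedTakeDemo {
    /** Returns true if interrupting a consumer blocked in take makes take throw InterruptedException. */
    public static boolean interruptReleasesBlockedTake() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread consumer = new Thread(() -> {
            try {
                q.take(); // nothing is ever produced, so this would wait forever
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        consumer.start();
        try {
            // Wait until the consumer is parked inside take.
            while (consumer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println(interruptReleasesBlockedTake());
    }
}
```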
BlockingQueue: LinkedBlockingQueue source code analysis
Original: http://wang7839186.iteye.com/blog/2294376