/** * 序列化版本id */ private static final long serialVersionUID = -817911632652898426L; /** * 保存队列元素的数组 */ final Object[] items; /** * 指向下一个元素的位置(进行take、poll、peek和remove),可以理解为队首指针 */ int takeIndex; /** * 指向下一个元素的位置(进行put、offer、add),可以理解为队尾指针 */ int putIndex; /** * 记录队列中中元素的位置 */ int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** * 访问控制的锁 */ final ReentrantLock lock; /** * 用于take阻塞的condition */ private final Condition notEmpty; /** * 用于put阻塞的condition */ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null;
/** * 初始化ArrayBlockingQueue,设置队列大小,使用非公平策略(默认) */ public ArrayBlockingQueue(int capacity) { this(capacity, false); } /** * 初始化ArrayBlockingQueue,设置队列大小,并设置是否使用公平策略 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) { throw new IllegalArgumentException(); } // 创建n个元素的数组(赋值给队列数组) this.items = new Object[capacity]; // 创建可重复锁(使用设置的公平策略) lock = new ReentrantLock(fair); // 创建lock关联的notEmpty和notFull condition notEmpty = lock.newCondition(); notFull = lock.newCondition(); } /** * 初始化ArrayBlockingQueue,设置队列大小,并设置是否使用公平策略 * 然后将传入的元素加入到队列中(按照传入元素的顺序),元素不能为null */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { // 初始化队列 this(capacity, fair); // 获取锁,并加锁 final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { // 遍历集合,将集合的元素顺序加入到队列数组中 for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } // 修改队列元素的数量 count = i; // 修改putIndex,也就是下一个元素放入的下标,如果队列满了(i为capacity),那么下一个位置为0(队首) putIndex = (i == capacity) ? 0 : i; } finally { // 释放锁 lock.unlock(); } }
/** * 将元素插入队列的尾部(入队) * 入队成功,返回true;入队失败,返回false */ public boolean offer(E e) { // 判断元素是否为null,如果为null,则抛出NPE checkNotNull(e); // 获取锁,并加锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 判断队列是否已满 if (count == items.length) { // 队列已满,返回false return false; } else { // 队列未满,调用enqueue来入队,并返回true(入队成功) enqueue(e); return true; } } finally { // 释放锁 lock.unlock(); } }
/** * 向队列中添加元素(放入队尾),返回添加元素是否成功 * 如果队列已满,那么将会抛出IllegalStateException异常 * 如果添加的元素为null,则会抛出NullPointerException异常 */ public boolean add(E e) { return super.add(e);//调用AbstractQueue的add方法 } // 这是AbstractQueue的add方法,ArrayBlockingQueue重写了offer方法 public boolean add(E e) { // 调用的ArrayBlockingQueue的offer方法 if (offer(e)) { return true; } else { throw new IllegalStateException("Queue full"); } }
/** * 元素入队,如果队列已满,则会阻塞直到元素入队完成 */ public void put(E e) throws InterruptedException { // 判断元素是否为null,如果为null,则抛出NPE checkNotNull(e); // 获取锁,并加锁(可中断) final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 自旋锁,如果队列已满,则线程进行阻塞 while (count == items.length) { // 阻塞,等待notFull的signal notFull.await(); } // 队列未满,进行入队操作 enqueue(e); } finally { // 释放锁 lock.unlock(); } }
/** * 元素入队,并设置超时时长 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 判断元素是否为null,如果为null,则抛出NPE checkNotNull(e); // 时间转换为纳秒 long nanos = unit.toNanos(timeout); // 加锁(可中断) final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列已满,并且已经超时,则返回false(入队失败) while (count == items.length) { if (nanos <= 0) { return false; } // 阻塞,返回剩余的超时时长 nanos = notFull.awaitNanos(nanos); } // 队列未满,进行入队操作,并返回true(入队成功) enqueue(e); return true; } finally { // 释放锁 lock.unlock(); } }
/** * 将元素插入putIndex所指的位置 */ private void enqueue(E x) { // 获取队列数组 final Object[] items = this.items; // 将元素放入数组元素中(队尾) items[putIndex] = x; // 如果队列已满,那么重置putIndex(因为是循环数组实现队列,所以下一个放入的位置为队首) if (++putIndex == items.length) { putIndex = 0; } // 元素数量加1 count++; // 唤醒notEmpty(通知队列不为空,有元素) notEmpty.signal(); }
/** * 获取元素(不出队),如果队列为空,则返回null */ public E peek() { // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 获取队列指定位置的元素(takeIndex,也就是下一个出队的元素位置) return itemAt(takeIndex); } finally { lock.unlock(); } } /** * 获取队列数组中的i位置上的元素 */ @SuppressWarnings("unchecked") final E itemAt(int i) { return (E) items[i]; }
/** * 非阻塞式出队 * * @return 出队的元素(如果队列为空,则返回null) */ public E poll() { // 加锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 如果队列为空,则返回null,否则进行出队操作 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
/** * 出队,设置超时时间 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 加锁 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列为空,并且未超时,则进行阻塞 while (count == 0) { // 如果已经超时,则返回null if (nanos <= 0) { return null; } // 线程阻塞,等待notEmpty的signal nanos = notEmpty.awaitNanos(nanos); } // 队列不为空,则进行出队 return dequeue(); } finally { lock.unlock(); } }
/** * 出队,阻塞式 */ public E take() throws InterruptedException { // 加锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 如果队列为空,则进行阻塞,等待notEmpty.signal while (count == 0) { notEmpty.await(); } // 进行出队 return dequeue(); } finally { // 释放锁 lock.unlock(); } }
/** * 出队操作 */ private E dequeue() { // 获取队列数组 final Object[] items = this.items; // 获取队列数组中队首的元素 @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 将队首元素设置为null(释放) items[takeIndex] = null; // 如果takeIndex达到数组长度,那么就重设队首指针 if (++takeIndex == items.length) { takeIndex = 0; } // 队列元素减1 count--; // 如果itrs不为null,则调用元素出队接口(elementDequeue) if (itrs != null) { itrs.elementDequeued(); } // notFull唤醒(表示可以继续入队元素) notFull.signal(); // 返回出队的元素 return x; }
/** * 获取队列元素数量 * 注意,获取size也加锁了,这是因为避免有其他线程正在出队或者入队,导致获取的size不正确 * 加了锁,能够保证获取size时,队列没有被修改 */ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }