Java中有些多线程编程模式在很大程序上都依赖于Queue实现的线程安全性,所以非常有必要认识,首先来看一下接口定义,如下:
public interface Queue<E> extends Collection<E> { // 向队列中添加元素 boolean add(E e); boolean offer(E e); // 删除队列元素 E remove(); E poll(); // 检查队列元素 E element(); E peek(); }BlockingQueue类继承了如上的接口,定义如下:
public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); int drainTo(Collection<? super E> c); int drainTo(Collection<? super E> c, int maxElements); }这个接口中本身定义的方法,加上从Queue接口中继承的方法后,可以将BlockingQueue方法大概分为4种形式,如下:
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。
下面来模拟一个阻塞队列的简单实现,如下:
public class BlockingQueue { private List queue = new LinkedList(); private int limit = 10; public BlockingQueue(int limit) { this.limit = limit; } public synchronized void enqueue(Object item) throws InterruptedException { while (this.queue.size() == this.limit) { wait(); } if (this.queue.size() == 0) { notifyAll(); // 通知所有的线程来取出,如果是加入线程则继续等待 } this.queue.add(item); } public synchronized Object dequeue() throws InterruptedException { while (this.queue.size() == 0) { wait(); } if (this.queue.size() == this.limit) { notifyAll(); // 通知所有的线程来加入,如果是取出线程则继续等待 } return this.queue.remove(0); } }必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。
Java提供了BlockingQueue接口的两个基本实现:LinkedBlockingQueue和ArrayBlockingQueue。他们都是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。他们的用法之间稍有区别,如已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue非常高效。
下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。
1、添加元素
public boolean add(E e) { return super.add(e); } public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } insert(e); return true; } finally { lock.unlock(); } }(1)add(E e)方法会调用AbstractQueue类中的方法,代码如下:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }还是调用offer()方法添加元素,成功返回true,失败则抛出异常,表示队列已经满了。
(2)offer(E e)方法如果队列满了,则返回false,否则调用insert()方法进行元素的插入,这个方法的源代码如下:
private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; // 元素数量加1 notEmpty.signal(); // 唤醒取元素的线程 }
理解如上代码,首先要认识两个变量的含义:takeIndex和putIndex。takeIndex表示下一个被取出元素的索引,putIndex表示下一个被添加元素的索引。它们的定义如下:
int takeIndex; // 下一个被取出元素的索引 int putIndex; // 下一个被添加元素的索引其中inc()的源代码如下:
final int inc(int i) { return (++i == items.length) ? 0 : i; }这个方法的作用就是判断i加1后队列是否已满。
(3) put(E e)方法当加入元素时,如果队列已经满了,则阻塞等待;直到检测到不满时调用insert()方法进行插入。
(4)offer(E e, long timeout, TimeUnit unit) 如果在指定的时间内还无法插入队列,则返回false,表示插入失败。否则让插入队列等待一定的时间。如果插入成功,则返回true。
2、获取元素
原文:http://blog.csdn.net/mazhimazh/article/details/19239033