Java中有些多线程编程模式在很大程序上都依赖于Queue实现的线程安全性,所以非常有必要认识,首先来看一下接口定义,如下:
public interface Queue<E> extends Collection<E> {
// 向队列中添加元素
boolean add(E e);
boolean offer(E e);
// 删除队列元素
E remove();
E poll();
// 检查队列元素
E element();
E peek();
}
BlockingQueue类继承了如上的接口,定义如下:
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
这个接口中本身定义的方法,加上从Queue接口中继承的方法后,可以将BlockingQueue方法大概分为4种形式,如下:
阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。
下面来模拟一个阻塞队列的简单实现,如下:
public class BlockingQueue {
private List queue = new LinkedList();
private int limit = 10;
public BlockingQueue(int limit) {
this.limit = limit;
}
public synchronized void enqueue(Object item) throws InterruptedException {
while (this.queue.size() == this.limit) {
wait();
}
if (this.queue.size() == 0) {
notifyAll(); // 通知所有的线程来取出,如果是加入线程则继续等待
}
this.queue.add(item);
}
public synchronized Object dequeue() throws InterruptedException {
while (this.queue.size() == 0) {
wait();
}
if (this.queue.size() == this.limit) {
notifyAll(); // 通知所有的线程来加入,如果是取出线程则继续等待
}
return this.queue.remove(0);
}
}必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。
Java提供了BlockingQueue接口的两个基本实现:LinkedBlockingQueue和ArrayBlockingQueue。他们都是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。他们的用法之间稍有区别,如已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue非常高效。
下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。1、添加元素
public boolean add(E e) {
return super.add(e);
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
insert(e);
return true;
} finally {
lock.unlock();
}
}
(1)add(E e)方法会调用AbstractQueue类中的方法,代码如下:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
还是调用offer()方法添加元素,成功返回true,失败则抛出异常,表示队列已经满了。
(2)offer(E e)方法如果队列满了,则返回false,否则调用insert()方法进行元素的插入,这个方法的源代码如下:
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count; // 元素数量加1
notEmpty.signal(); // 唤醒取元素的线程
}
理解如上代码,首先要认识两个变量的含义:takeIndex和putIndex。takeIndex表示下一个被取出元素的索引,putIndex表示下一个被添加元素的索引。它们的定义如下:
int takeIndex; // 下一个被取出元素的索引 int putIndex; // 下一个被添加元素的索引其中inc()的源代码如下:
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}这个方法的作用就是判断i加1后队列是否已满。(3) put(E e)方法当加入元素时,如果队列已经满了,则阻塞等待;直到检测到不满时调用insert()方法进行插入。
(4)offer(E e, long timeout, TimeUnit unit) 如果在指定的时间内还无法插入队列,则返回false,表示插入失败。否则让插入队列等待一定的时间。如果插入成功,则返回true。
2、获取元素
原文:http://blog.csdn.net/mazhimazh/article/details/19239033