首页 > 其他 > 详细

Java 7之多线程并发容器 - Queue

时间:2014-02-15 16:34:40      阅读:366      评论:0      收藏:0      [点我收藏+]

Java中有些多线程编程模式在很大程序上都依赖于Queue实现的线程安全性,所以非常有必要认识,首先来看一下接口定义,如下:

public interface Queue<E> extends Collection<E> {
	// 向队列中添加元素
	boolean add(E e);
	boolean offer(E e);

	// 删除队列元素
	E remove();
	E poll();

	// 检查队列元素
	E element();
	E peek();
}
BlockingQueue类继承了如上的接口,定义如下:

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit)  throws InterruptedException;
    
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit)  throws InterruptedException;
    
    int remainingCapacity();
    boolean remove(Object o);
    
    public boolean contains(Object o);
    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);
}
这个接口中本身定义的方法,加上从Queue接口中继承的方法后,可以将BlockingQueue方法大概分为4种形式,如下:

bubuko.com,布布扣

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列。
下面来模拟一个阻塞队列的简单实现,如下:

public class BlockingQueue {
	private List queue = new LinkedList();
	private int limit = 10;

	public BlockingQueue(int limit) {
		this.limit = limit;
	}

	public synchronized void enqueue(Object item) throws InterruptedException {
		while (this.queue.size() == this.limit) {
			wait();
		}
		if (this.queue.size() == 0) {
			notifyAll(); // 通知所有的线程来取出,如果是加入线程则继续等待
		}
		this.queue.add(item);
	}

	public synchronized Object dequeue() throws InterruptedException {
		while (this.queue.size() == 0) {
			wait();
		}
		if (this.queue.size() == this.limit) {
			notifyAll(); // 通知所有的线程来加入,如果是取出线程则继续等待
		}
		return this.queue.remove(0);
	}
}
必须注意到,在enqueue和dequeue方法内部,只有队列的大小等于上限(limit)或者下限(0)时,才调用notifyAll方法。如果队列的大小既不等于上限,也不等于下限,任何线程调用enqueue或者dequeue方法时,都不会阻塞,都能够正常的往队列中添加或者移除元素。

Java提供了BlockingQueue接口的两个基本实现:LinkedBlockingQueue和ArrayBlockingQueue。他们都是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。他们的用法之间稍有区别,如已知队列的大小而能确定合适的边界时,用ArrayBlockingQueue非常高效。

下面来看ArrayBlockingQueue类的最主要的一个构造函数,如下:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。
notEmpty表示锁的非空条件。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒之前通过notEmpty.await()进入等待状态的线程。同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。

1、添加元素


    public boolean add(E e) {
        return super.add(e);
    }
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    public boolean offer(E e, long timeout, TimeUnit unit)  throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
(1)add(E e)方法会调用AbstractQueue类中的方法,代码如下:

   public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
还是调用offer()方法添加元素,成功返回true,失败则抛出异常,表示队列已经满了。

(2)offer(E e)方法如果队列满了,则返回false,否则调用insert()方法进行元素的插入,这个方法的源代码如下:

 private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;                 // 元素数量加1
        notEmpty.signal();       // 唤醒取元素的线程
 }

理解如上代码,首先要认识两个变量的含义:takeIndex和putIndex。takeIndex表示下一个被取出元素的索引,putIndex表示下一个被添加元素的索引。它们的定义如下:

int takeIndex; // 下一个被取出元素的索引
int putIndex;  // 下一个被添加元素的索引

其中inc()的源代码如下:

final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}
这个方法的作用就是判断i加1后队列是否已满。

(3) put(E e)方法当加入元素时,如果队列已经满了,则阻塞等待;直到检测到不满时调用insert()方法进行插入。

(4)offer(E e, long timeout, TimeUnit unit)  如果在指定的时间内还无法插入队列,则返回false,表示插入失败。否则让插入队列等待一定的时间。如果插入成功,则返回true。


2、获取元素




































Java 7之多线程并发容器 - Queue

原文:http://blog.csdn.net/mazhimazh/article/details/19239033

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!