import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer/consumer demo built on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>{@code PRODUCER_COUNT} producers each put {@code PRODUCT_THREAD} distinct integers into the
 * queue; {@code CONSUMER_COUNT} consumers take them, record them in a shared set and print them.
 * Once every produced value has been consumed, the main thread shuts the pool down and prints the
 * number of distinct values that were seen.
 *
 * <p>All state shared between worker threads is thread-safe: the sequence counter is an
 * {@link AtomicInteger}, the result set is a concurrent set, and completion is signalled with a
 * {@link CountDownLatch}.
 */
public class Main {

    /** Capacity of the bounded queue shared by producers and consumers. */
    private static final int QUEUE_CAPACITY = 100;

    /** Number of consumer tasks submitted to the pool. */
    private static final int CONSUMER_COUNT = 20;

    /** Number of producer tasks submitted to the pool. */
    private static final int PRODUCER_COUNT = 2;

    /** Number of values each producer puts into the queue. */
    private static final int PRODUCT_THREAD = 100;

    /** Total number of values produced across all producers. */
    private static final int SUM_PRODUCT = PRODUCT_THREAD * PRODUCER_COUNT;

    /**
     * Starts the consumers and producers, waits until every produced value has been consumed,
     * then stops the pool and prints {@code hashSet.size():<n>}.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the
     *     workers; the pool is still shut down in that case
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> abq = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        // Written by many consumers concurrently, so it must be a thread-safe set.
        Set<Integer> hashSet = ConcurrentHashMap.newKeySet();
        // Shared by all producers; getAndIncrement() hands out each value exactly once.
        AtomicInteger nextValue = new AtomicInteger();
        // Counted down once per consumed value; reaches zero when everything has been consumed.
        CountDownLatch remaining = new CountDownLatch(SUM_PRODUCT);

        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                exec.submit(() -> consume(abq, hashSet, remaining));
            }
            for (int i = 0; i < PRODUCER_COUNT; i++) {
                exec.submit(() -> produce(abq, nextValue));
            }
            remaining.await();
        } finally {
            // Only the main thread shuts the pool down, and only after it has finished
            // submitting. A worker calling shutdownNow() could race with submit() and make it
            // throw RejectedExecutionException.
            exec.shutdownNow();
        }

        // Consumers blocked in take() are interrupted by shutdownNow(); wait for them to exit.
        while (!exec.awaitTermination(1, TimeUnit.SECONDS)) {
            // keep waiting
        }
        System.out.println("hashSet.size():" + hashSet.size());
    }

    /**
     * Consumer loop: takes values from the queue until the thread is interrupted, recording and
     * printing each one and counting the latch down after the value has been printed.
     */
    private static void consume(
            BlockingQueue<Integer> queue, Set<Integer> seen, CountDownLatch remaining) {
        System.out.println("Consume Thread Run!");
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Integer val = queue.take();
                seen.add(val);
                System.out.println(val);
                remaining.countDown();
            }
        } catch (InterruptedException e) {
            // take() clears the interrupt flag when it throws; restore it so the pool thread
            // still observes the interruption after this task returns.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Producer loop: puts {@code PRODUCT_THREAD} consecutive values from the shared counter into
     * the queue, stopping early if the thread is interrupted.
     */
    private static void produce(BlockingQueue<Integer> queue, AtomicInteger nextValue) {
        System.out.println("Produce Thread Run!");
        try {
            for (int i = 0; i < PRODUCT_THREAD; i++) {
                System.out.println("putting..");
                queue.put(nextValue.getAndIncrement());
            }
        } catch (InterruptedException e) {
            // Stop producing and preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
注意:这里报了一个不应该出现的异常!
如果先提交(submit)生产者任务,并且消费者线程很多(比如 200 个),则可能抛出 java.util.concurrent.RejectedExecutionException:当由消费者线程调用 exec.shutdownNow() 时,某个消费者可能在主线程尚未提交完全部任务之前就发现所有元素已被消费并关闭线程池,此后主线程再调用 submit() 就会被拒绝。
原文:https://www.cnblogs.com/iuyy/p/13574364.html