需要实现一个多线程并发的业务场景,启动若干子线程,最后要所有子线程运行结束才结束。(类似 .NET 里的 Task WaitAll )
Java 中的 ExecutorService 多线程编程模型提供这样一个机制,通过代码来介绍一下。
方法一:ExecutorService#awaitTermination
/** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return <tt>true</tt> if this executor terminated and * <tt>false</tt> if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
sample:
ExecutorService es = Executors.newCachedThreadPool(); List<Future<String>> futureList = Lists.newArrayList(); futureList.add(es.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading1 is running"); Thread.sleep(1000); return "1"; } })); futureList.add(es.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading2 is running"); Thread.sleep(3000); return "2"; } })); futureList.add(es.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading3 is running"); Thread.sleep(2000); return "3"; } })); es.shutdown(); try { boolean completed = es.awaitTermination(2000, TimeUnit.MILLISECONDS); System.out.println("all tasks completed: " + completed); //es.wait(3000); for(Future future : futureList) { try { if (future.isDone()) { System.out.println("result: " + future.get()); } } catch (CancellationException ex) { ex.printStackTrace(); } finally { future.cancel(true); } } } catch (Exception e) { e.printStackTrace(); }
输出结果:
Threading 1 is running Threading 2 is running Threading 3 is running all tasks completed: false result: 1 result: 3
只有1和3输出了结果,因为 awaitTermination 超时设置了 2000 ms,Threading2 模拟了 3000 ms 因此被超时取消了。通过 Future#isDone() 可以判断对应线程是否处理完毕,
这个场景里如果 isDone() == false 那么就可以认为是被超时干掉了。
需要注意的是,在添加完所有的子线程并启动后调用了 ExecutorService#shutdown 方法:一方面是不再接受新的子线程"提交",另一方面ExecutorService 其实自己也是一个work线程,如果不shutdown 其实当前线程并不会结束。调用shutdown 和不调用shutdown 从main运行后控制台状态就可以看出差异。
最后在 finally 里调用了 Future#cancel() 主要是当await之后,被超时处理的线程可能还在运行,直接取消掉。
方法二:ExecutorService#invokeAll
invokeAll 很像上述实现的整体包装,但细节略有不同。首先将 Callable 统一创建好放在List里交给 invokeAll 方法执行并设置超时时间。
ExecutorService es = Executors.newCachedThreadPool(); List<Callable<String>> tasks = Lists.newArrayList(); tasks.add(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading 1 is running"); Thread.sleep(1000); return "1"; } }); tasks.add(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading 2 is running"); Thread.sleep(3000); return "2"; } }); tasks.add(new Callable<String>() { @Override public String call() throws Exception { System.out.println("Threading 3 is running"); Thread.sleep(2000); return "3"; } }); try { List<Future<String>> futureList = es.invokeAll(tasks, 2000, TimeUnit.MILLISECONDS); es.shutdown(); for(Future future : futureList) { try { if (future.isDone()) { System.out.println("result: " + future.get()); } } catch (CancellationException ex) { ex.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); }输出结果:
Threading 1 is running Threading 2 is running Threading 3 is running result: 1 result: 3 java.util.concurrent.CancellationException
ExecutorService#invokeAll() 执行后所有 future.isDone() 都是 true,在 future.get() 拿结果的时候,被超时的Future会抛出 CancellationException 。因为在 invokeAll 内部调用了Future#cancel() 方法。
源码如下:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) futures.add(newTaskFor(t)); long lastTime = System.nanoTime(); // Interleave time checks and calls to execute in case // executor doesn‘t have any/much parallelism. Iterator<Future<T>> it = futures.iterator(); while (it.hasNext()) { execute((Runnable)(it.next())); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) return futures; } for (Future<T> f : futures) { if (!f.isDone()) { if (nanos <= 0) return futures; try { f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } } done = true; return futures; } finally { if (!done) for (Future<T> f : futures) f.cancel(true); } }
当然在 ExecutorService 编程模型外,自己定义Threading,通过CountDownLatch 控制也是可以实现的。
原文:http://blog.csdn.net/fangxing80/article/details/42324377