/** * 同时开始并行 执行任务,如果有一个异常则退出 * @author Administrator * */ public class MyThreadStartTogether { public static void main(String[] args) throws Exception { testThread(50); } public static boolean testThread(int num) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(num); final AtomicBoolean flag = new AtomicBoolean(false); final CountDownLatch down = new CountDownLatch(num); final Semaphore se = new Semaphore(0); for(int i =0; i < num; i++) { executorService.execute(new Runnable() { @Override public void run() { try { se.acquire(); if(!flag.get()) { TimeUnit.MICROSECONDS.sleep(100); if(Thread.currentThread().getId() == 40) { throw new RuntimeException("测试异常"); } System.out.println("Thread=="+Thread.currentThread().getId()+":www.ebnew.com"); } }catch (Exception e) { flag.set(true); e.printStackTrace(); } finally { down.countDown(); } } }); } se.release(num); System.out.println("==================开始================="); down.await(); executorService.shutdown(); return flag.get(); } } public class TestInvokeAny { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // invokeAny1(); // invokeAny2(); // invokeAny3(); invokeAnyTimeout(); } /** * 还没有到超时之前,所以的任务都已经异常完成,抛出ExecutionException<br> * 如果超时前满,还没有没有完成的任务,抛TimeoutException */ public static void invokeAnyTimeout() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); List<Callable<String>> tasks = new ArrayList<Callable<String>>(); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); String result = executorService.invokeAny(tasks, 2, TimeUnit.SECONDS); System.out.println("result=" + result); executorService.shutdown(); } /** * 有异常的任务,有正常的任务,invokeAny()不会抛异常,返回最先正常完成的任务 */ public static void invokeAny3() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); List<Callable<String>> tasks = new ArrayList<Callable<String>>(); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); tasks.add(new SleepSecondsCallable("t1", 2)); String result = executorService.invokeAny(tasks); System.out.println("result=" + result); executorService.shutdown(); } /** * 没有1个正常完成的任务,invokeAny()方法抛出ExecutionException,封装了任务中元素的异常 * */ public static void invokeAny2() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); List<Callable<String>> tasks = new ArrayList<Callable<String>>(); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); tasks.add(new ExceptionCallable()); String result = executorService.invokeAny(tasks); System.out.println("result=" + result); executorService.shutdown(); } /** * 提交的任务集合,一旦有1个任务正常完成(没有抛出异常),会终止其他未完成的任务 */ public static void invokeAny1() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(3); List<Callable<String>> tasks = new ArrayList<Callable<String>>(); tasks.add(new SleepSecondsCallable("t1", 2)); tasks.add(new SleepSecondsCallable("t2", 1)); String result = executorService.invokeAny(tasks); System.out.println("result=" + result); executorService.shutdown(); } } public class TestInvokeAll { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // testInvokeAll(); // testInvokeAllTimeout(); testInvokeAllWhenInterrupt(); } /** * 如果线程在等待invokeAll()执行完成的时候,调用线程被中断,会抛出InterruptedException<br> * 此时线程池会终止没有完成的任务,这主要是为了减少资源的浪费. */ public static void testInvokeAllWhenInterrupt() throws Exception { final ExecutorService executorService = Executors.newFixedThreadPool(5); // 调用invokeAll的线程 Thread invokeAllThread = new Thread() { @Override public void run() { List<Callable<String>> tasks = new ArrayList<Callable<String>>(); tasks.add(new SleepSecondsCallable("t1", 2)); tasks.add(new SleepSecondsCallable("t2", 2)); tasks.add(new RandomTenCharsTask()); // 调用线程会阻塞,直到tasks全部执行完成(正常完成/异常退出) try { List<Future<String>> results = executorService .invokeAll(tasks); System.out.println("wait for the result." + results.size()); } catch (InterruptedException e) { System.out .println("I was wait,but my thread was interrupted."); e.printStackTrace(); } } }; invokeAllThread.start(); Thread.sleep(200); invokeAllThread.interrupt(); executorService.shutdown(); } /** * 可以通过Future.isCanceled()判断任务是被取消,还是完成(正常/异常)<br> * Future.isDone()总是返回true,对于invokeAll()的调用者来说,没有啥用 */ public static void testInvokeAllTimeout() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(5); List<Callable<String>> tasks = new ArrayList<Callable<String>>(); tasks.add(new SleepSecondsCallable("t1", 2)); tasks.add(new SleepSecondsCallable("t2", 2)); tasks.add(new SleepSecondsCallable("t3", 1)); // tasks.add(new RandomTenCharsTask()); List<Future<String>> results = executorService.invokeAll(tasks, 1, TimeUnit.SECONDS); System.out.println("wait for the result." + results.size()); for (Future<String> f : results) { System.out.println("isCanceled=" + f.isCancelled() + ",isDone=" + f.isDone()); } executorService.shutdown(); } /** * 程序的执行结果和一些结论,已经直接写在代码注释里面了。invokeAll是一个阻塞方法,会等待任务列表中的所有任务都执行完成。不管任务是正常完成, * 还是异常终止,Future.isDone()始终返回true。通过Future.isCanceled()可以判断任务是否在执行的过程中被取消。 * 通过Future.get()可以获取任务的返回结果,或者是任务在执行中抛出的异常。 * * @throws Exception */ public static void testInvokeAll() throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(5); List<Callable<String>> tasks = new ArrayList<Callable<String>>(); tasks.add(new SleepSecondsCallable("t1", 2)); tasks.add(new SleepSecondsCallable("t2", 2)); tasks.add(new RandomTenCharsTask()); tasks.add(new ExceptionCallable()); // 调用该方法的线程会阻塞,直到tasks全部执行完成(正常完成/异常退出) List<Future<String>> results = executorService.invokeAll(tasks); // 任务列表中所有任务执行完毕,才能执行该语句 System.out.println("wait for the result." + results.size()); executorService.shutdown(); for (Future<String> f : results) { // isCanceled=false,isDone=true System.out.println("isCanceled=" + f.isCancelled() + ",isDone=" + f.isDone()); // ExceptionCallable任务会报ExecutionException System.out.println("task result=" + f.get()); } } } public class SleepSecondsCallable implements Callable<String> { private String name; private int seconds; public SleepSecondsCallable(String name, int seconds) { this.name = name; this.seconds = seconds; } public String call() throws Exception { System.out.println(name + ",begin to execute"); try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { System.out.println(name + " was disturbed during sleeping."); e.printStackTrace(); return name + "_SleepSecondsCallable_failed"; } System.out.println(name + ",success to execute"); return name + "_SleepSecondsCallable_succes"; } } public class RandomTenCharsTask implements Callable<String> { @Override public String call() throws Exception { System.out.println("RandomTenCharsTask begin to execute..."); StringBuffer content = new StringBuffer(); String base = "ssssssssssssssssssssssss"; Random random = new Random(); for (int i = 0; i < 10; i++) { int number = random.nextInt(base.length()); content.append(base.charAt(number)); } System.out.println("RandomTenCharsTask complete.result=" + content); return content.toString(); } } public class ExceptionCallable implements Callable<String> { private String name = null; public ExceptionCallable() { } public ExceptionCallable(String name) { this.name = name; } @Override public String call() throws Exception { System.out.println("开始执行..."); System.out.println(name.length()); System.out.println("结束执行."); return name; } }
原文:http://my.oschina.net/zuiwoxing/blog/526043