首页 > 编程语言 > 详细

线程池相关

时间:2015-11-05 00:57:56      阅读:270      评论:0      收藏:0      [点我收藏+]
/**
 * 同时开始并行 执行任务,如果有一个异常则退出
 * @author Administrator
 *
 */
public class MyThreadStartTogether {
 
	public static void main(String[] args) throws Exception {
		testThread(50);
	}
	public static boolean testThread(int num) throws Exception {
		  ExecutorService executorService = Executors.newFixedThreadPool(num);
		  final AtomicBoolean flag = new AtomicBoolean(false);
		  final CountDownLatch down = new CountDownLatch(num);
		  final Semaphore se = new Semaphore(0);
	 
		  for(int i =0; i < num; i++) {
			  executorService.execute(new Runnable() {
					
					@Override
					public void run() {
		               	try {
		               		se.acquire();
		               		if(!flag.get()) {
		               			TimeUnit.MICROSECONDS.sleep(100);
		               			if(Thread.currentThread().getId() == 40) {
		               				throw new RuntimeException("测试异常");
		               			}
		               			System.out.println("Thread=="+Thread.currentThread().getId()+":www.ebnew.com");
		               		}
		               	}catch (Exception e) {
		               		flag.set(true);
							e.printStackTrace();
							
						} finally {
							down.countDown();
						}
						
					}
				});
		  }
		  se.release(num);
		  System.out.println("==================开始=================");
		  down.await();
		  executorService.shutdown();
		  return flag.get();	
	}
}

public class TestInvokeAny {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// invokeAny1();
		// invokeAny2();
//		invokeAny3();
		invokeAnyTimeout();
	}

	/**
	 * 还没有到超时之前,所以的任务都已经异常完成,抛出ExecutionException<br>
	 * 如果超时前满,还没有没有完成的任务,抛TimeoutException
	 */
	public static void invokeAnyTimeout() throws Exception {
		ExecutorService executorService = Executors.newFixedThreadPool(3);

		List<Callable<String>> tasks = new ArrayList<Callable<String>>();

		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());

		String result = executorService.invokeAny(tasks, 2, TimeUnit.SECONDS);
		System.out.println("result=" + result);
		executorService.shutdown();
	}

	/**
	 * 有异常的任务,有正常的任务,invokeAny()不会抛异常,返回最先正常完成的任务
	 */
	public static void invokeAny3() throws Exception {
		ExecutorService executorService = Executors.newFixedThreadPool(3);

		List<Callable<String>> tasks = new ArrayList<Callable<String>>();

		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());

		tasks.add(new SleepSecondsCallable("t1", 2));

		String result = executorService.invokeAny(tasks);

		System.out.println("result=" + result);
		executorService.shutdown();
	}

	/**
	 * 没有1个正常完成的任务,invokeAny()方法抛出ExecutionException,封装了任务中元素的异常
	 * 
	 */
	public static void invokeAny2() throws Exception {
		ExecutorService executorService = Executors.newFixedThreadPool(3);

		List<Callable<String>> tasks = new ArrayList<Callable<String>>();

		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());
		tasks.add(new ExceptionCallable());

		String result = executorService.invokeAny(tasks);

		System.out.println("result=" + result);

		executorService.shutdown();
	}

	/**
	 * 提交的任务集合,一旦有1个任务正常完成(没有抛出异常),会终止其他未完成的任务
	 */
	public static void invokeAny1() throws Exception {
		ExecutorService executorService = Executors.newFixedThreadPool(3);

		List<Callable<String>> tasks = new ArrayList<Callable<String>>();

		tasks.add(new SleepSecondsCallable("t1", 2));
		tasks.add(new SleepSecondsCallable("t2", 1));

		String result = executorService.invokeAny(tasks);

		System.out.println("result=" + result);

		executorService.shutdown();
	}

}

public class TestInvokeAll {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// testInvokeAll();
//		testInvokeAllTimeout();
		testInvokeAllWhenInterrupt();
	}

	/**
	 * 如果线程在等待invokeAll()执行完成的时候,调用线程被中断,会抛出InterruptedException<br>
	 * 此时线程池会终止没有完成的任务,这主要是为了减少资源的浪费.
	 */
	public static void testInvokeAllWhenInterrupt() throws Exception {
		final ExecutorService executorService = Executors.newFixedThreadPool(5);

		// 调用invokeAll的线程
		Thread invokeAllThread = new Thread() {

			@Override
			public void run() {
				
				List<Callable<String>> tasks = new ArrayList<Callable<String>>();
				tasks.add(new SleepSecondsCallable("t1", 2));
				tasks.add(new SleepSecondsCallable("t2", 2));
				tasks.add(new RandomTenCharsTask());

				// 调用线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
				try {
					List<Future<String>> results = executorService
							.invokeAll(tasks);
					System.out.println("wait for the result." + results.size());
				} catch (InterruptedException e) {
					System.out
							.println("I was wait,but my thread was interrupted.");
					e.printStackTrace();
				}

				
			}
		};

		invokeAllThread.start();

		Thread.sleep(200);

		invokeAllThread.interrupt();

		executorService.shutdown();

	}

	/**
	 * 可以通过Future.isCanceled()判断任务是被取消,还是完成(正常/异常)<br>
	 * Future.isDone()总是返回true,对于invokeAll()的调用者来说,没有啥用
	 */
	public static void testInvokeAllTimeout() throws Exception {
		ExecutorService executorService = Executors.newFixedThreadPool(5);

		List<Callable<String>> tasks = new ArrayList<Callable<String>>();
		tasks.add(new SleepSecondsCallable("t1", 2));
		tasks.add(new SleepSecondsCallable("t2", 2));
		tasks.add(new SleepSecondsCallable("t3", 1));
		// tasks.add(new RandomTenCharsTask());

		List<Future<String>> results = executorService.invokeAll(tasks, 1,
				TimeUnit.SECONDS);

		System.out.println("wait for the result." + results.size());

		for (Future<String> f : results) {
			System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
					+ f.isDone());
		}

		executorService.shutdown();

	}

	/**
	 * 程序的执行结果和一些结论,已经直接写在代码注释里面了。invokeAll是一个阻塞方法,会等待任务列表中的所有任务都执行完成。不管任务是正常完成,
	 * 还是异常终止,Future.isDone()始终返回true。通过Future.isCanceled()可以判断任务是否在执行的过程中被取消。
	 * 通过Future.get()可以获取任务的返回结果,或者是任务在执行中抛出的异常。
	 * 
	 * @throws Exception
	 */
	public static void testInvokeAll() throws Exception {
		ExecutorService executorService = Executors.newFixedThreadPool(5);

		List<Callable<String>> tasks = new ArrayList<Callable<String>>();
		tasks.add(new SleepSecondsCallable("t1", 2));
		tasks.add(new SleepSecondsCallable("t2", 2));
		tasks.add(new RandomTenCharsTask());
		tasks.add(new ExceptionCallable());

		// 调用该方法的线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
		List<Future<String>> results = executorService.invokeAll(tasks);

		// 任务列表中所有任务执行完毕,才能执行该语句
		System.out.println("wait for the result." + results.size());

		executorService.shutdown();

		for (Future<String> f : results) {
			// isCanceled=false,isDone=true
			System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
					+ f.isDone());

			// ExceptionCallable任务会报ExecutionException
			System.out.println("task result=" + f.get());
		}
	}
}
public class SleepSecondsCallable implements Callable<String> {
	private String name;
	private int seconds;
	public SleepSecondsCallable(String name, int seconds) {
		this.name = name;
		this.seconds = seconds;
	}
	public String call() throws Exception {
		System.out.println(name + ",begin to execute");

		try {
			TimeUnit.SECONDS.sleep(seconds);
		} catch (InterruptedException e) {
			System.out.println(name + " was disturbed during sleeping.");
			e.printStackTrace();
			return name + "_SleepSecondsCallable_failed";
		}
		System.out.println(name + ",success to execute");
		return name + "_SleepSecondsCallable_succes";
	}
}

public class RandomTenCharsTask implements Callable<String> {
	@Override
	public String call() throws Exception {
		System.out.println("RandomTenCharsTask begin to execute...");
		StringBuffer content = new StringBuffer();
		String base = "ssssssssssssssssssssssss";
		Random random = new Random();
		for (int i = 0; i < 10; i++) {
			int number = random.nextInt(base.length());
			content.append(base.charAt(number));
		}
		System.out.println("RandomTenCharsTask complete.result=" + content);
		return content.toString();
	}
}

public class ExceptionCallable implements Callable<String> {
	private String name = null;
	public ExceptionCallable() {
	}
	public ExceptionCallable(String name) {
		this.name = name;
	}
	@Override
	public String call() throws Exception {
		System.out.println("开始执行...");
		System.out.println(name.length());
		System.out.println("结束执行.");
		return name;
	}
}



线程池相关

原文:http://my.oschina.net/zuiwoxing/blog/526043

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!