* <p>The <tt>Callable</tt> interface is similar to {@link * java.lang.Runnable}, in that both are designed for classes whose * instances are potentially executed by another thread. A * <tt>Runnable</tt>, however, does not return a result and cannot * throw a checked exception.从注释中可以看出Callable与Runnable的区别在于:
public class CallableTest { public static void main(String[] args) { //创建实现了Callable接口的对象 MyCallable callable = new MyCallable(); //将实现Callable接口的对象作为参数创建一个FutureTask对象 FutureTask<String> task = new FutureTask<>(callable); //创建线程处理当前callable任务 Thread thread = new Thread(task); //开启线程 System.out.println("开始执行任务的时间: "+getNowTime()); thread.start(); //获取到call方法的返回值 try { String result = task.get(); System.out.println("得到返回值: "+result); System.out.println("结束执行get的时间: "+getNowTime()); } catch (Exception e) { e.printStackTrace(); } } public static String getNowTime() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return format.format(new Date()); } } class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(3000); return "call method result"; } }查看输出:
开始执行任务的时间: 2016-08-31 12:21:43 得到返回值: call method result 结束执行get的时间: 2016-08-31 12:21:46可以看到确实得到了call方法的返回值,但是在调用get方法的时候却造成了主线程的阻塞,因为我们在call方法里面让子线程暂停了3秒,这时候如果不阻塞主线程的话,输出语句中的第三行时间不应该是12:21:46的,应该是12:21:43,因此验证了上面我们给出的结论,但是使用Runnable接口是不会造成主线程阻塞的,具体实例马上给出;
public class CallableTest { public static void main(String[] args) { //创建实现了Callable接口的对象 MyCallable callable = new MyCallable(); //创建用于处理任务的线程池 ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); //将任务添加到线程池中并且获得返回的FutureTask对象 System.out.println("提交任务的时间: "+getNowTime()); FutureTask<String> task = (FutureTask<String>) threadPool.submit(callable); //获取到call方法的返回值 try { String result = task.get(); System.out.println("得到返回值: "+result); System.out.println("结束执行get的时间: "+getNowTime()); } catch (Exception e) { e.printStackTrace(); } } public static String getNowTime() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return format.format(new Date()); } } class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(3000); return "call method result"; } }查看输出:
提交任务的时间: 2016-08-31 14:44:09 得到返回值: call method result 结束执行get的时间: 2016-08-31 14:44:12可以看到,使用线程池方式和我们自己定义线程实现效果是一样的,这就是Callable使用的两种方式啦;如果你对线程池的实现原理不是很清楚的话,可以查看我的另一篇博客:我眼中的java线程池实现原理;
public class RunnableTest { public static void main(String[] args) { MyRunnable runnable = new MyRunnable(); Thread thread = new Thread(runnable); System.out.println("开始执行任务时间: "+getNowTime()); thread.start(); System.out.println("启动任务之后时间: "+getNowTime()); } public static String getNowTime() { SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return format.format(new Date()); } } class MyRunnable implements Runnable { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }查看输出:
开始执行任务时间: 2016-08-31 14:36:01 启动任务之后时间: 2016-08-31 14:36:01可以看到,两次时间是一致的,说明执行的runnable任务并没有影响主线程任务的执行;
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }set方法做的工作比较简单,就是将结果赋给了outcome而已,outcome是Object类型的全局变量,同时将state状态设置为NORMAL,接着会执行finishCompletion,这个方法其实就是用来唤醒我们上面想要获得数据的get方法的,我们先来看看get方法里面的实现:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }可以看到首先是先去查看当前state状态是否小于等于COMPLETING,你查看FutureTask源码的话,会发现有这么几种状态:
private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;state状态小于等于COMPLETING就表示我们刚刚的Callable是还没有处理完成的,那么就会调用awaitDone方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }awaitDone的业务逻辑还是挺复杂的,我大致来分析下,第7行判断如果当前线程被中断的话,则抛出异常,第12行获取当前任务状态state,第13行判断如果任务状态大于COMPLETING的话,则直接返回state状态,当然从上面源码中的状态列表中可以发现大于COMPLETING的状态有5种,可能是有正常返回值的,也可能是抛出异常的,具体怎么处理等会在回答get方法的时候是会介绍的,第18行如果任务是正在执行的话,则让出一段CPU时间继续运行,接着第20和22行的判断其实就是判断等待结点和等待队列为空的话创建一个出来而已,接着如果我们在调用awaitDone的时候,设置的timed参数是true的话,则会执行26行处的if语句块,会设置线程等待我们设置的时间,等待时间到了会唤醒此线程,我们平常使用的get方法默认timed值是false的,因此会执行到第34行,将当前线程锁起来,你如果仔细点的话会发现第6到34行是个死循环,也就是当34行处的锁定在其他地方(其实就是set方法里面了)解开的话,仍然会继续执行第6行,那么因为此时state状态已经发生了改变,此时执行已经和之前执行流程不同啦,一般来讲的话,会执行到第13行进行判断,最后执行第16行返回状态码就可以了;
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }可以看到第9行执行了LockSupport.unpark,相当于解锁了get方法中的LockSupport.park操作,这样的话awaitDone方法就会返回了,回答get方法里面:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }在通过awaitDone获得返回的state状态值之后,就会调用report方法,查看report方法:
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }其实很简单了,就是根据state状态来进行相应的操作了,如果state只等于NORMAL的话,会直接返回值了,也就是我们的get方法返回值了,这就是我们自己通过FutureTask获得Callable返回值的源码过程以及为什么get方法会阻塞;
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }可以看到使用线程池的方式实际上内部是通过为我们创建一个RunnableFuture对象并且返回这个对象的,这里的RunnableFuture对象实际上是FutureTask类型的,因为我们查看newTaskFor的实现可以发现:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }实际上就是将Callable对象封装成了FutureTask对象为我们返回而已了;