首页 > 编程语言 > 详细

线程、线程池、CompletableFuture异步编排

时间:2021-02-04 17:15:20      阅读:22      评论:0      收藏:0      [点我收藏+]

java的线程是通过java.lang.Thread类来实现的。

在Java当中,线程通常都有五种状态,创建、就绪、运行、阻塞和死亡。
  第一是创建状态。在生成线程对象,并没有调用该对象的start方法,这是线程处于创建状态。
  第二是就绪状态。当调用了线程对象的start方法之后,该线程就进入了就绪状态,但是此时线程调度程序还没有把该线程设置为当前线程,此时处于就绪状态。在线程运行之后,从等待或者睡眠中回来之后,也会处于就绪状态。
  第三是运行状态。线程调度程序将处于就绪状态的线程设置为当前线程,此时线程就进入了运行状态,开始运行run函数当中的代码。
  第四是阻塞状态。线程正在运行的时候,被暂停,通常是为了等待某个时间的发生(比如说某项资源就绪)之后再继续运行。sleep,suspend,wait等方法都可以导致线程阻塞。
  第五是死亡状态。如果一个线程的run方法执行结束或者调用stop方法后,该线程就会死亡。对于已经死亡的线程,无法再使用start方法令其进入就绪。

线程的初始化

1)继承Thread

       技术分享图片

  run()与start()方法的区别

    a.start()方法来启动线程,真正实现了多线程运行。这时无需等待run方法体代码执行完毕,可以直接继续执行下面的代码;通过调用Thread类的start()方法来启动一个线程, 这时此线程是处于就绪状态, 并没有运行。 然后通过此Thread类调用方法run()来完成其运行操作的, 这里方法run()称为线程                 体,它包含了要执行的这个线程的内容, Run方法运行结束, 此线程终止。然后CPU再调度其它线程。

            b.run()方法当作普通方法的方式调用。程序还是要顺序执行,要等待run方法体执行完毕后,才可继续执行下面的代码; 程序中只有主线程——这一个线程, 其程序执行路径还是只有一条, 这样就没有达到写线程的目的。

2)实现Runnable接口

  技术分享图片

3)实现Callable接口 + FutureTask

  技术分享图片

  可以拿到返回结果,可以处理异常

4)线程池

  方式一:启动线程池(Executors可根据相关需求创建线程方法)

//
ExecutorService service = Executors.newCachedThreadPool(10);

  方式二:启动线程池

int corePoolSize = 10;//核心线程数;线程池启动后默认创建的线程数量【除非设置了allowCoreThreadTimeOut(回收核心线程)】,等待调用去执行异步任务
int maximumPoolSize = 200;//最大线程数量;控制资源
long keepAliveTime = 10;//存活时间;在当前的线程数 > 核心线程数时,释放空闲线程的时间
TimeUnit unit = TimeUnit.SECONDS;//时间单位
BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>();//阻塞队列,如果任务有很多就会放在这里,有空闲线程就会取出该队列里的任务(队列方式有多种,看相关接口介绍,设置队列的数量根据系统测试后能承受的最大数量,否则会导致内存不够)
ThreadFactory threadFactory = Executors.defaultThreadFactory();//线程创建工厂(可以根据自己相关需求,重写这个创建工厂)
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();//如果队列满了,按照指定的拒绝策略执行任务(根据相关需求,制定一些相关拒绝策略)
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);

       工作流程:

             a.线程池创建,启动好核心的线程数量,准备接受任务

             b.新的任务来了,用先用核心空闲线程去执行,如果核心线程都在运行状态 ,就将任务放置到阻塞队列中,核心线程空闲后去阻塞队列获取任务执行

             c.阻塞线程满了,就创建新的线程执行(创建新的线程最大只能开到maximumPoolSize指定的数量)

     d.如最大线程都满了,且 就执行handler(拒绝策略)

      e.线程把所有任务都执行完,有空闲线程,就会在指定存活的时间,释放空闲线程

       submit()与execute()区别

  submit()是可以取得返回值 

以上四种方法的区别 

        a.Thread与Runnable不能得到返回值,使用Callable+FutrueTask可以取得

        b.Thread、Runnable、Callable+FutrueTask不能控制资源,线程池可以

CompletableFuture

1.创建异步对象

CompletableFuture提供了四个静态方法来创建一个异步操作

public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}

a.以run开头的方法都是没有返回结果的,supply都是可以获取到返回结果的

b.有Executor参数的方法是可以传入自定义的线程池,没有的就使用默认的线程池

1) runAsync方法

技术分享图片

 

 

 2)supplyAsync 方法

技术分享图片

 

 

 2.计算完成时回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);

public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
1)whenComplete方法
技术分享图片

    返回异常处理结果 

    技术分享图片

 

 

    使用handle方法执行完成后的处理

   技术分享图片

   2)whenCompleteAsync 

whenComplete 可以处理正常和异常的计算结果,exceptionally处理异常情况
whenComplete 和 whenCompleteAsync 的区别
whenComplete :是执行当前任务的线程继续执行whenComplete的任务
whenCompleteAsync :是执行的当前任务继续提交给线程池来执行(可能是同一个线程(是相同的线程池)执行,也可能是其他线程执行)

 

 

 

3.线程串行化方法(上个任务执行完,才能处理下一步任务的)

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);public CompletableFuture<Void> thenAccept(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletableFuture<Void> thenRun(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action);public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

  1).加了async的方法,是执行完上一个任务,会再开另一个线程来处理下一个任务,默认是异步执行的,否则是跟上一个任务用同一个线程

  2).thenAccept方法,消费处理结果,接收上一个任务的结果进行处理

  3).thenRun方法,无法获取上一步的执行结果

  4).thenApply方法,当一个线程依赖另一个线程时,获取上一个任务的结果,并返回当前任务的返回值 

4.两任务组合(都需求完成后)

public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

  1)thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值 

  2)thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值

  3)runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务

5.两个任务组合有一个完成

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

  1)applyToEither:两个任务有一个执行完成,获取它的返回值 ,并处理任务并有新的返回值 

  2)acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值

  3)runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,没有返回值

6.多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

  1)allOf:等待所有任务完成 

  2)anyOf:只要有一个任务完成 

线程、线程池、CompletableFuture异步编排

原文:https://www.cnblogs.com/realman9527/p/14372734.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!