CompletableFuture Start
上篇已经介绍了 Future,因为 Future 有如下局限性:
- 将多个异步计算的结果合并成一个
 - 等待 
Future集合中的所有任务都完成 Future完成事件(即,任务完成以后触发执行动作)CompletableFuture是Java8提供的新特性,提供了函数式编程的能力(面向对象编程->抽象数据,函数式编程->抽象行为),可以通过回调的方式处理计算结果。
CompletableFuture 特点
部分源码
/**
 * .....
 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    ......
}
CompletableFuture 实现了 Future 和 CompletionStage
CompletionStage
CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段- 一个阶段的计算执行可以是一个 
Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println()) - 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
 
CompletableFuture
- 在 
Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合CompletableFuture的方法。 - 它可能代表一个明确完成的 
Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。 
CompletableFuture 基本用法
源码提供一下创建 CompletableFuture 的方法
    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }
    /**
     * Creates a new complete CompletableFuture with given encoded result.
     */
    private CompletableFuture(Object r) {
        this.result = r;
    }
    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param <U> the function's return type
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
    }
    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} after
     * it runs the given action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor after it runs the given
     * action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }
    /**
     * Returns a new CompletableFuture that is already completed with
     * the given value.
     *
     * @param value the value
     * @param <U> the type of the value
     * @return the completed CompletableFuture
     */
    public static <U> CompletableFuture<U> completedFuture(U value) {
        return new CompletableFuture<U>((value == null) ? NIL : value);
    }
thenApply
当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。thenApplyAsync 默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。thenApply 相当于回调函数(callback).源码如下:
    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }
- 例1 一个简单的链式执行例子
 
    @Test
    public void testCompletable(){
        CompletableFuture.supplyAsync(()->1)
                .thenApply(i -> i+1)
                .thenApply(i -> 1 * i)
                .whenComplete((r,e)-> System.out.println(r+","+e));
    }
- 例2 两个耗时异步计算同时执行
 
/**
* 具体的计算类
**/
public class SquareCalculator {
    private ExecutorService executor
            = Executors.newFixedThreadPool(4);
    public Future<Integer> calculate(Integer input) {
        return executor.submit(() -> {
            Thread.sleep(2000);
            return input * input;
        });
    }
    public CompletableFuture<Integer> completableFutureCalculate(Integer input, Long sleep){
        CompletableFuture<Integer> future = new CompletableFuture<>();
        executor.execute(() -> {
            try{
                Thread.sleep(sleep);
                Integer result =  input * input;
                future.complete(result);
            } catch (InterruptedException e){
                System.out.println(e);
            }
        });
        return future;
    }
}
/**
** CompletableFuture 处理两个异步计算
**/
    @Test
    public void testFuture() throws Exception{
        SquareCalculator sqa = new SquareCalculator();
        System.out.println("Calculating ... ");
        long start = System.currentTimeMillis();
        CompletableFuture<Integer> reslt = sqa.completableFutureCalculate(11, 2000L);
        CompletableFuture<Integer> reslt1 = sqa.completableFutureCalculate(22, 4000L);
        CompletableFuture.allOf(reslt,reslt1).join();
        System.out.println(reslt.get());
        System.out.println(reslt1.get());
        System.out.println(System.currentTimeMillis() - start);
    }
    /**
     一个计算结果
     Calculating ... 
     121
     484
     4058
    **/
thenAccept 与 thenRun
- 
可以看到,
thenAccept和thenRun都是无返回值的。如果说thenApply是不停的输入输出的进行生产,那么thenAccept和thenRun就是在进行消耗。它们是整个计算的最后两个阶段。 - 
同样是执行指定的动作,同样是消耗,二者也有区别:
thenAccept接收上一阶段的输出作为本阶段的输入thenRun根本不关心前一阶段的输出,根本不不关心前一阶段的计算结果,因为它不需要输入参数
 - 
例3
thenAccept的一个简单的例子 
    @Test
    public void testCompletable3() {
        CompletableFuture.supplyAsync(() -> 8)
                .thenApply(i -> i + 1)
                .thenAccept(i -> System.out.println(i * i));
    }
- 例4
thenRun的一个简单的例子 
    @Test
    public void testCompletable4() {
        CompletableFuture.supplyAsync(()-> 8)
                .thenRun(() -> System.out.println("Hello World"));
    }
thenCombine 整合两个计算结果
- 源码
 
    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }
- 例5 整合两个计算结果
 
    @Test
    public void testCompletable5() {
        CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(s -> s + " Word ")
                .thenApply(String::toUpperCase)
                .thenCombine(CompletableFuture.completedFuture("Java"),
                        (s1,s2)->s1+s2)
                .thenAccept(System.out::println);
    }
whenComplete
- 等待多个异步任务完成后执行相应的处理,这里要着重理解 
whenComplete返回的是一个新的Future,如果需要回调需要按顺序执行那么同样需要用CompletableFuture.allOf重新组合新他们。 - 要等待 
Group里面的Future执行完成,需要调用join()或get(),两者的区别下面会说明 
    @Test
    public void testCompletable9() {
        Runnable dummyTask = () -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
            }
        };
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(dummyTask);
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(dummyTask);
        f1 = f1.whenComplete((aVoid, throwable) -> System.out.println("Completed f1"));
        f2 = f2.whenComplete((aVoid, throwable) -> System.out.println("Completed f2"));
        CompletableFuture[] all = {f1, f2};
        CompletableFuture<Void> allOf = CompletableFuture.allOf(all);
        allOf.whenComplete((aVoid, throwable) -> {
            System.out.println("Completed allOf");
        });
        allOf.join();
        System.out.println("Joined");
    }
join 与 get 区别联系
两者的作用相同,都是等待 Future 完成然后返回结果。不同点是 join 不会被 interrupt,但是 get 可能会被中断,使用时需要捕捉异常,接口定义如下:
V get() throws InterruptedException, ExecutionException;
两个实际场景
多个异步请求全部完成
多个异步请求全部完成获取所有结果
/** controller 代码**/
    @RequestMapping(method = RequestMethod.GET, value = "/test")
    public long test() {
        long sleep = randomSleep();
        return sleep;
    }
    private long randomSleep(){
        long time = (int)(1+Math.random()*(1000-1+1));
        try{
            System.out.println("Sleep " + time + "ms");
            Thread.sleep(time);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        return time;
    }
/** okhttp3 client获取异步消息代码**/ 
public class AsynchronousMessage {
    private static ExecutorService executor = Executors.newFixedThreadPool(4);
    private static OkHttpClient okHttpClient = (new OkHttpClient.Builder()).connectionPool(new ConnectionPool(5, 1, TimeUnit.MINUTES)).
            retryOnConnectionFailure(true).connectTimeout(5000, TimeUnit.MILLISECONDS).readTimeout(5000, TimeUnit.MILLISECONDS).build();
    public static CompletableFuture<Long> getAsynchronousMessage() {
        CompletableFuture<Long> future = new CompletableFuture<>();
        Request req = new Request.Builder().url("http://10.201.0.39:8080/datanode/test/test").get().build();
        executor.execute(() -> {
            Response response = null;
            try {
                response = okHttpClient.newCall(req).execute();
                if (response.isSuccessful()) {
                    String res = response.body().string();
                    if (res != null) {
                        future.complete(Long.valueOf(res));
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (null != response) {
                    response.close();
                }
            }
        });
        return future;
    }
}
/** CompletableFuture 调用处理所有结果**/   
    @Test
    public void testCompletable12() {
        long start = System.currentTimeMillis();
        StringBuilder result = new StringBuilder();
        CompletableFuture fut1 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture fut2 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture fut3 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture futures =  CompletableFuture.allOf(fut1,fut2,fut3);
        System.out.println(fut1.join() + "," + fut2.join() + "," + fut3.join());
        System.out.println("Process Time:" + (System.currentTimeMillis() - start));
    }        
多个异步请求其中之一完成
多个异步请求全部完成获取完成的结果
/**
    共通代码参考上面的例子
**/
    @Test
    public void testCompletable13() {
        long start = System.currentTimeMillis();
        CompletableFuture fut1 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture fut2 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture fut3 = AsynchronousMessage.getAsynchronousMessage();
        CompletableFuture futures =  CompletableFuture.anyOf(fut1,fut2,fut3);
        System.out.println(futures.join());//anyOf 能取到完成任务的结果,而 allOf 不行
        System.out.println("Process Time:" + (System.currentTimeMillis() - start));
    }
参考资料
- Future 各种用法
 
https://juejin.im/post/5abc9e59f265da239f077460
            本文由 zealzhangz 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
            2019/02/19 23:20