CompletableFuture.supplyAsync()

创建一个带返回值的任务

    public static void main(String[] args) {
    //推荐使用自定义线程池
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            int i = 10;
            int j = 10;
            return i*j;
        },executorService);
    //join()是用来获取CompletableFuture异步之后的返回值
        System.out.println(completableFuture.join());
    }

CompletableFuture.runAsync()

创建一个不带返回值的任务,可以传入自定义的线程池执行任务:

public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            int i = 10;
            int j = 10;
            System.out.println(i*j);
        },executorService);
    //因为返回类型为Void,所以此处打印值为null
        System.out.println(completableFuture.join());
    }

thenApply和thenApplyAsync

thenApply和thenApplyAsync指的是第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

    public static ExecutorService executorService = Executors.newFixedThreadPool(20);
    public static void main(String[] args) {
        thenApplyTest();
        thenApplyAsyncTest();
    }

    public static void thenApplyAsyncTest(){

        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.printTimeAndThread("相乘");
            ThreadUtil.sleepMillis(100);
            return 10*10;
        },executorService).thenApplyAsync(multiply -> {
            ThreadUtil.printTimeAndThread("相加");
            ThreadUtil.sleepMillis(200);
            return multiply+1;
        });
        //结果为最后返回的结果
        System.out.println(completableFuture.join());
    }

    public static void thenApplyTest(){
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.printTimeAndThread("相乘");
            ThreadUtil.sleepMillis(100);
            return 10*10;
        },executorService).thenApply(multiply -> {
            ThreadUtil.printTimeAndThread("相加");
            ThreadUtil.sleepMillis(200);
            return multiply+1;
        });
        //结果为最后返回的结果
        System.out.println(completableFuture.join());
    }

返回结果:

时间戳:1674546892550    |    线程Id:20    |    线程名称:pool-1-thread-1    |    相乘
时间戳:1674546892661    |    线程Id:20    |    线程名称:pool-1-thread-1    |    相加
101
时间戳:1674546892864    |    线程Id:21    |    线程名称:pool-1-thread-2    |    相乘
时间戳:1674546892973    |    线程Id:22    |    线程名称:ForkJoinPool.commonPool-worker-25    |    相加
101

thenApply 将方法内的操作当成上一个任务的继续,可以看成是同一个任务,所以在同一个线程执行

thenApplyAsync 将函数式方法中的操作放到一个新的线程中执行,如果未指定线程池则会使用ForkJoin线程池

thenCompose:连接两个有依赖关系的异步任务

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.printTimeAndThread("相乘");
            int i = 10;
            int j = 10;
            return i*j;
        },executorService).thenCompose(multiply -> CompletableFuture.supplyAsync(() -> {
            ThreadUtil.printTimeAndThread("相除");
            return multiply / 10;
        }));
        //结果由第二个任务返回
        System.out.println(completableFuture.join());
    }

运行结果:

时间戳:1674484479563    |    线程Id:20    |    线程名称:pool-1-thread-1    |    相乘
时间戳:1674484479564    |    线程Id:21    |    线程名称:ForkJoinPool.commonPool-worker-25    |    相除
10

可见使用thenCompose连接的两个异步任务,可以保证两个任务的先后顺序,并且thenCompose方法还可以将前面任务的结果交给下一个异步任务。

两个异步线程的名称不同是因为第一个异步任务使用的是executorService,而第二个任务未指定线程池,他会指定默认的提交任务的方法,所以建议自定义线程池,以方便排查错误

    // 查看cpu的核数是否大于1核
    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);
 
    // 如果大于1核 则调用execute方法, 每次创建一个线程
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
 
    static final class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) { new Thread(r).start(); }
    }

whenComplete:

whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法的参数是上个任务的结果。

    public static void main(String[] args) {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.printTimeAndThread("相乘");
            int i = 1/0;
            return 10*10;
    //第一个参数为上个任务的返回结果,第二个参数为异常信息
        },executorService).whenComplete( (multiply,exception) -> {
            System.out.println(exception);
            ThreadUtil.printTimeAndThread("相乘的结果");
        });
        //结果为最后返回的结果
        System.out.println(completableFuture.join());
    }

运行结果:

时间戳:1674552052468    |    线程Id:20    |    线程名称:pool-1-thread-1    |    相乘
multiply:null
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
时间戳:1674552052468    |    线程Id:20    |    线程名称:pool-1-thread-1    |    相乘的结果
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero......

thenCombine:将上一个任务和当前任务一起执行,等待两个任务执行完成后,得到的两个结果,再加工为一个结果

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.printTimeAndThread("相乘1");
            int i = 10;
            int j = 10;
            return i*j;
        },executorService).thenCombine(CompletableFuture.supplyAsync(() -> {
            ThreadUtil.printTimeAndThread("相乘2");
            int i = 20;
            int j = 40;
            return i*j;
        }),(multiply1,multiply2) -> {
            ThreadUtil.printTimeAndThread("相加");
            return multiply1 + multiply2;
        });
        //结果为最后返回的结果
        System.out.println(completableFuture.join());
    }

打印结果:

时间戳:1674486175518    |    线程Id:20    |    线程名称:pool-1-thread-1    |    相乘1
时间戳:1674486175518    |    线程Id:21    |    线程名称:ForkJoinPool.commonPool-worker-25    |    相乘2
时间戳:1674486175518    |    线程Id:1    |    线程名称:main    |    相加
900
最后修改:2023 年 03 月 31 日
如果觉得我的文章对你有用,请随意赞赏