CompletableFuture.supplyAsync()
创建一个带返回值的任务
public static void main(String[] args) {
//推荐使用自定义线程池
ExecutorService executorService = Executors.newFixedThreadPool(20);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
int i = 10;
int j = 10;
return i*j;
},executorService);
//join()是用来获取CompletableFuture异步之后的返回值
System.out.println(completableFuture.join());
}
CompletableFuture.runAsync()
创建一个不带返回值的任务,可以传入自定义的线程池执行任务:
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
int i = 10;
int j = 10;
System.out.println(i*j);
},executorService);
//因为返回类型为Void,所以此处打印值为null
System.out.println(completableFuture.join());
}
thenApply和thenApplyAsync
thenApply和thenApplyAsync指的是第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
public static ExecutorService executorService = Executors.newFixedThreadPool(20);
public static void main(String[] args) {
thenApplyTest();
thenApplyAsyncTest();
}
public static void thenApplyAsyncTest(){
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.printTimeAndThread("相乘");
ThreadUtil.sleepMillis(100);
return 10*10;
},executorService).thenApplyAsync(multiply -> {
ThreadUtil.printTimeAndThread("相加");
ThreadUtil.sleepMillis(200);
return multiply+1;
});
//结果为最后返回的结果
System.out.println(completableFuture.join());
}
public static void thenApplyTest(){
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.printTimeAndThread("相乘");
ThreadUtil.sleepMillis(100);
return 10*10;
},executorService).thenApply(multiply -> {
ThreadUtil.printTimeAndThread("相加");
ThreadUtil.sleepMillis(200);
return multiply+1;
});
//结果为最后返回的结果
System.out.println(completableFuture.join());
}
返回结果:
时间戳:1674546892550 | 线程Id:20 | 线程名称:pool-1-thread-1 | 相乘
时间戳:1674546892661 | 线程Id:20 | 线程名称:pool-1-thread-1 | 相加
101
时间戳:1674546892864 | 线程Id:21 | 线程名称:pool-1-thread-2 | 相乘
时间戳:1674546892973 | 线程Id:22 | 线程名称:ForkJoinPool.commonPool-worker-25 | 相加
101
thenApply 将方法内的操作当成上一个任务的继续,可以看成是同一个任务,所以在同一个线程执行
thenApplyAsync 将函数式方法中的操作放到一个新的线程中执行,如果未指定线程池则会使用ForkJoin线程池
thenCompose:连接两个有依赖关系的异步任务
thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.printTimeAndThread("相乘");
int i = 10;
int j = 10;
return i*j;
},executorService).thenCompose(multiply -> CompletableFuture.supplyAsync(() -> {
ThreadUtil.printTimeAndThread("相除");
return multiply / 10;
}));
//结果由第二个任务返回
System.out.println(completableFuture.join());
}
运行结果:
时间戳:1674484479563 | 线程Id:20 | 线程名称:pool-1-thread-1 | 相乘
时间戳:1674484479564 | 线程Id:21 | 线程名称:ForkJoinPool.commonPool-worker-25 | 相除
10
可见使用thenCompose连接的两个异步任务,可以保证两个任务的先后顺序,并且thenCompose方法还可以将前面任务的结果交给下一个异步任务。
两个异步线程的名称不同是因为第一个异步任务使用的是executorService,而第二个任务未指定线程池,他会指定默认的提交任务的方法,所以建议自定义线程池,以方便排查错误
// 查看cpu的核数是否大于1核
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
// 如果大于1核 则调用execute方法, 每次创建一个线程
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
whenComplete:
whenComplete方法表示,某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法的参数是上个任务的结果。
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.printTimeAndThread("相乘");
int i = 1/0;
return 10*10;
//第一个参数为上个任务的返回结果,第二个参数为异常信息
},executorService).whenComplete( (multiply,exception) -> {
System.out.println(exception);
ThreadUtil.printTimeAndThread("相乘的结果");
});
//结果为最后返回的结果
System.out.println(completableFuture.join());
}
运行结果:
时间戳:1674552052468 | 线程Id:20 | 线程名称:pool-1-thread-1 | 相乘
multiply:null
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
时间戳:1674552052468 | 线程Id:20 | 线程名称:pool-1-thread-1 | 相乘的结果
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero......
thenCombine:将上一个任务和当前任务一起执行,等待两个任务执行完成后,得到的两个结果,再加工为一个结果
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
ThreadUtil.printTimeAndThread("相乘1");
int i = 10;
int j = 10;
return i*j;
},executorService).thenCombine(CompletableFuture.supplyAsync(() -> {
ThreadUtil.printTimeAndThread("相乘2");
int i = 20;
int j = 40;
return i*j;
}),(multiply1,multiply2) -> {
ThreadUtil.printTimeAndThread("相加");
return multiply1 + multiply2;
});
//结果为最后返回的结果
System.out.println(completableFuture.join());
}
打印结果:
时间戳:1674486175518 | 线程Id:20 | 线程名称:pool-1-thread-1 | 相乘1
时间戳:1674486175518 | 线程Id:21 | 线程名称:ForkJoinPool.commonPool-worker-25 | 相乘2
时间戳:1674486175518 | 线程Id:1 | 线程名称:main | 相加
900