java 8 - CompletableFuture 类
CompletableFuture实现了CompletionStage接口和Future接口,在JDK1.8中引入,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
# 常用的创建方式
new
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
静态方法创建
// 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static <U> CompletableFuture<U> completedFuture(U value)
2
3
4
5
6
7
# 结果的获取
new的获取方式
CompletableFuture<Object> completableFuture = new CompletableFuture();
new Thread(() -> {
try {
//doSomething,调用complete方法将其他方法的执行结果记录在completableFuture对象中
completableFuture.complete(null);
} catch (Exception e) {
//异常处理
completableFuture.completeExceptionally(e);
}
}).start();
2
3
4
5
6
7
8
9
10
静态方法创建的获取方式
// 阻塞获取,抛出异常为编译期异常
public T get()
// 阻塞获取,超时机制,若在指定时间内未获取结果将抛出超时异常
public T get(long timeout, TimeUnit unit)
// 不阻塞,执行完毕则返回结果或执行抛出的异常,若未执行完毕则返回 valueIfAbsent
public T getNow(T valueIfAbsent)
// 阻塞获取,抛出异常为运行时异常
public T join()
2
3
4
5
6
7
8
# 链式编程API
这些API返回的都是阶段完成的CompletableFuture对象,所以能够达到链式编程的效果
| 方法 | 描述 |
|---|---|
| thenApply (opens new window) | 可以处理和改变这个阶段执行的结果 |
| thenApplyAsync (opens new window) | 可以异步地处理和改变这个阶段执行的结果 |
| thenRun (opens new window) | 当此阶段正常完成时,执行给定的操作。 |
| thenRunAsync (opens new window) | 当此阶段正常完成时,异步地执行给定的操作。 |
| thenAccept (opens new window) | 当此阶段正常完成时,将以该阶段的结果作为提供的操作的参数执行。 |
| thenAcceptAsync (opens new window) | 当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段的结果作为提供的操作的参数。 |
| thenAcceptBoth (opens new window) | 当这个和另一个给定的阶段都正常完成时,两个结果作为提供的操作的参数被执行。 |
| thenAcceptBothAsync (opens new window) | 当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供的操作的参数。 |
| thenCompose (opens new window) | 当这个阶段正常完成时,这个阶段将作为提供函数的参数执行。 |
| thenComposeAsync (opens new window) | 当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段作为提供的函数的参数。 |
| thenCombine (opens new window) | 当这个和另一个给定的阶段都正常完成时,两个结果作为提供函数的参数执行。 |
| thenCombineAsync (opens new window) | 当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供函数的参数。 |
| applyToEither (opens new window) | 当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的函数的参数。 |
| applyToEitherAsync (opens new window) | 当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供函数的参数。 |
| acceptEither (opens new window) | 当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的操作的参数。 |
| acceptEitherAsync (opens new window) | 当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供的操作的参数。 |
| runAfterBoth (opens new window) | 当这个和另一个给定的阶段都正常完成时,执行给定的动作。 |
| runAfterBothAsync (opens new window) | 当这个和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。 |
| runAfterEither (opens new window) | 当这个或另一个给定阶段正常完成时,执行给定的操作。 |
| runAfterEitherAsync (opens new window) | 当这个或另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。 |
| whenComplete (opens new window) | 当此阶段完成时,使用结果(或 null如果没有))和此阶段的异常(或 null如果没有))执行给定的操作。 |
| whenCompleteAsync (opens new window) | 当此阶段完成时,执行给定操作将使用此阶段的默认异步执行工具执行给定操作,结果(或 null如果没有))和异常(或 null如果没有)这个阶段作为参数。 |
| handle (opens new window) | 当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行。 |
| handleAsync (opens new window) | 当该阶段完成正常或异常时,将使用此阶段的默认异步执行工具执行,此阶段的结果和异常作为提供函数的参数。 |
| exceptionally (opens new window) | 当CompletableFuture完成时完成,结果是异常触发此CompletableFuture的完成特殊功能的给定功能; 否则,如果此CompletableFuture正常完成,则返回的CompletableFuture也会以相同的值正常完成。 |
| toCompletableFuture (opens new window) | 返回此CompletableFuture。 |
API详解
# thenApply (opens new window)
可以使用thenApply() 处理和改变上一段Future的结果。
public static void main(String[] args) {
Integer result = CompletableFuture.supplyAsync(() -> 1)
.thenApply(i -> i + 1)
.join();
System.out.println(result); // 2
}
2
3
4
5
6
可以一直接下去
public static void main(String[] args) {
Integer result = CompletableFuture.supplyAsync(() -> 1)
.thenApply(i -> i + 1)
.thenApply(i -> i + 1)
.thenApply(i -> i + 1)
.join();
System.out.println(result); // 4
}
2
3
4
5
6
7
8
如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()和 thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。
# thenRun (opens new window)
可以使用thenRun()在上一段Future完成后执行一段操作,thenRun无法使用上一段Future的结果。
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> 1)
.thenRun(() -> System.out.println("执行完毕"));// 执行完毕
}
2
3
4
# thenAccept (opens new window)
可以使用thenAccept()在上一段Future完成后执行一段操作,可以使用上一段Future的结果。
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> 1)
.thenAccept(x -> System.out.println(x)); // 1
}
2
3
4
# thenAcceptBoth (opens new window)
可以使用thenAcceptBoth()在上一段Future和传入的Future完成后执行一段操作,可以使用两个Future的结果。
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> 1)
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2),
(i1, i2) -> System.out.println(i1 + i2));// 3
}
2
3
4
5
# thenCompose (opens new window)
组合两个Future的时候使用,用法与thenApply()相似,但是传参是一个Future,返回传入Future的结果。
public static void main(String[] args) {
Integer join = CompletableFuture.supplyAsync(() -> 1)
.thenCompose(i -> CompletableFuture.supplyAsync(() -> i + 1))
.join();
System.out.println(join); // 2
}
2
3
4
5
6
# thenCombine (opens new window)
组合两个Future的时候使用,与thenCompose()不同,不需要关心前一个Future的结果,只有当两个Future都完成后才会执行特定的函数。
public static void main(String[] args) {
Integer join = CompletableFuture.supplyAsync(() -> 1)
.thenCombine(CompletableFuture.supplyAsync(() -> 2), (i1, i2) -> i1 + i2)
.join();
System.out.println(join); // 3
}
2
3
4
5
6
# applyToEither (opens new window)
可以使用applyToEither()在上一段Future或传入的Future完成后对结果进行处理或改变,哪个Future先返回就处理哪个结果。
public static void main(String[] args) {
Integer join = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
})
.applyToEither(CompletableFuture.supplyAsync(() -> 2), i -> i + 1)
.join();
System.out.println(join); // 3
}
2
3
4
5
6
7
8
9
10
11
12
13
# acceptEither (opens new window)
可以使用acceptEither()在上一段Future或传入的Future完成后执行一段操作,哪个Future先返回就用哪个的结果,两个Future的结果类型需一致。
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
})
.acceptEither(CompletableFuture.supplyAsync(() -> 2),i -> System.out.println(i)); // 2
}
2
3
4
5
6
7
8
9
10
11
# runAfterBoth (opens new window)
可以使用runAfterBoth()在上一段Future和传入的Future都完成时执行一段操作,无法使用上一段或传入Future的结果。
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> 1)
.runAfterBoth(CompletableFuture.supplyAsync(() -> 2), () -> System.out.println("执行完毕"));// 执行完毕
}
2
3
4
# runAfterEither (opens new window)
可以使用runAfterBoth()在上一段Future或传入的Future完成时执行一段操作,无法使用上一段或传入Future的结果。
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
})
.runAfterEither(CompletableFuture.runAsync(() -> System.out.println(2)),
() -> System.out.println("执行完毕"));// 2 执行完毕
}
2
3
4
5
6
7
8
9
10
11
12
# whenComplete (opens new window)
当上一段Future完成时,对结果或异常进行处理。
public static void main(String[] args) {
Integer join = CompletableFuture.supplyAsync(() -> 1 / 0)
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.out.println(throwable.getCause());
} else {
System.out.println(result);
}
}).join();
System.out.println(join);
}
2
3
4
5
6
7
8
9
10
11
打印
java.lang.ArithmeticException: / by zero
// 异常栈...
2
# handle (opens new window)
当上一段Future完成时,对结果或异常进行处理并返回结果,如果Future执行出现了异常,获取结果时不会抛出,但结果会为null。
public static void main(String[] args) {
Integer join = CompletableFuture.supplyAsync(() -> 1 / 0)
.handle((result, throwable) -> {
if (throwable != null) {
System.out.println(throwable.getCause());// 异常cause
}
return result;
}).join();
System.out.println(join);// null
}
2
3
4
5
6
7
8
9
10
# exceptionally (opens new window)
如果你并不关注返回值或Future没有返回,可以使用exceptionally()对上一段Future的异常进行处理,但还是需要返回一个默认值。
public static void main(String[] args) {
Integer join = CompletableFuture.supplyAsync(() -> 1 / 0)
.exceptionally(throwable -> {
System.out.println(throwable.getCause());// 异常cause
return 0;
}).join();
System.out.println(join);// 0
}
2
3
4
5
6
7
8
无返回值
public static void main(String[] args) {
Void join = CompletableFuture.runAsync(() -> System.out.println("task1")) // task1
.exceptionally(throwable -> {
System.out.println(throwable.getCause());
return null;
}).join();
System.out.println(join);// null
}
2
3
4
5
6
7
8
# 工具类
import lombok.extern.slf4j.Slf4j;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
/**
* 异步工具类
* @author kinoko
* @date 2023/7/13 10:07
*/
@Slf4j
public class CompletableFutureUtil {
/**
* 循环执行异步方法
* @param collection 执行的集合
* @param function 对集合对象的操作
* @param <T> 集合对象类型
* @throws InterruptedException e
*/
public static <T> void runAsyncForEach(Collection<T> collection, Consumer<T> function) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(collection.size());
for (T obj : collection) {
CompletableFuture.runAsync(() -> {
try {
function.accept(obj);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
});
}
latch.await();
}
/**
* 循环执行异步方法
* @param collection 执行的集合
* @param function 对集合对象的操作
* @param executor 线程池
* @param <T> 集合对象类型
* @throws InterruptedException e
*/
public static <T> void runAsyncForEach(Collection<T> collection, Consumer<T> function, Executor executor) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(collection.size());
for (T obj : collection) {
CompletableFuture.runAsync(() -> {
try {
function.accept(obj);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
latch.countDown();
}
}, executor);
}
latch.await();
}
/**
* 异步执行任务
* @param task 任务
* @param latch 计数器
* @param executor 线程池
* @param errorPrefix 错误信息前缀
*/
public static void runAsync(Runnable task, CountDownLatch latch, String errorPrefix, Executor executor) {
// 执行传入的 Runnable 任务
CompletableFuture.runAsync(task, executor).exceptionally(ex -> {
if (ex != null) {
log.error(errorPrefix, ex);
}
// 无论是否发生异常,都递减 CountDownLatch
latch.countDown();
return null;
});
}
/**
* 异步执行任务
* @param task 任务
* @param latch 计数器
* @param errorPrefix 错误信息前缀
*/
public static void runAsync(Runnable task, CountDownLatch latch, String errorPrefix) {
// 执行传入的 Runnable 任务
CompletableFuture.runAsync(task).exceptionally(ex -> {
if (ex != null) {
log.error(errorPrefix, ex);
}
// 无论是否发生异常,都递减 CountDownLatch
latch.countDown();
return null;
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101