Mushroom Notes Mushroom Notes
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档

kinoko

一位兴趣使然的热心码农
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档
  • JavaSE

  • JavaEE

  • JDK版本特性

    • 总览
    • Java8

      • java 8 - Lambda表达式
      • java 8 - CompletableFuture 类
        • 常用的创建方式
        • 结果的获取
        • 链式编程API
          • thenApply
          • thenRun
          • thenAccept
          • thenAcceptBoth
          • thenCompose
          • thenCombine
          • applyToEither
          • acceptEither
          • runAfterBoth
          • runAfterEither
          • whenComplete
          • handle
          • exceptionally
        • 工具类
    • Java 17

    • Java 21

  • JVM

  • Java
  • JDK版本特性
  • Java8
kinoko
2024-01-11
目录

java 8 - CompletableFuture 类

CompletableFuture实现了CompletionStage接口和Future接口,在JDK1.8中引入,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

# 常用的创建方式


new

CompletableFuture<Object> completableFuture = new CompletableFuture<>();
1

静态方法创建

// 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static <U> CompletableFuture<U> completedFuture(U value)
1
2
3
4
5
6
7

# 结果的获取


new的获取方式

CompletableFuture<Object> completableFuture = new CompletableFuture();
new Thread(() -> {
   try {
       //doSomething,调用complete方法将其他方法的执行结果记录在completableFuture对象中
       completableFuture.complete(null);
   } catch (Exception e) {
       //异常处理
       completableFuture.completeExceptionally(e);
    }
}).start();
1
2
3
4
5
6
7
8
9
10

静态方法创建的获取方式

  // 阻塞获取,抛出异常为编译期异常
  public T get()
  // 阻塞获取,超时机制,若在指定时间内未获取结果将抛出超时异常
  public T get(long timeout, TimeUnit unit)
  // 不阻塞,执行完毕则返回结果或执行抛出的异常,若未执行完毕则返回 valueIfAbsent
  public T getNow(T valueIfAbsent)
  // 阻塞获取,抛出异常为运行时异常
  public T join()
1
2
3
4
5
6
7
8

# 链式编程API


这些API返回的都是阶段完成的CompletableFuture对象,所以能够达到链式编程的效果

方法 描述
thenApply (opens new window) 可以处理和改变这个阶段执行的结果
thenApplyAsync (opens new window) 可以异步地处理和改变这个阶段执行的结果
thenRun (opens new window) 当此阶段正常完成时,执行给定的操作。
thenRunAsync (opens new window) 当此阶段正常完成时,异步地执行给定的操作。
thenAccept (opens new window) 当此阶段正常完成时,将以该阶段的结果作为提供的操作的参数执行。
thenAcceptAsync (opens new window) 当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段的结果作为提供的操作的参数。
thenAcceptBoth (opens new window) 当这个和另一个给定的阶段都正常完成时,两个结果作为提供的操作的参数被执行。
thenAcceptBothAsync (opens new window) 当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供的操作的参数。
thenCompose (opens new window) 当这个阶段正常完成时,这个阶段将作为提供函数的参数执行。
thenComposeAsync (opens new window) 当此阶段正常完成时,将使用此阶段的默认异步执行工具执行,此阶段作为提供的函数的参数。
thenCombine (opens new window) 当这个和另一个给定的阶段都正常完成时,两个结果作为提供函数的参数执行。
thenCombineAsync (opens new window) 当这个和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中两个结果作为提供函数的参数。
applyToEither (opens new window) 当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的函数的参数。
applyToEitherAsync (opens new window) 当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供函数的参数。
acceptEither (opens new window) 当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的操作的参数。
acceptEitherAsync (opens new window) 当这个或另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行,其中相应的结果作为提供的操作的参数。
runAfterBoth (opens new window) 当这个和另一个给定的阶段都正常完成时,执行给定的动作。
runAfterBothAsync (opens new window) 当这个和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
runAfterEither (opens new window) 当这个或另一个给定阶段正常完成时,执行给定的操作。
runAfterEitherAsync (opens new window) 当这个或另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。
whenComplete (opens new window) 当此阶段完成时,使用结果(或 null如果没有))和此阶段的异常(或 null如果没有))执行给定的操作。
whenCompleteAsync (opens new window) 当此阶段完成时,执行给定操作将使用此阶段的默认异步执行工具执行给定操作,结果(或 null如果没有))和异常(或 null如果没有)这个阶段作为参数。
handle (opens new window) 当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行。
handleAsync (opens new window) 当该阶段完成正常或异常时,将使用此阶段的默认异步执行工具执行,此阶段的结果和异常作为提供函数的参数。
exceptionally (opens new window) 当CompletableFuture完成时完成,结果是异常触发此CompletableFuture的完成特殊功能的给定功能; 否则,如果此CompletableFuture正常完成,则返回的CompletableFuture也会以相同的值正常完成。
toCompletableFuture (opens new window) 返回此CompletableFuture。

API详解


# thenApply (opens new window)

可以使用thenApply() 处理和改变上一段Future的结果。

public static void main(String[] args) {
    Integer result = CompletableFuture.supplyAsync(() -> 1)
            .thenApply(i -> i + 1)
            .join();
    System.out.println(result); // 2
}
1
2
3
4
5
6

可以一直接下去

public static void main(String[] args) {
    Integer result = CompletableFuture.supplyAsync(() -> 1)
            .thenApply(i -> i + 1)
            .thenApply(i -> i + 1)
            .thenApply(i -> i + 1)
            .join();
    System.out.println(result); // 4
}
1
2
3
4
5
6
7
8

如果你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片段,你可以使用thenAccept()和 thenRun()方法,这些方法经常在调用链的最末端的最后一个回调函数中使用。

# thenRun (opens new window)

可以使用thenRun()在上一段Future完成后执行一段操作,thenRun无法使用上一段Future的结果。

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> 1)
            .thenRun(() -> System.out.println("执行完毕"));// 执行完毕
}
1
2
3
4

# thenAccept (opens new window)

可以使用thenAccept()在上一段Future完成后执行一段操作,可以使用上一段Future的结果。

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> 1)
    		.thenAccept(x -> System.out.println(x)); // 1
}
1
2
3
4

# thenAcceptBoth (opens new window)

可以使用thenAcceptBoth()在上一段Future和传入的Future完成后执行一段操作,可以使用两个Future的结果。

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> 1)
            .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2),
                    (i1, i2) -> System.out.println(i1 + i2));// 3
}
1
2
3
4
5

# thenCompose (opens new window)

组合两个Future的时候使用,用法与thenApply()相似,但是传参是一个Future,返回传入Future的结果。

public static void main(String[] args) {
    Integer join = CompletableFuture.supplyAsync(() -> 1)
            .thenCompose(i -> CompletableFuture.supplyAsync(() -> i + 1))
            .join();
    System.out.println(join); // 2
}
1
2
3
4
5
6

# thenCombine (opens new window)

组合两个Future的时候使用,与thenCompose()不同,不需要关心前一个Future的结果,只有当两个Future都完成后才会执行特定的函数。

public static void main(String[] args) {
    Integer join = CompletableFuture.supplyAsync(() -> 1)
            .thenCombine(CompletableFuture.supplyAsync(() -> 2), (i1, i2) -> i1 + i2)
            .join();
    System.out.println(join); // 3
}
1
2
3
4
5
6

# applyToEither (opens new window)

可以使用applyToEither()在上一段Future或传入的Future完成后对结果进行处理或改变,哪个Future先返回就处理哪个结果。

public static void main(String[] args) {
    Integer join = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return 1;
            })
            .applyToEither(CompletableFuture.supplyAsync(() -> 2), i -> i + 1)
            .join();
    System.out.println(join); // 3
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# acceptEither (opens new window)

可以使用acceptEither()在上一段Future或传入的Future完成后执行一段操作,哪个Future先返回就用哪个的结果,两个Future的结果类型需一致。

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return 1;
            })
            .acceptEither(CompletableFuture.supplyAsync(() -> 2),i -> System.out.println(i)); // 2
}
1
2
3
4
5
6
7
8
9
10
11

# runAfterBoth (opens new window)

可以使用runAfterBoth()在上一段Future和传入的Future都完成时执行一段操作,无法使用上一段或传入Future的结果。

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> 1)
            .runAfterBoth(CompletableFuture.supplyAsync(() -> 2), () -> System.out.println("执行完毕"));// 执行完毕
}
1
2
3
4

# runAfterEither (opens new window)

可以使用runAfterBoth()在上一段Future或传入的Future完成时执行一段操作,无法使用上一段或传入Future的结果。

public static void main(String[] args) {
    CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return 1;
            })
            .runAfterEither(CompletableFuture.runAsync(() -> System.out.println(2)),
                    () -> System.out.println("执行完毕"));// 2 执行完毕
}
1
2
3
4
5
6
7
8
9
10
11
12

# whenComplete (opens new window)

当上一段Future完成时,对结果或异常进行处理。

public static void main(String[] args) {
    Integer join = CompletableFuture.supplyAsync(() -> 1 / 0)
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    System.out.println(throwable.getCause());
                } else {
                    System.out.println(result);
                }
            }).join();
    System.out.println(join);
}
1
2
3
4
5
6
7
8
9
10
11

打印

java.lang.ArithmeticException: / by zero
// 异常栈...
1
2

# handle (opens new window)

当上一段Future完成时,对结果或异常进行处理并返回结果,如果Future执行出现了异常,获取结果时不会抛出,但结果会为null。

public static void main(String[] args) {
    Integer join = CompletableFuture.supplyAsync(() -> 1 / 0)
            .handle((result, throwable) -> {
                if (throwable != null) {
                    System.out.println(throwable.getCause());// 异常cause
                }
                return result;
            }).join();
    System.out.println(join);// null
}
1
2
3
4
5
6
7
8
9
10

# exceptionally (opens new window)

如果你并不关注返回值或Future没有返回,可以使用exceptionally()对上一段Future的异常进行处理,但还是需要返回一个默认值。

public static void main(String[] args) {
    Integer join = CompletableFuture.supplyAsync(() -> 1 / 0)
                    .exceptionally(throwable -> {
                        System.out.println(throwable.getCause());// 异常cause
                        return 0;
                    }).join();
    System.out.println(join);// 0
}
1
2
3
4
5
6
7
8

无返回值

public static void main(String[] args) {
    Void join = CompletableFuture.runAsync(() -> System.out.println("task1")) // task1
                    .exceptionally(throwable -> {
                        System.out.println(throwable.getCause());
                        return null;
                    }).join();
    System.out.println(join);// null
}
1
2
3
4
5
6
7
8

# 工具类


import lombok.extern.slf4j.Slf4j;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/**
 * 异步工具类
 * @author kinoko
 * @date 2023/7/13 10:07
 */
@Slf4j
public class CompletableFutureUtil {

    /**
     * 循环执行异步方法
     * @param collection 执行的集合
     * @param function   对集合对象的操作
     * @param <T>        集合对象类型
     * @throws InterruptedException e
     */
    public static <T> void runAsyncForEach(Collection<T> collection, Consumer<T> function) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(collection.size());
        for (T obj : collection) {
            CompletableFuture.runAsync(() -> {
                try {
                    function.accept(obj);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
    }

    /**
     * 循环执行异步方法
     * @param collection 执行的集合
     * @param function   对集合对象的操作
     * @param executor   线程池
     * @param <T>        集合对象类型
     * @throws InterruptedException e
     */
    public static <T> void runAsyncForEach(Collection<T> collection, Consumer<T> function, Executor executor) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(collection.size());
        for (T obj : collection) {
            CompletableFuture.runAsync(() -> {
                try {
                    function.accept(obj);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    latch.countDown();
                }
            }, executor);
        }
        latch.await();
    }

    /**
     * 异步执行任务
     * @param task        任务
     * @param latch       计数器
     * @param executor    线程池
     * @param errorPrefix 错误信息前缀
     */
    public static void runAsync(Runnable task, CountDownLatch latch, String errorPrefix, Executor executor) {
        // 执行传入的 Runnable 任务
        CompletableFuture.runAsync(task, executor).exceptionally(ex -> {
            if (ex != null) {
                log.error(errorPrefix, ex);
            }
            // 无论是否发生异常,都递减 CountDownLatch
            latch.countDown();
            return null;
        });
    }

    /**
     * 异步执行任务
     * @param task        任务
     * @param latch       计数器
     * @param errorPrefix 错误信息前缀
     */
    public static void runAsync(Runnable task, CountDownLatch latch, String errorPrefix) {
        // 执行传入的 Runnable 任务
        CompletableFuture.runAsync(task).exceptionally(ex -> {
            if (ex != null) {
                log.error(errorPrefix, ex);
            }
            // 无论是否发生异常,都递减 CountDownLatch
            latch.countDown();
            return null;
        });
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#java#jdk8#多线程
上次更新: 2024/01/11 17:06:16
java 8 - Lambda表达式
java 17 - 增强型伪随机数生成器

← java 8 - Lambda表达式 java 17 - 增强型伪随机数生成器→

最近更新
01
JVM 底层
09-13
02
JVM 理论
09-13
03
JVM 应用
09-13
更多文章>
Theme by Vdoing | Copyright © 2022-2024 kinoko | MIT License | 粤ICP备2024165634号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式