CompletableFuture 是 Java 中一个强大的工具,用于创建和管理异步任务。
初始化
CompletableFuture 是 Java 中一个强大的工具,用于创建和管理异步任务。它有多种初始化方式,可以根据具体需求选择适合的方式。以下是常见的几种方式:
1. 使用静态工厂方法
CompletableFuture 提供了几个静态方法来快速创建实例。
(1) CompletableFuture.completedFuture(T value)
• 创建一个已经完成的 CompletableFuture,并设置其结果为给定值。
• 适用于同步场景或需要提供初始值的任务。
示例:
CompletableFuture<String> future = CompletableFuture.completedFuture("Initial Value");
System.out.println(future.join()); *// 输出: Initial Value*
(2) CompletableFuture.supplyAsync(Supplier
• 异步地执行 Supplier,返回一个结果。
• 默认使用 ForkJoinPool.commonPool。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Async Task Result";
});
System.out.println(future.join()); *// 输出: Async Task Result*
(3) CompletableFuture.supplyAsync(Supplier
• 异步地执行 Supplier,使用自定义线程池。
示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Custom Executor Result";
}, Executors.newCachedThreadPool());
System.out.println(future.join()); *// 输出: Custom Executor Result*
(4) CompletableFuture.runAsync(Runnable runnable)
• 异步地执行 Runnable,不返回结果。
示例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Running an async task!");
});
future.join();
(5) CompletableFuture.runAsync(Runnable runnable, Executor executor)
• 异步地执行 Runnable,使用自定义线程池。
示例:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("Task with custom executor!");
}, Executors.newFixedThreadPool(2));
future.join();
2. 使用构造方法创建
CompletableFuture 也可以通过构造方法直接创建实例,随后手动完成任务。
CompletableFuture<String> future = new CompletableFuture<>();
// 在某处手动完成任务
new Thread(() -> {
try {
Thread.sleep(1000);
future.complete("Manually Completed!");
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
}).start();
System.out.println(future.join()); // 输出: Manually Completed!
3. 使用 anyOf 和 allOf
这些方法用于组合多个 CompletableFuture。
(1) CompletableFuture.anyOf
• 返回一个新的 CompletableFuture,其结果是第一个完成的任务的结果。
示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
System.out.println(result.join()); *// 输出: Task 1 或 Task 2*
(2) CompletableFuture.allOf
• 返回一个新的 CompletableFuture,当所有任务都完成时才完成。
示例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2);
all.join(); *// 等待所有任务完成*
System.out.println(future1.join() + " & " + future2.join()); *// 输出: Task 1 & Task 2*
4. 使用异步任务链生成
通过方法链式调用创建复杂的异步任务。
示例:
CompletableFuture<Void> chain = CompletableFuture.supplyAsync(() -> "Step 1")
.thenApply(result -> result + " -> Step 2")
.thenAccept(finalResult -> System.out.println("Final Result: " + finalResult));
chain.join();
输出:
Final Result: Step 1 -> Step 2
5. 从已有结果或异常创建
(1) CompletableFuture.completedFuture
• 返回已完成的 CompletableFuture。
示例:
CompletableFuture<String> future = CompletableFuture.completedFuture("Already Done!");
System.out.println(future.join()); *// 输出: Already Done!*
(2) 使用 CompletableFuture.failedFuture(Java 9+)
• 返回一个已完成的 CompletableFuture,但带有异常。
示例:
CompletableFuture<String> future = CompletableFuture.failedFuture(new RuntimeException("Failure!"));
future.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return null;
}).join();
输出:
Caught exception: Failure!
总结
方法 | 异步执行 | 是否使用默认线程池 | 是否需要手动完成 |
---|---|---|---|
completedFuture | 否 | 否 | 否 |
supplyAsync | 是 | 是(可指定) | 否 |
runAsync | 是 | 是(可指定) | 否 |
手动完成(构造方法) | 否 | 否 | 是 |
anyOf/allOf | 取决于内部任务 | 否 | 否 |
过程接口
CompletableFuture 提供了许多方法来处理异步任务链。这些方法的主要区别在于是否接收上一步的结果、是否返回新的结果,以及是否异步执行。以下是 thenApply、thenAccept、thenRun、thenRunAsync 和 thenCompose 的详细区别及示例:
1. thenApply
-
用途:接收上一步的结果,并返回处理后的新结果。
-
返回值:一个新的 CompletableFuture,包含处理后的结果。
-
示例:
public class ThenApplyExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Step 1")
.thenApply(result -> {
System.out.println("Processing result: " + result);
return result + " -> Step 2";
}).thenApply(finalResult -> {
System.out.println("Final result: " + finalResult);
return finalResult + " -> Done";
}).join();
}
}
- 输出:****
Processing result: Step 1
Final result: Step 1 -> Step 2
- thenAccept
-
用途:接收上一步的结果,但不返回新结果(只执行某些操作)。
-
返回值:CompletableFuture
,表示该步骤完成但没有返回值。 -
示例:
public class ThenAcceptExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Step 1")
.thenAccept(result -> {
System.out.println("Received result: " + result);
}).join();
}
}
- 输出:
Received result: Step 1
3. thenRun
-
用途:不接收上一步的结果,仅在上一步完成后执行一个任务。
-
返回值:CompletableFuture
,表示该步骤完成但没有返回值。 -
示例:
public class ThenRunExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Step 1")
.thenRun(() -> {
System.out.println("Step 1 completed. Running follow-up task.");
}).join();
}
}
输出:
Step 1 completed. Running follow-up task.
4. thenRunAsync
-
用途:与 thenRun 相同,但会在新的线程中异步执行任务。
-
返回值:CompletableFuture
,表示该步骤完成但没有返回值。 -
示例:
public class ThenRunAsyncExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Step 1")
.thenRunAsync(() -> {
System.out.println(Thread.currentThread().getName() + ": Running follow-up task.");
}).join();
}
}
- 输出:
ForkJoinPool.commonPool-worker-1: Running follow-up task.
5. thenCompose
-
用途:接收上一步的结果,并返回一个新的 CompletableFuture,可用于动态连接新的异步任务。
-
返回值:一个由新任务返回的 CompletableFuture。
-
示例:
public class ThenComposeExample {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> "Step 1")
.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
System.out.println("Processing result: " + result);
return result + " -> Step 2";
})).thenAccept(finalResult -> {
System.out.println("Final result: " + finalResult);
}).join();
}
}
- 输出:
Processing result: Step 1
Final result: Step 1 -> Step 2
区别对比总结
方法 | 接收上一步结果 | 返回新结果 | 是否异步 | 常用场景 |
---|---|---|---|---|
thenApply | 是 | 是 | 否 | 转换并返回新结果 |
thenAccept | 是 | 否 | 否 | 处理结果但不返回新值 |
thenRun | 否 | 否 | 否 | 不关心结果,直接执行后续任务 |
thenRunAsync | 否 | 否 | 是 | 不关心结果,异步执行后续任务 |
thenCompose | 是 | 是(嵌套) | 是(异步) | 动态链接新的异步任务 |
如何选择?
- 需要转换结果:用 thenApply。
- 只需要处理结果:用 thenAccept。
- 只需要执行任务,不需要输入或输出:用 thenRun 或 thenRunAsync。
- 需要动态生成新的任务链:用 thenCompose。
其他
- 提供默认值或快速失败机制
在复杂的异步链中,completedFuture 可以用来提供默认值,避免异步任务失败时中断整个链。
示例:设置默认值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("Task failed!");
}).exceptionally(ex -> "Default Value");
System.out.println(future.join()); // 输出: Default Value
示例:快速失败
CompletableFuture<String> failedFuture = CompletableFuture.completedFuture("Fallback Value")
.thenCompose(result -> CompletableFuture.failedFuture(new RuntimeException("Quick Fail")));
failedFuture.exceptionally(ex -> {
System.out.println("Caught exception: " + ex.getMessage());
return null;
}).join();
// 输出: Caught exception: Quick Fail
- 延迟任务
虽然 completedFuture 本身是立即完成的,但可以与 thenCompose 和 delayedExecutor 结合,模拟延迟任务。
示例:延迟执行
CompletableFuture<String> delayedTask = CompletableFuture.completedFuture("Delayed Task")
.thenCompose(result -> CompletableFuture.supplyAsync(() -> result,
CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS)));
System.out.println("Waiting...");
System.out.println(delayedTask.join()); // 3秒后输出: Delayed Task
- 与异常处理结合
使用 completedFuture 可以快速创建异常流,测试下游任务如何处理。
示例:模拟异常流
CompletableFuture<String> failedFuture = CompletableFuture.failedFuture(new RuntimeException("Simulated Failure"));
failedFuture.exceptionally(ex -> {
System.out.println("Handled exception: " + ex.getMessage());
return "Fallback Result";
}).thenAccept(result -> System.out.println("Result: " + result));
// 输出:
// Handled exception: Simulated Failure
// Result: Fallback Result
ForkJoin 入门
介绍
ForkJoinPool 是 Java 7 引入的一种线程池,专门设计用来支持大规模并行任务的执行,特别适合递归分治算法。与普通线程池不同,它以任务分解(fork)和任务合并(join)的方式高效利用线程资源。
ForkJoinPool 的特点
1. 工作窃取算法:每个线程都有自己的双端队列(Deque);如果某个线程完成了自己的任务,它会尝试“窃取”其他线程的任务来执行,从而最大化线程的利用率。
2. 递归任务支持:提供 ForkJoinTask 的子类 RecursiveTask 和 RecursiveAction,分别用于有返回值和无返回值的递归任务。
3. 轻量级任务调度:任务的管理和调度开销比传统线程池小,因为它采用的是任务窃取而不是队列的全局锁。
4. 高吞吐量:适合 CPU 密集型任务,最大化利用多核处理器。
基本用法
以下是使用 ForkJoinPool 和 RecursiveTask 实现并行计算的例子:
class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10; // 任务分割的阈值
private final int start;
private final int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if ((end - start) <= THRESHOLD) {
// 小任务直接计算
long sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 大任务拆分
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(start, mid);
SumTask rightTask = new SumTask(mid + 1, end);
// 分别执行
leftTask.fork();
rightTask.fork();
// 合并结果
return leftTask.join() + rightTask.join();
}
}
}
public class ForkJoinExample {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(1, 1000);
long result = pool.invoke(task);
System.out.println("Sum: " + result); // 输出: Sum: 500500
}
}
测试
public class WorkStealingBenchmark {
public static void main(String[] args) {
System.out.printf("%-10s %-30s %-10s%n", "Loop Count", "Description", "Time (ms)");
withoutWorkStealing(100000);
withWorkStealing(100000);
System.out.println("[warmup done]");
long[] times = new long[]{1_000, 100_000, 10_000_000, 1_000_000_000};
for (long cnt : times) {
withoutWorkStealing(cnt);
withWorkStealing(cnt);
}
System.out.println("[benchmark done]");
}
public static void withWorkStealing(long loopCnt) {
ForkJoinPool forkJoinPool = new ForkJoinPool(8);
long startTime = System.currentTimeMillis();
for (int i = 0; i < loopCnt; i++) {
int taskId = i;
forkJoinPool.execute(() -> {
if (taskId % 5 == 0) { // 重任务
heavyComputation();
} else { // 轻任务
lightComputation();
}
});
}
forkJoinPool.shutdown();
while (!forkJoinPool.isTerminated()) {
// 等待所有任务完成
}
long endTime = System.currentTimeMillis();
System.out.printf("%-10d %-30s %-10d%n", loopCnt, "With Work Stealing", (endTime - startTime));
}
public static void withoutWorkStealing(long loopCnt) {
ExecutorService executor = Executors.newFixedThreadPool(8);
long startTime = System.currentTimeMillis();
for (int i = 0; i < loopCnt; i++) {
int taskId = i;
executor.execute(() -> {
if (taskId % 5 == 0) { // 重任务
heavyComputation();
} else { // 轻任务
lightComputation();
}
});
}
executor.shutdown();
while (!executor.isTerminated()) {
// 等待所有任务完成
}
long endTime = System.currentTimeMillis();
System.out.printf("%-10d %-30s %-10d%n", loopCnt, "Without Work Stealing", (endTime - startTime));
}
private static void heavyComputation() {
for (int i = 0; i < 1_000_000; i++) {
Math.sqrt(i);
}
}
private static void lightComputation() {
for (int i = 0; i < 100_000; i++) {
Math.sqrt(i);
}
}
输出如下:
Loop Count Description Time (ms)
100000 Without Work Stealing 124
100000 With Work Stealing 41
[warmup done]
1000 With Work Stealing 5
10000 With Work Stealing 11
100000 With Work Stealing 80
1000000 With Work Stealing 403
10000000 With Work Stealing 2945
100000000 With Work Stealing 32283
1000 Without Work Stealing 4
10000 Without Work Stealing 11
100000 Without Work Stealing 41
1000000 Without Work Stealing 326
10000000 Without Work Stealing 11129
100000000 Without Work Stealing 267523
[benchmark done]