CompletableFuture 是 Java 中一个强大的工具,用于创建和管理异步任务。

初始化

CompletableFuture 是 Java 中一个强大的工具,用于创建和管理异步任务。它有多种初始化方式,可以根据具体需求选择适合的方式。以下是常见的几种方式:

1. 使用静态工厂方法

CompletableFuture 提供了几个静态方法来快速创建实例。

(1) CompletableFuture.completedFuture(T value)

​ • 创建一个已经完成的 CompletableFuture,并设置其结果为给定值。

​ • 适用于同步场景或需要提供初始值的任务。

示例

CompletableFuture<String> future = CompletableFuture.completedFuture("Initial Value");

System.out.println(future.join()); *// 输出: Initial Value*

(2) CompletableFuture.supplyAsync(Supplier supplier)

​ • 异步地执行 Supplier,返回一个结果。

​ • 默认使用 ForkJoinPool.commonPool。

示例

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
  return "Async Task Result";
});

System.out.println(future.join()); *// 输出: Async Task Result*

(3) CompletableFuture.supplyAsync(Supplier supplier, Executor executor)

​ • 异步地执行 Supplier,使用自定义线程池。

示例

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {

  return "Custom Executor Result";

}, Executors.newCachedThreadPool());

System.out.println(future.join()); *// 输出: Custom Executor Result*

(4) CompletableFuture.runAsync(Runnable runnable)

​ • 异步地执行 Runnable,不返回结果。

示例

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {

  System.out.println("Running an async task!");

});

future.join();

(5) CompletableFuture.runAsync(Runnable runnable, Executor executor)

​ • 异步地执行 Runnable,使用自定义线程池。

示例

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {

  System.out.println("Task with custom executor!");

}, Executors.newFixedThreadPool(2));

future.join();

2. 使用构造方法创建

CompletableFuture 也可以通过构造方法直接创建实例,随后手动完成任务。

CompletableFuture<String> future = new CompletableFuture<>();

// 在某处手动完成任务
new Thread(() -> {
    try {
        Thread.sleep(1000);
        future.complete("Manually Completed!");
    } catch (InterruptedException e) {
        future.completeExceptionally(e);
    }
}).start();

System.out.println(future.join()); // 输出: Manually Completed!

3. 使用 anyOf 和 allOf

这些方法用于组合多个 CompletableFuture。

(1) CompletableFuture.anyOf

​ • 返回一个新的 CompletableFuture,其结果是第一个完成的任务的结果。

示例

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");



CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);

System.out.println(result.join()); *// 输出: Task 1 或 Task 2*

(2) CompletableFuture.allOf

​ • 返回一个新的 CompletableFuture,当所有任务都完成时才完成。

示例

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Task 1");

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Task 2");

CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2);

all.join(); *// 等待所有任务完成*

System.out.println(future1.join() + " & " + future2.join()); *// 输出: Task 1 & Task 2*

4. 使用异步任务链生成

通过方法链式调用创建复杂的异步任务。

示例

CompletableFuture<Void> chain = CompletableFuture.supplyAsync(() -> "Step 1")
  .thenApply(result -> result + " -> Step 2")
  .thenAccept(finalResult -> System.out.println("Final Result: " + finalResult));

chain.join();

输出

Final Result: Step 1 -> Step 2

5. 从已有结果或异常创建

(1) CompletableFuture.completedFuture

​ • 返回已完成的 CompletableFuture。

示例

CompletableFuture<String> future = CompletableFuture.completedFuture("Already Done!");

System.out.println(future.join()); *// 输出: Already Done!*

(2) 使用 CompletableFuture.failedFuture(Java 9+)

​ • 返回一个已完成的 CompletableFuture,但带有异常。

示例

CompletableFuture<String> future = CompletableFuture.failedFuture(new RuntimeException("Failure!"));

future.exceptionally(ex -> {

  System.out.println("Caught exception: " + ex.getMessage());

  return null;

}).join();

输出

Caught exception: Failure!

总结

方法 异步执行 是否使用默认线程池 是否需要手动完成
completedFuture
supplyAsync 是(可指定)
runAsync 是(可指定)
手动完成(构造方法)
anyOf/allOf 取决于内部任务

过程接口

CompletableFuture 提供了许多方法来处理异步任务链。这些方法的主要区别在于是否接收上一步的结果、是否返回新的结果,以及是否异步执行。以下是 thenApply、thenAccept、thenRun、thenRunAsync 和 thenCompose 的详细区别及示例:

1. thenApply

  • 用途:接收上一步的结果,并返回处理后的新结果。

  • 返回值:一个新的 CompletableFuture,包含处理后的结果。

  • 示例:

public class ThenApplyExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Step 1")
            .thenApply(result -> {
                System.out.println("Processing result: " + result);
                return result + " -> Step 2";
            }).thenApply(finalResult -> {
                System.out.println("Final result: " + finalResult);
                return finalResult + " -> Done";
            }).join();
    }
}
  • 输出:****
Processing result: Step 1
Final result: Step 1 -> Step 2
  1. thenAccept
  • 用途:接收上一步的结果,但不返回新结果(只执行某些操作)。

  • 返回值:CompletableFuture,表示该步骤完成但没有返回值。

  • 示例:

public class ThenAcceptExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Step 1")
            .thenAccept(result -> {
                System.out.println("Received result: " + result);
            }).join();
    }
}
  • 输出:
Received result: Step 1

3. thenRun

  • 用途:不接收上一步的结果,仅在上一步完成后执行一个任务。

  • 返回值:CompletableFuture,表示该步骤完成但没有返回值。

  • 示例:

public class ThenRunExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Step 1")
            .thenRun(() -> {
                System.out.println("Step 1 completed. Running follow-up task.");
            }).join();
    }
}

输出:

Step 1 completed. Running follow-up task.

4. thenRunAsync

  • 用途:与 thenRun 相同,但会在新的线程中异步执行任务。

  • 返回值:CompletableFuture,表示该步骤完成但没有返回值。

  • 示例:

public class ThenRunAsyncExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Step 1")
            .thenRunAsync(() -> {
                System.out.println(Thread.currentThread().getName() + ": Running follow-up task.");
            }).join();
    }
}
  • 输出:
ForkJoinPool.commonPool-worker-1: Running follow-up task.

5. thenCompose

  • 用途:接收上一步的结果,并返回一个新的 CompletableFuture,可用于动态连接新的异步任务。

  • 返回值:一个由新任务返回的 CompletableFuture。

  • 示例:

public class ThenComposeExample {
    public static void main(String[] args) {
        CompletableFuture.supplyAsync(() -> "Step 1")
            .thenCompose(result -> CompletableFuture.supplyAsync(() -> {
                System.out.println("Processing result: " + result);
                return result + " -> Step 2";
            })).thenAccept(finalResult -> {
                System.out.println("Final result: " + finalResult);
            }).join();
    }
}
  • 输出:
Processing result: Step 1
Final result: Step 1 -> Step 2

区别对比总结

方法 接收上一步结果 返回新结果 是否异步 常用场景
thenApply 转换并返回新结果
thenAccept 处理结果但不返回新值
thenRun 不关心结果,直接执行后续任务
thenRunAsync 不关心结果,异步执行后续任务
thenCompose 是(嵌套) 是(异步) 动态链接新的异步任务

如何选择?

  1. 需要转换结果:用 thenApply。
  2. 只需要处理结果:用 thenAccept。
  3. 只需要执行任务,不需要输入或输出:用 thenRun 或 thenRunAsync。
  4. 需要动态生成新的任务链:用 thenCompose。

其他

  1. 提供默认值或快速失败机制

在复杂的异步链中,completedFuture 可以用来提供默认值,避免异步任务失败时中断整个链。

示例:设置默认值

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    throw new RuntimeException("Task failed!");
}).exceptionally(ex -> "Default Value");

System.out.println(future.join()); // 输出: Default Value

示例:快速失败

CompletableFuture<String> failedFuture = CompletableFuture.completedFuture("Fallback Value")
    .thenCompose(result -> CompletableFuture.failedFuture(new RuntimeException("Quick Fail")));

failedFuture.exceptionally(ex -> {
    System.out.println("Caught exception: " + ex.getMessage());
    return null;
}).join();
// 输出: Caught exception: Quick Fail
  1. 延迟任务

虽然 completedFuture 本身是立即完成的,但可以与 thenCompose 和 delayedExecutor 结合,模拟延迟任务。

示例:延迟执行

CompletableFuture<String> delayedTask = CompletableFuture.completedFuture("Delayed Task")
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result, 
        CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS)));

System.out.println("Waiting...");
System.out.println(delayedTask.join()); // 3秒后输出: Delayed Task
  1. 与异常处理结合

使用 completedFuture 可以快速创建异常流,测试下游任务如何处理。

示例:模拟异常流

CompletableFuture<String> failedFuture = CompletableFuture.failedFuture(new RuntimeException("Simulated Failure"));

failedFuture.exceptionally(ex -> {
    System.out.println("Handled exception: " + ex.getMessage());
    return "Fallback Result";
}).thenAccept(result -> System.out.println("Result: " + result));

// 输出:
// Handled exception: Simulated Failure
// Result: Fallback Result

ForkJoin 入门

介绍

ForkJoinPool 是 Java 7 引入的一种线程池,专门设计用来支持大规模并行任务的执行,特别适合递归分治算法。与普通线程池不同,它以任务分解(fork)和任务合并(join)的方式高效利用线程资源。

ForkJoinPool 的特点

​ 1. 工作窃取算法:每个线程都有自己的双端队列(Deque);如果某个线程完成了自己的任务,它会尝试“窃取”其他线程的任务来执行,从而最大化线程的利用率。

​ 2. 递归任务支持:提供 ForkJoinTask 的子类 RecursiveTask 和 RecursiveAction,分别用于有返回值和无返回值的递归任务。

​ 3. 轻量级任务调度:任务的管理和调度开销比传统线程池小,因为它采用的是任务窃取而不是队列的全局锁。

​ 4. 高吞吐量:适合 CPU 密集型任务,最大化利用多核处理器。

基本用法

以下是使用 ForkJoinPool 和 RecursiveTask 实现并行计算的例子:

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10; // 任务分割的阈值
    private final int start;
    private final int end;

    public SumTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if ((end - start) <= THRESHOLD) {
            // 小任务直接计算
            long sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 大任务拆分
            int mid = (start + end) / 2;
            SumTask leftTask = new SumTask(start, mid);
            SumTask rightTask = new SumTask(mid + 1, end);

            // 分别执行
            leftTask.fork();
            rightTask.fork();

            // 合并结果
            return leftTask.join() + rightTask.join();
        }
    }
}

public class ForkJoinExample {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();

        SumTask task = new SumTask(1, 1000);
        long result = pool.invoke(task);

        System.out.println("Sum: " + result); // 输出: Sum: 500500
    }
}

测试

public class WorkStealingBenchmark {

    public static void main(String[] args) {

        System.out.printf("%-10s %-30s %-10s%n", "Loop Count", "Description", "Time (ms)");
        withoutWorkStealing(100000);
        withWorkStealing(100000);
        System.out.println("[warmup done]");
        long[] times = new long[]{1_000, 100_000, 10_000_000, 1_000_000_000};
        for (long cnt : times) {
            withoutWorkStealing(cnt);
            withWorkStealing(cnt);
        }
        System.out.println("[benchmark done]");
    }

    public static void withWorkStealing(long loopCnt) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(8);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < loopCnt; i++) {
            int taskId = i;
            forkJoinPool.execute(() -> {
                if (taskId % 5 == 0) { // 重任务
                    heavyComputation();
                } else { // 轻任务
                    lightComputation();
                }
            });
        }

        forkJoinPool.shutdown();
        while (!forkJoinPool.isTerminated()) {
            // 等待所有任务完成
        }

        long endTime = System.currentTimeMillis();
        System.out.printf("%-10d %-30s %-10d%n", loopCnt, "With Work Stealing", (endTime - startTime));
    }

    public static void withoutWorkStealing(long loopCnt) {
        ExecutorService executor = Executors.newFixedThreadPool(8);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < loopCnt; i++) {
            int taskId = i;
            executor.execute(() -> {
                if (taskId % 5 == 0) { // 重任务
                    heavyComputation();
                } else { // 轻任务
                    lightComputation();
                }
            });
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
            // 等待所有任务完成
        }

        long endTime = System.currentTimeMillis();
        System.out.printf("%-10d %-30s %-10d%n", loopCnt, "Without Work Stealing", (endTime - startTime));
    }

    private static void heavyComputation() {
        for (int i = 0; i < 1_000_000; i++) {
            Math.sqrt(i);
        }
    }

    private static void lightComputation() {
        for (int i = 0; i < 100_000; i++) {
            Math.sqrt(i);
        }
    }

输出如下:

Loop Count Description                    Time (ms) 
100000     Without Work Stealing          124       
100000     With Work Stealing             41        
[warmup done]
1000       With Work Stealing             5         
10000      With Work Stealing             11        
100000     With Work Stealing             80        
1000000    With Work Stealing             403       
10000000   With Work Stealing             2945      
100000000  With Work Stealing             32283     
1000       Without Work Stealing          4         
10000      Without Work Stealing          11        
100000     Without Work Stealing          41        
1000000    Without Work Stealing          326       
10000000   Without Work Stealing          11129     
100000000  Without Work Stealing          267523    
[benchmark done]