前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >并发设计模式实战系列(15):Future/Promise

并发设计模式实战系列(15):Future/Promise

作者头像
摘星.
发布2025-05-20 15:06:14
发布2025-05-20 15:06:14
11100
代码可运行
举报
文章被收录于专栏:博客专享博客专享
运行总次数:0
代码可运行

🌟 大家好,我是摘星! 🌟

今天为大家带来的是并发设计模式实战系列,第十五章Future/Promise,废话不多说直接开始~

一、核心原理深度拆解

1. 异步计算双阶段模型
代码语言:javascript
代码运行次数:0
运行
复制
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Task      │───>│   Future    │───>│   Callback  │
│ Submission  │<───│  (Promise)  │<───│  Execution  │
└─────────────┘    └─────────────┘    └─────────────┘
  • 提交阶段:主线程提交任务后立即返回Future占位符
  • 计算阶段:工作线程异步执行计算,通过Promise设置结果
  • 回调阶段:结果就绪后触发回调(观察者模式)
2. 状态机流转
代码语言:javascript
代码运行次数:0
运行
复制
public interface Future<V> {
    boolean isDone();      // 完成状态(成功/失败/取消)
    V get() throws...;     // 阻塞获取结果
    void addCallback(...); // 回调注册
}

二、生活化类比:快递柜取件

系统组件

现实类比

核心行为

Future

快递柜取件码

凭码查询包裹是否到达

Promise

快递员存件操作

实际将包裹放入柜中并更新状态

Callback

短信通知服务

包裹入柜后自动发送取件提醒

  • 异步流程:下单→获得取件码(Future)→快递员送货(异步计算)→短信通知(Callback)

三、Java代码实现(生产级Demo)

1. 完整可运行代码
代码语言:javascript
代码运行次数:0
运行
复制
import java.util.concurrent.*;
import java.util.function.Consumer;

public class FuturePromiseDemo {

    // 1. 自定义Promise实现
    static class MyPromise<V> implements Future<V>, Runnable {
        private volatile V result;
        private volatile Throwable error;
        private volatile boolean isDone;
        private final CountDownLatch latch = new CountDownLatch(1);
        private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();

        // 提交任务时执行的方法
        @Override
        public void run() {
            try {
                // 模拟耗时计算
                Thread.sleep(1000);
                setResult((V) "计算结果"); // 实际业务逻辑替换此处
            } catch (Exception e) {
                setError(e);
            }
        }

        // Promise核心方法:设置结果
        public void setResult(V result) {
            this.result = result;
            this.isDone = true;
            latch.countDown();
            notifyCallbacks();
        }

        // Promise核心方法:设置异常
        public void setError(Throwable error) {
            this.error = error;
            this.isDone = true;
            latch.countDown();
        }

        private void notifyCallbacks() {
            callbacks.forEach(cb -> cb.accept(result));
        }

        // Future实现方法
        @Override
        public V get() throws InterruptedException, ExecutionException {
            latch.await();
            if (error != null) throw new ExecutionException(error);
            return result;
        }

        @Override
        public boolean isDone() {
            return isDone;
        }

        // 注册回调(非JUC标准方法)
        public void addCallback(Consumer<V> callback) {
            if (isDone) {
                callback.accept(result);
            } else {
                callbacks.add(callback);
            }
        }
    }

    // 2. 使用示例
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        
        // 创建Promise并提交任务
        MyPromise<String> promise = new MyPromise<>();
        executor.submit(promise);

        // 注册回调
        promise.addCallback(result -> 
            System.out.println("[回调] 异步结果: " + result));

        // 同步阻塞获取
        System.out.println("[主线程] 立即返回,继续其他工作...");
        System.out.println("最终结果: " + promise.get());

        executor.shutdown();
    }
}
2. 关键机制说明
代码语言:javascript
代码运行次数:0
运行
复制
// 1. 状态同步控制
private volatile boolean isDone;  // 保证可见性
private final CountDownLatch latch; // 实现阻塞等待

// 2. 线程安全回调列表
private final List<Consumer<V>> callbacks = new CopyOnWriteArrayList<>();

// 3. 异常处理流程
public void setError(Throwable error) {
    this.error = error;
    this.isDone = true;
    latch.countDown(); // 唤醒所有等待线程
}

四、横向对比表格

1. 异步模式对比

模式

核心特点

适用场景

Java实现类

Future

阻塞式获取结果

简单异步任务

FutureTask

CompletableFuture

链式调用+组合操作

复杂异步流水线

CompletableFuture

Promise

可写的结果容器

跨线程结果传递

需自行实现

Callback

事件驱动无阻塞

高并发IO

Netty的ChannelFuture

2. 回调注册方式对比

方法

触发时机

线程安全性

链式支持

addCallback

结果就绪后立即执行

需自行保证

不支持

thenApply

前序阶段完成后触发

内置线程池控制

支持

whenComplete

无论成功失败都执行

可能在不同线程执行

支持


五、高级应用技巧

1. 组合多个异步任务
代码语言:javascript
代码运行次数:0
运行
复制
CompletableFuture<String> query1 = queryDatabase("sql1");
CompletableFuture<String> query2 = queryDatabase("sql2");

// 并行执行后合并结果
CompletableFuture<String> merged = query1.thenCombineAsync(query2, 
    (r1, r2) -> r1 + "|" + r2,
    ForkJoinPool.commonPool());
2. 超时控制
代码语言:javascript
代码运行次数:0
运行
复制
Future<String> future = executor.submit(task);
try {
    String result = future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true); // 中断任务执行
}
3. 回调线程控制
代码语言:javascript
代码运行次数:0
运行
复制
promise.addCallback(result -> {
    // 指定回调执行线程池
    ForkJoinPool.commonPool().execute(() -> processResult(result));
});

通过这种 原理+实现+对比 的立体解析,可以掌握:

  1. Future/Promise的双阶段异步本质
  2. 如何实现生产级的Promise容器
  3. 不同异步模式的适用场景选择
  4. 复杂场景下的组合使用技巧

六、源码级实现剖析(接五)

1. JDK FutureTask 核心逻辑
代码语言:javascript
代码运行次数:0
运行
复制
// 状态机定义(OpenJDK 17)
private volatile int state;
static final int NEW          = 0; // 初始化状态
static final int COMPLETING   = 1; // 临时状态
static final int NORMAL       = 2; // 正常完成
static final int EXCEPTIONAL  = 3; // 异常完成
static final int CANCELLED    = 4; // 已取消
static final int INTERRUPTING = 5; // 中断中
static final int INTERRUPTED  = 6; // 已中断

// 结果存储设计
private Object outcome; // 非volatile,依赖状态可见性保证
2. CompletableFuture 回调链实现
代码语言:javascript
代码运行次数:0
运行
复制
// 回调节点结构(简化版)
static final class UniCompletion<T,V> extends Completion {
    Executor executor;         // 执行线程池
    CompletableFuture<V> dep;   // 依赖的前序Future
    BiFunction<? super T,? super Throwable,? extends V> fn; // 回调函数

    void tryFire(int mode) {    // 触发回调执行
        if (dep != null && 
            compareAndSetState(0, 1)) { // CAS保证线程安全
            fn.apply(src, ex);  // 实际执行用户回调
        }
    }
}

七、生产环境最佳实践

1. 异常处理模板
代码语言:javascript
代码运行次数:0
运行
复制
CompletableFuture.supplyAsync(() -> {
        // 业务逻辑
        return doSomething();
    })
    .exceptionally(ex -> {      // 捕获所有异常
        log.error("任务失败", ex);
        return defaultValue;    // 提供降级值
    })
    .thenAccept(result -> {     // 只处理成功情况
        updateUI(result); 
    });
2. 资源清理策略
代码语言:javascript
代码运行次数:0
运行
复制
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
try {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        // 使用try-with-resources确保资源释放
        try (Connection conn = getConnection()) {
            process(conn);
        }
    }, executor);
    
    future.whenComplete((r, ex) -> {
        if (ex != null) {
            cleanupTempFiles(); // 失败时清理临时文件
        }
    });
} finally {
    executor.shutdown(); // 确保线程池关闭
}
3. 性能监控指标
代码语言:javascript
代码运行次数:0
运行
复制
// 监控Future完成时长
Timer.Sample sample = Timer.start();
future.whenComplete((r, ex) -> {
    sample.stop(registry.timer("async.task.time"));
});

// 监控队列积压
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
metrics.gauge("task.queue.size", pool.getQueue()::size);

八、与其他模式的协作

1. 结合发布-订阅模式
代码语言:javascript
代码运行次数:0
运行
复制
EventBus bus = new EventBus();
CompletableFuture.supplyAsync(() -> fetchData())
    .thenAccept(data -> {
        bus.post(new DataReadyEvent(data)); // 异步事件通知
    });

// 订阅方处理
@Subscribe
void handleDataReady(DataReadyEvent event) {
    // 处理已完成的数据
}
2. 与反应式编程整合
代码语言:javascript
代码运行次数:0
运行
复制
// CompletableFuture -> Mono
Mono.fromFuture(() -> {
    return CompletableFuture.supplyAsync(() -> {
        return reactiveDao.query();
    });
}).subscribeOn(Schedulers.boundedElastic())
  .subscribe(System.out::println);

// Mono -> CompletableFuture
reactorMono.toFuture().thenApply(...);

九、各语言实现对比

语言

核心实现类

特色功能

典型使用场景

Java

CompletableFuture

链式组合、CompletionStage

服务端异步编排

C#

Task

async/await语法糖

UI线程非阻塞调用

JavaScript

Promise

then/catch链式调用

前端API请求

Python

asyncio.Future

协程集成

爬虫/高并发IO

Go

chan

通道原生支持

高并发微服务


十、常见陷阱与解决方案

1. 回调地狱问题

反模式

代码语言:javascript
代码运行次数:0
运行
复制
future.thenApply(r1 -> {
    future2.thenApply(r2 -> {
        future3.thenApply(r3 -> {  // 嵌套层次过深
            return r1 + r2 + r3;
        });
    });
});

解决方案

代码语言:javascript
代码运行次数:0
运行
复制
// 使用组合式编程
CompletableFuture.allOf(future1, future2, future3)
    .thenApply(v -> {
        return future1.join() + 
               future2.join() + 
               future3.join();
    });
2. 线程泄漏场景

问题代码

代码语言:javascript
代码运行次数:0
运行
复制
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture.runAsync(() -> {
    while (true) {  // 无限循环任务
        process();
    }
}, executor);  // 线程永远无法回收

正确做法

代码语言:javascript
代码运行次数:0
运行
复制
// 使用守护线程或超时控制
ExecutorService executor = Executors.newFixedThreadPool(5, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);  // 设置为守护线程
    return t;
});
3. 上下文丢失问题

问题现象

代码语言:javascript
代码运行次数:0
运行
复制
SecurityContext ctx = getContext();
CompletableFuture.runAsync(() -> {
    // 此处无法获取原始上下文
    doPrivilegedAction(); 
}, executor);

解决方案

代码语言:javascript
代码运行次数:0
运行
复制
// 使用ContextPropagator
ExecutorService wrappedExecutor = ContextPropagator.wrap(executor);
CompletableFuture.runAsync(() -> {
    // 可以获取原始上下文
    doPrivilegedAction();
}, wrappedExecutor);
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-05-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、核心原理深度拆解
    • 1. 异步计算双阶段模型
    • 2. 状态机流转
  • 二、生活化类比:快递柜取件
  • 三、Java代码实现(生产级Demo)
    • 1. 完整可运行代码
    • 2. 关键机制说明
  • 四、横向对比表格
    • 1. 异步模式对比
    • 2. 回调注册方式对比
  • 五、高级应用技巧
    • 1. 组合多个异步任务
    • 2. 超时控制
    • 3. 回调线程控制
  • 六、源码级实现剖析(接五)
    • 1. JDK FutureTask 核心逻辑
    • 2. CompletableFuture 回调链实现
  • 七、生产环境最佳实践
    • 1. 异常处理模板
    • 2. 资源清理策略
    • 3. 性能监控指标
  • 八、与其他模式的协作
    • 1. 结合发布-订阅模式
    • 2. 与反应式编程整合
  • 九、各语言实现对比
  • 十、常见陷阱与解决方案
    • 1. 回调地狱问题
    • 2. 线程泄漏场景
    • 3. 上下文丢失问题
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档