
实践 -- 响应式编程改造DAG
前言
LLM这里按照传统DAG思路,写了一个编排框架,但是在推理模型问世后,单个推理模型耗时能达到120s以上,按照以往阻塞式编程的方式,线程池很快被占满,导致服务不可用,因此需要改造为响应式链路。
概念
首先先理清楚几个最核心的问题:
1. 无论怎么改,链路耗时问题始终摆在哪里,模型该120s返回还是120s返回,为什么改成响应式会有效果?
这个问题需要明确,这120s无法通过编程来消除,响应式的改造是为了提高系统吞吐率。举个例子:客服小二在一对一语音服务的时候,同一时间只能服务一个客户。客服小二在面对消息聊天窗口时,却可以一对多进行服务,这里就是资源利用率在不同形式下的不同表现。那么响应式改造要做的就是将一对一变成后者一对多异步形式。
原本线程需要阻塞等待模型120s返回后才能继续下一步。响应式改造后,模型请求任务提交后,线程就可以释放,等数据返回后再主动通知接下来的流程可以继续。
2. 那么这120s的时间跑哪里去了?
这部分时间被操作系统的I/O多路复用给“消耗”掉了。 操作系统的I/O多路复用(如Linux的epoll、macOS的kqueue、Windows的IOCP)是响应式编程的核心基础之一,应用程序通过select
/poll
/epoll
等机制告诉操作系统:”我对这些网络连接的变化感兴趣,有变化时通知我”,然后一个线程可以监控成千上万个连接,只有当某个连接有事件发生时才被唤醒处理,也就是callback机制。
3. callback和future,promise还有协程又有什么区别?
callback是基础,future和promise是抽象,协程是语言级别对异步编程的简化。由于使用回调或复杂的Future/Promise链进行异步编程可能导致代码复杂、难以理解和调试(即所谓的‘回调地狱’),协程应运而生,它提供了一种更简洁、更符合人类直觉的方式来编写异步代码。
改造核心点
任务节点返回Future
之前的任务是直接返回结果,本次改造为返回future,让任务有充分自主能力选择同步或者异步。
1 | public interface FlowTaskInstance { |
针对必须同步的任务,需要用线程池控制并发度,或者说隔离。避免调度链路出现同步等待情况。
1 | public abstract class AbstractFlowTaskSyncInstance extends AbstractTaskInstance { |
调度逻辑变更
使用递归式遍历节点,而非阻塞队列等待式。需要三个队列管控。核心逻辑参考:fun.libx.flow.FlowFutureExecuteGraph#processQueue
1 | /** |
节点执行合并错误和超时逻辑处理。超时主要依赖一个Schedule线程池进行监控,到达时间后强制结束任务。节点处理逻辑则参考:fun.libx.flow.FlowFutureExecuteGraph#executeNode
。
1 | /** |
MVC采取异步
MVC返回这里也需要异步化,否则就是同步等待,导致MVC的线程池反而成为了卡点。Spring MVC很好的支持了异步,只需要返回CompletableFuture,整个链路即为响应式链路。
1 |
|
改造成果
首先DAG如下图所示,Dealy节点是一个发起HTTP,5S之后才返回的节点。由于是并发逻辑,那么整个DAG耗时至少是5s。
线程池设置:
Tomcat线程池:10个Thread
HTTP Client:默认大小,设置每个地址允许200个连接(ConnPreRoute),避免连接数限制。
调度线程池:1个Thread
超时控制线程池:1个Thread
压测:
Postman免费版本最多设置100个user,那就按照100个user不断请求,压测3min。可以看到QPS达到14,平均响应耗时在5773ms,在调度初期由于链接建立,系统初始化,线程分配,存在一定的波动,后面就趋近于稳定状态。
使用Visual监控进程状况,线程使用率还是很低,一直处于park状态(采样是按照1s一次),证明系统还有很大的容量上线。
项目代码
https://github.com/mrdear/reactor-flow
- 版权声明: 感谢您的阅读,本文由屈定's Blog版权所有。如若转载,请注明出处。
- 文章标题: 实践 -- 响应式编程改造DAG
- 文章链接: https://mrdear.cn/posts/work-reactive-dag.html