Subagent 架构工程化落地指南
多种实现方案的对比与选型建议,由于AI发展迅速,新的框架、工具和最佳实践可能会不断涌现,文档中的部分内容可能会随着时间的推移而变得过时。例如,新的异步编程框架或消息队列系统可能会提供更好的性能和功能。
什么是 Subagent?
定义
Subagent(子代理)是将一个复杂 AI 任务拆分成多个专门的子任务,由不同的"专家 Agent"协作完成的架构模式。
核心原则:职责单一,边界明确
- 单一 Agent:一个 Agent 处理一个完整的简单任务
- Subagent:多个 Agent 各自处理自己擅长的子任务,边界清晰
每种方式都有明确的适用场景,关键是根据任务复杂度选择合适的方案。
单一 Agent vs Subagent
单一 Agent 方式:
用户请求 → AI Agent → 返回结果
核心特点:职责单一
- 一个 Agent 完成一个独立的任务
- 任务边界明确,输入输出清晰
适用场景:
- ✅ 简单、单一任务(如:"翻译这段文本")
- ✅ 快速响应需求(如:"代码补全")
- ✅ 原型验证阶段(快速迭代)
优势:
- 实现简单,易于理解
- 响应快速(一次调用)
- 适合明确、独立的任务
Subagent 方式(多 Agent 协作):
用户请求 → Master Agent(协调者)
├─→ Code Agent(写代码专家)
├─→ Debug Agent(调试专家)
├─→ Research Agent(搜索专家)
├─→ Review Agent(审查专家)
└─→ Test Agent(测试专家)
└─→ 汇总结果
核心特点:边界明确
- 每个 Agent 只做一件事(职责单一)
- Agent 之间的边界清晰(输入输出明确)
- Master Agent 负责协调,不干预具体执行
适用场景:
- ✅ 复杂、多步骤任务(如:"开发完整功能模块")
- ✅ 需要多种专业能力(如:"研究 + 分析 + 报告")
- ✅ 高质量要求(如:"生产代码、正式文档")
优势:
- 每个 Agent 专注自己的领域
- 任务分步完成,每步可验证
- 容易定位问题环节
- 可针对不同 Agent 使用不同模型(成本优化)
典型应用场景
场景 1:代码生成与审查
用户:"帮我写一个用户认证功能"
流程:
1. Code Agent → 生成代码(使用 GPT-4o,质量高)
2. Review Agent → 审查代码安全漏洞(使用 GPT-4o)
3. Test Agent → 生成单元测试(使用 GPT-4o-mini,便宜)
4. Master → 汇总结果给用户
总耗时:约 30 秒
总成本:三个 Agent 协同,比单次调用更可靠
场景 2:深度研究任务
用户:"研究一下 Rust 和 Go 的性能对比"
流程:
1. Research Agent → 搜索相关资料(3 次并行搜索)
2. Analyze Agent → 分析整理数据
3. Writer Agent → 生成报告
4. Review Agent → 事实核查
总耗时:约 2 分钟
如果单个 Agent 做:可能 5-10 分钟,且容易遗漏
场景 3:复杂问题拆解
用户:"帮我优化这段慢查询 SQL"
流程:
1. Analyze Agent → 分析 SQL 执行计划
2. Optimize Agent → 提出优化方案
3. Verify Agent → 验证优化效果(模拟执行)
4. Document Agent → 生成优化文档
每个 Agent 用最合适的模型和工具
Subagent 解决的核心问题
| 问题 | Subagent 解决方案 |
|---|---|
| 复杂任务质量差 | 拆分成子任务,每步由专家完成 |
| 成本难以控制 | 简单任务用便宜模型(如 gpt-4o-mini),复杂任务用强力模型 |
| 出错难定位 | 每个子任务独立执行,容易找出问题环节 |
| 无法并行处理 | 多个子 Agent 可以并行工作,提升速度 |
| 缺乏针对性优化 | 每个 Agent 可以有自己的 prompt、工具、知识库 |
什么时候需要 Subagent?
适合使用 Subagent 的场景:
- ✅ 任务需要多个步骤(如:写代码 → 测试 → 文档)
- ✅ 任务需要多种能力(如:搜索 + 分析 + 生成)
- ✅ 对质量要求高(如:生产代码、正式报告)
- ✅ 任务耗时较长(> 30秒),需要进度反馈
- ✅ 需要多个 Agent 并行工作以提升速度
不需要 Subagent 的场景:
- ❌ 简单问答(如:"这个 API 怎么用?")
- ❌ 单步任务(如:"总结这段文本")
- ❌ 快速原型验证(越简单越好)
- ❌ 资源受限(Subagent 会增加复杂度和成本)
⚠️ 重要说明
本文档提供的时间阈值和选型建议仅供参考,并非绝对标准。
实际选择时需要考虑的因素:
- 您的具体业务需求(任务复杂度、用户量、SLA 要求)
- 团队能力(熟悉的技术栈、运维经验)
- 成本预算(服务器资源、开发时间成本)
- 发展阶段(原型验证 vs 生产环境)
举例:
- 即使任务 < 5秒,如果是生产环境且需要高可靠性,也可能选择队列系统
- 即使任务 > 30秒,如果是内部工具且允许重启丢失,也可能用协程
- 小团队用同步调用先上线验证,比直接上复杂队列系统更务实
核心原则:从简单开始,根据实际需求逐步演进。不要过度设计。
一、方案概览
根据场景选择实现方式
| 场景 | 建议方案 | 复杂度 | 适用条件 |
|---|---|---|---|
| 简单 LLM 调用 | 同步调用 | 最低 | 单次 LLM 调用,通常 < 5秒 |
| 中等复杂度 | 协程/异步 | 中等 | 需要并发控制,通常 5-30秒 |
| 长时间任务 | 队列 + Worker | 较高 | 需要重试、持久化、分布式 |
| 企业级生产 | 队列 + 监控 | 高 | 需要完整的运维能力 |
四种实现方案对比
| 维度 | 同步调用 | 协程/异步 | 队列系统 |
|---|---|---|---|
| 开发复杂度 | ⭐ 最低 | ⭐⭐ 中等 | ⭐⭐⭐ 较高 |
| 性能 | 低(阻塞) | 高(并发) | 中(网络开销) |
| 可靠性 | 低 | 中 | 高(重试+持久化) |
| 运维成本 | 最低 | 低 | 中(需要维护队列) |
| 分布式支持 | ❌ | ❌ | ✅ |
| 典型任务时长 | 通常 < 5秒 | 通常 5-30秒 | 任意时长 |
二、方案 1:同步调用(最简单)
适用场景
- 单次 LLM 调用(通常 < 5 秒)
- 内部工具或原型验证
- 对可靠性要求不高
- 用户并发量低(通常 < 10 人同时使用)
典型例子:
- 代码补全:约 1-2 秒
- 简单问答:约 1-3 秒
- 文本摘要:约 2-4 秒
优点
- ✅ 代码最简单,易于理解
- ✅ 无需额外依赖(Redis、Celery 等)
- ✅ 调试方便,同步逻辑清晰
- ✅ 无需处理异步结果回调
缺点
- ❌ 阻塞主线程,影响并发性能
- ❌ 任务失败需要自己实现重试
- ❌ 超时控制需要自己处理
- ❌ 无法处理长时间任务
架构示意
┌──────────┐
│ Browser │
└────┬─────┘
│ HTTP POST
↓
┌──────────────────┐
│ Master Agent │
│ │
│ 直接调用 Subagent │
│ (同步阻塞) │
└────────┬─────────┘
│
↓
返回结果给 Browser
代码示例(Python FastAPI)
pythonfrom fastapi import FastAPI import openai app = FastAPI() @app.post("/api/task") async def submit_task(task: str): # 直接同步调用 OpenAI API response = openai.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": task}] ) # 直接返回结果 return { "result": response.choices[0].message.content }
三、方案 2:协程/异步(性能优化)
适用场景
- 任务时长通常 5-30 秒
- 单机可承载负载(通常 < 1000 并发)
- 需要并发性能优化
- 允许服务重启时丢失少量任务
典型例子:
- 代码生成 + 审查(两次 LLM 调用):约 5-10 秒
- 文档分析 + 总结:约 10-20 秒
- 搜索 + LLM 分析:约 10-15 秒
- 简单代码执行 + 结果:约 5-15 秒
优点
- ✅ 高并发性能(单机处理大量任务)
- ✅ 代码相对简单
- ✅ 无需额外中间件
- ✅ 原生并发模型,性能最优
缺点
- ❌ 重试机制需要自己实现
- ❌ 任务无持久化(重启丢失)
- ❌ 死信队列需要自己开发
- ❌ 单机限制(无法分布式)
架构示意
┌──────────┐
│ Browser │
└────┬─────┘
│ HTTP/WS
↓
┌────────────────────────┐
│ Master Agent │
│ │
│ ┌─────────────────┐ │
│ │ 协程池 │ │
│ │ - goroutine 1 │ │
│ │ - goroutine 2 │ │
│ │ - coroutine 1 │ │
│ └─────────────────┘ │
└──────────┬────────────┘
│
↓
结果通过 WS 推回
四、方案 3:队列系统(生产级)
适用场景
- 任务时长通常 > 30 秒
- 需要任务持久化(服务重启不丢失)
- 需要自动重试和死信队列
- 需要分布式部署(多台服务器)
- 需要完善的监控和管理
典型例子:
- 深度研究 Subagent(多轮搜索 + 分析):约 1-5 分钟
- 大型代码库重构分析:约 1-10 分钟
- 批量文档处理:数分钟到数小时
- 复杂工作流(多个 Subagent 协作):约 30 秒以上
优点
- ✅ 自动重试 + 指数退避
- ✅ 死信队列处理失败任务
- ✅ 任务持久化,重启恢复
- ✅ 分布式,多 Worker 负载均衡
- ✅ 成熟的监控工具(如 Flower)
- ✅ 优先级队列、定时任务
缺点
- ❌ 架构复杂,组件多
- ❌ 需要维护 Redis/RabbitMQ
- ❌ 性能略低于协程(网络开销)
- ❌ 开发和调试成本高
架构示意
┌──────────┐
│ Browser │
└────┬─────┘
│ HTTP
↓
┌────────────────────────┐
│ Master Agent │
│ (提交任务到队列) │
└────────┬─────────────┘
│
↓
┌─────────┐
│ Redis │
│ Queue │
└────┬────┘
│
↓
┌────────────────────────┐
│ Worker Pool │
│ - Worker 1 │
│ - Worker 2 │
│ - Worker 3 │
└──────────────────────┘
│
↓
结果通过 WS 推回
代码示例(Python Celery)
pythonfrom celery import Celery from celery.exceptions import Retry app = Celery('subagent', broker='redis://localhost:6379/0') @app.task(bind=True, max_retries=3, default_retry_delay=60) def code_agent(self, task: str): try: response = openai.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": task}] ) return response.choices[0].message.content except openai.APIError as e: # 自动重试,指数退避 raise self.retry(exc=e) # 提交任务 result = code_agent.delay('写一个排序算法')
五、分布式场景下的 WebSocket 结果推送
问题:多台服务器如何推送?
场景:
Client → Server A (建立 WS 连接)
↓
提交任务到 Redis Queue
↓
Server B 的 Worker 处理任务
↓
Server B 处理完成,如何推送给 Server A 上的客户端?
解决方案 1:全局订阅 + 连接检查(推荐)
架构图:
┌──────────────────────────────────────────────────┐
│ Redis Pub/Sub │
│ channel: subagent:results │
└───────┬──────────────┬──────────────┬────────┘
│ │ │
Server A Server B Server C
(订阅) (订阅) (订阅)
│ │ │
检查连接 检查连接 检查连接
在本机? 在本机? 在本机?
工作流程:
- 所有服务器都订阅 Redis Pub/Sub 频道
- 任意 Worker 完成任务后发布结果到 Redis
- 所有服务器收到消息
- 每台服务器检查该 task_id 的 WebSocket 连接是否在本机
- 只有持有该连接的服务器才推送结果
代码示例(Go):
go// 每台服务器都订阅 func (m *MasterAgent) resultSubscriber() { pubsub := m.redis.Subscribe(context.Background(), "subagent:results") channel := pubsub.Channel() for msg := range channel { var result TaskResult json.Unmarshal([]byte(msg.Payload), &result) // 关键:检查连接是否在本机 m.mu.RLock() conn, exists := m.wsConns[result.TaskID] m.mu.RUnlock() if exists { // 只有本机有这个连接才推送 conn.WriteJSON(WebSocketMessage{ Type: "task_result", Data: result, }) log.Printf("[PUSH] 本机推送结果: %s", result.TaskID) } else { // 不是本机的连接,忽略 log.Printf("[IGNORE] 任务 %s 的连接在其他服务器", result.TaskID) } } }
优点:
- ✅ 简单易实现
- ✅ 无需复杂的路由逻辑
- ✅ 天然支持服务器动态增减
缺点:
- ❌ 所有服务器都收到消息(网络开销)
- ❌ 需要每台服务器都维护 Redis 连接
解决方案 2:连接注册 + 精准路由
架构图:
┌──────────────────────────────────────┐
│ Redis │
│ ┌────────────────────────┐ │
│ │ conn_registry │ │
│ │ task_123 -> Server A │ │
│ │ task_456 -> Server B │ │
│ └────────────────────────┘ │
└──────────────────────────────────────┘
↓ ↓
Server A Server B
只有 task_123 只有 task_456
工作流程:
- 客户端连接时,将
{task_id: server_id}注册到 Redis - Worker 完成任务后,查询 Redis 该任务属于哪台服务器
- 只有对应的服务器才从队列拉取并推送
代码示例(Redis Hash 存储):
go// 客户端连接时注册 func (m *MasterAgent) registerConnection(taskID, serverID string) { m.redis.HSet(context.Background(), "conn_registry", taskID, serverID) log.Printf("[REGISTER] 任务 %s 注册到服务器 %s", taskID, serverID) } // Worker 完成后查询 func (m *MasterAgent) publishResult(result TaskResult) { // 查询任务属于哪台服务器 serverID, _ := m.redis.HGet(context.Background(), "conn_registry", result.TaskID).Result() if serverID == config.ServerID { // 是本机的任务,直接推送 m.pushToClient(result) } else { // 其他服务器的任务,发布到专用频道 channel := fmt.Sprintf("server:%s:results", serverID) m.redis.Publish(context.Background(), channel, result) log.Printf("[ROUTE] 路由结果到服务器 %s", serverID) } } // 每台服务器只订阅自己的频道 func (m *MasterAgent) subscribeMyResults() { channel := fmt.Sprintf("server:%s:results", config.ServerID) pubsub := m.redis.Subscribe(context.Background(), channel) for msg := range pubsub.Channel() { var result TaskResult json.Unmarshal([]byte(msg.Payload), &result) // 推送到本机客户端 m.pushToClient(result) } }
优点:
- ✅ 精准路由,减少网络开销
- ✅ 每台服务器只处理自己的结果
- ✅ 可扩展性好
缺点:
- ❌ 需要维护连接注册表
- ❌ Redis 成为中心依赖
解决方案 3:Sticky Session(会话保持)
架构图:
┌─────────────┐
│ Nginx/LB │
│ (ip_hash) │
└──────┬──────┘
│
Sticky Session
(同一客户端总是到同一服务器)
│
┌───────────┴───────────┐
│ │ │
Server A Server B Server C
(WS + Task都在同一机器)
Nginx 配置示例:
nginxupstream backend { ip_hash; # 根据 IP 分配,保证同一用户到同一服务器 server 192.168.1.10:8000; server 192.168.1.11:8000; server 192.168.1.12:8000; } server { listen 80; location / { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; } }
优点:
- ✅ 最简单,无需 Redis Pub/Sub
- ✅ WS 连接和任务处理在同一机器
- ✅ 无需跨机器通信
缺点:
- ❌ 负载不均衡(某个服务器负载高)
- ❌ 服务器故障会丢失连接
- ❌ 不适合动态扩缩容
三种方案对比
| 方案 | 复杂度 | 网络开销 | 负载均衡 | 故障隔离 | 推荐场景 |
|---|---|---|---|---|---|
| 全局订阅 | 低 | 高(广播) | 好 | 好 | 中小规模(< 10 台服务器) |
| 精准路由 | 中 | 低 | 最好 | 好 | 大规模(> 10 台服务器) |
| Sticky Session | 最低 | 无 | 差 | 差 | 简单部署、会话应用 |
实际生产建议
小规模(通常 < 5 台服务器):
→ 可考虑 Sticky Session(最简单)
中等规模(通常 5-20 台):
→ 可考虑全局订阅 + 连接检查
大规模(通常 > 20 台):
→ 建议精准路由 + 连接注册
六、通信方式选择
任务提交方式对比
| 方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| HTTP POST | 简单、通用、RESTful | 无实时性 | 一般任务提交 |
| WebSocket | 双向通信、实时 | 需要维护连接 | 频繁交互 |
| gRPC | 高性能、强类型 | 需要定义 proto | 微服务间通信 |
结果推送方式对比
| 方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| WebSocket | 实时推送、双向 | 需要维护连接 | 需要实时反馈 |
| HTTP 轮询 | 简单、无状态 | 浪费资源、延迟 | 低频查询 |
| SSE | 单向推送、简单 | 只能服务器推客户端 | 简单推送场景 |
| Redis Pub/Sub | 多订阅者、解耦 | 需要额外组件 | 多消费者 |
六、Go 实现方案(协程模式)
方案特点
- 并发模型: goroutine + channel
- 适用场景: 高并发单机应用
- 任务时长: < 30 秒
架构图
┌────────────┐
│ Browser │
│ │
│ WS/HTTP │
└─────┬─────┘
↓
┌──────────────────────────┐
│ Master Agent (Go) │
│ │
│ ┌────────────────────┐ │
│ │ Task Coordinator │ │
│ └────────┬───────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ Goroutine Pool │ │
│ │ - code_agent │ │
│ │ - math_agent │ │
│ │ - research_agent │ │
│ └──────────────────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ Result Channel │ │
│ └──────────┬───────────┘ │
└───────────────┼──────────────┘
│
↓
Result Channel (推送回 WS)
时序图
客户端 Master Subagent
│ │ │
│ ① WS/HTTP 提交任务 │
├──────────────────>│ │
│ │ │
│ │ ② 启动 goroutine │
│ │ ├──────────────────>│
│ │ │ ③ 执行任务
│ │<─────────────────────│ (调用 LLM)
│ │ │
│ │ │ ④ 结果发送到 channel
│ │<─────────────────────┘
│ ⑤ 从 channel 接收结果 │
├──────────────────>│ │
│ │ │
│ ⑥ WS 推送结果 │
├──────────────────>│ │
│ │ │
完整代码实现
go// main.go package main import ( "context" "encoding/json" "fmt" "log" "net/http" "sync" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/google/uuid" "github.com/go-redis/redis/v8" ) // ==================== 配置 ==================== type Config struct { WSPort int HTTPPort int RedisAddr string MaxGoroutines int // 最大并发 goroutine 数 TaskTimeout time.Duration } var config = Config{ WSPort: 8000, HTTPPort: 8080, RedisAddr: "localhost:6379", MaxGoroutines: 100, // 最多 100 个并发任务 TaskTimeout: 5 * time.Minute, } // ==================== 数据结构 ==================== type TaskRequest struct { Subagent string `json:"subagent"` Task string `json:"task"` Timeout int `json:"timeout,omitempty"` } type TaskResult struct { TaskID string `json:"task_id"` Status string `json:"status"` // success, failed, timeout Result string `json:"result,omitempty"` Error string `json:"error,omitempty"` Duration int64 `json:"duration_ms"` } type WebSocketMessage struct { Type string `json:"type"` // task_result, status_update Data any `json:"data"` } // ==================== Master Agent ==================== type MasterAgent struct { router *gin.Engine redis *redis.Client resultChannels map[string]chan TaskResult // task_id -> result channel wsConns map[string]*websocket.Conn // client_id -> connection mu sync.RWMutex wg sync.WaitGroup // 等待所有 goroutine 完成 semaphore chan struct{} // 控制并发数 } func NewMasterAgent() *MasterAgent { rdb := redis.NewClient(&redis.Options{ Addr: config.RedisAddr, }) return &MasterAgent{ redis: rdb, resultChannels: make(map[string]chan TaskResult), wsConns: make(map[string]*websocket.Conn), semaphore: make(chan struct{}, config.MaxGoroutines), } } // ==================== HTTP 任务提交 ==================== func (m *MasterAgent) submitTaskHandler(c *gin.Context) { var req TaskRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(400, gin.H{"error": err.Error()}) return } taskID := uuid.New().String() log.Printf("[HTTP] 任务提交: %s -> %s", taskID, req.Subagent) // 使用 WaitGroup 启动 goroutine m.wg.Add(1) // 发送信号(获取 semaphore) m.semaphore <- struct{}{} go func() { defer m.wg.Done() defer func() { <-m.semaphore }() // 释放 semaphore // 执行任务(含超时控制) ctx, cancel := context.WithTimeout(context.Background(), config.TaskTimeout) defer cancel() result := m.executeTask(ctx, taskID, req.Subagent, req.Task) // 发送结果到 channel m.mu.RLock() resultChan, exists := m.resultChannels[taskID] m.mu.RUnlock() if exists { resultChan <- result } else { log.Printf("[ERROR] 结果 channel 不存在: %s", taskID) } }() c.JSON(200, gin.H{ "task_id": taskID, "status": "submitted", "message": "任务已提交,正在处理中...", }) } // ==================== WebSocket 处理 ==================== func (m *MasterAgent) websocketHandler(c *gin.Context) { var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true // 生产环境应该验证 origin }, } conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Printf("[WS] 升级失败: %v", err) return } clientID := uuid.New().String() log.Printf("[WS] 客户端连接: %s", clientID) m.mu.Lock() m.wsConns[clientID] = conn m.mu.Unlock() // 处理 WebSocket 消息(可用于任务提交) go m.handleWebSocketMessages(clientID, conn) // 监听该客户端的结果并推送 go m.listenAndPushResults(clientID, conn) } func (m *MasterAgent) handleWebSocketMessages(clientID string, conn *websocket.Conn) { defer conn.Close() for { var msg WebSocketMessage err := conn.ReadJSON(&msg) if err != nil { log.Printf("[WS] 读取失败: %v", err) break } if msg.Type == "submit_task" { // WebSocket 提交任务(和 HTTP 一样) data, _ := json.Marshal(msg.Data) var req TaskRequest json.Unmarshal(data, &req) taskID := uuid.New().String() log.Printf("[WS] 任务提交: %s -> %s", taskID, req.Subagent) // 创建结果 channel resultChan := make(chan TaskResult, 1) m.mu.Lock() m.resultChannels[taskID] = resultChan m.wsConns[taskID] = conn // 关联 task 到连接 m.mu.Unlock() m.wg.Add(1) m.semaphore <- struct{}{} go func() { defer m.wg.Done() defer func() { <-m.semaphore }() ctx, cancel := context.WithTimeout(context.Background(), config.TaskTimeout) defer cancel() result := m.executeTask(ctx, taskID, req.Subagent, req.Task) resultChan <- result }() // 立即返回 task_id conn.WriteJSON(WebSocketMessage{ Type: "task_accepted", Data: gin.H{ "task_id": taskID, "status": "queued", }, }) } else if msg.Type == "ping" { // 心跳 conn.WriteJSON(WebSocketMessage{ Type: "pong", }) } } } func (m *MasterAgent) listenAndPushResults(clientID string, conn *websocket.Conn) { // 等待这个客户端的所有任务结果 // 实际实现中应该监听 resultChannels m.mu.RLock() conn, exists := m.wsConns[clientID] m.mu.RUnlock() if !exists { return } // 发送心跳 ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: conn.WriteJSON(WebSocketMessage{ Type: "heartbeat", }) } } } // ==================== Subagent 执行(使用 channel 和 WaitGroup)==================== func (m *MasterAgent) executeTask(ctx context.Context, taskID, subagentName, task string) TaskResult { startTime := time.Now() log.Printf("[SUBAGENT-%s] 开始执行: %s", subagentName, taskID) // 根据超时检查 select { case <-ctx.Done(): return TaskResult{ TaskID: taskID, Status: "timeout", Error: "任务执行超时", Duration: time.Since(startTime).Milliseconds(), } default: // 执行具体的 subagent result := m.runSubagent(subagentName, task) duration := time.Since(startTime).Milliseconds() result.Duration = duration log.Printf("[SUBAGENT-%s] 完成: %s (%dms)", subagentName, taskID, duration) return result } } func (m *MasterAgent) runSubagent(name string, task string) TaskResult { switch name { case "code_agent": return m.codeAgent(task) case "math_agent": return m.mathAgent(task) case "research_agent": return m.researchAgent(task) default: return TaskResult{ Status: "failed", Error: fmt.Sprintf("unknown subagent: %s", name), } } // ==================== 具体 Subagent 实现 ==================== func (m *MasterAgent) codeAgent(task string) TaskResult { // 调用 OpenAI API // 简单任务用 gpt-4o-mini,复杂任务用 gpt-4o model := "gpt-4o-mini" if len(task) > 200 { model = "gpt-4o" } log.Printf("[CODE-AGENT] 使用模型: %s", model) // 实际应该调用 OpenAI API // 这里模拟返回 time.Sleep(2 * time.Second) result := fmt.Sprintf("生成的代码(%s 模型): %s", model, task) return TaskResult{ Status: "success", Result: result, } } func (m *MasterAgent) mathAgent(task string) TaskResult { // 本地计算,不需要调用外部 API // 安全的数学计算 time.Sleep(100 * time.Millisecond) result := "calculated: " + task // 实际应该用更安全的方式,而不是 eval return TaskResult{ Status: "success", Result: result, } } func (m *MasterAgent) researchAgent(task string) TaskResult { // 模拟长时间研究 log.Printf("[RESEARCH-AGENT] 开始研究: %s", task) time.Sleep(5 * time.Second) result := fmt.Sprintf("研究结果: %s", task) return TaskResult{ Status: "success", Result: result, } } // ==================== 结果推送监听 ==================== func (m *MasterAgent) resultBroadcaster() { log.Printf("[BROADCASTER] 结果推送器启动") // 这里使用 Redis Pub/Sub 来广播结果 // 但也可以直接通过 channel 推送 pubsub := m.redis.Subscribe(context.Background(), "subagent:results") ch := pubsub.Channel() for msg := range ch { if msg.Type == "message" { var result TaskResult if err := json.Unmarshal(msg.Payload, &result); err != nil { log.Printf("[ERROR] 解析结果失败: %v", err) continue } log.Printf("[BROADCASTER] 推送结果: %s", result.TaskID) // 找到对应的 WebSocket 连接 m.mu.RLock() conn, exists := m.wsConns[result.TaskID] m.mu.RUnlock() if exists { if err := conn.WriteJSON(WebSocketMessage{ Type: "task_result", Data: result, }); err != nil { log.Printf("[ERROR] 推送失败: %v", err) } else { log.Printf("[BROADCASTER] 结果已推送: %s", result.TaskID) } // 任务完成后可以移除连接 // m.mu.Lock() // delete(m.wsConns, result.TaskID) // m.mu.Unlock() } } } // ==================== 路由设置 ==================== func (m *MasterAgent) setupRoutes() { m.router = gin.Default() // 静态文件(测试页面) m.router.Static("/", "./static") // HTTP 任务提交 m.router.POST("/api/task/submit", m.submitTaskHandler) // WebSocket 端点 m.router.GET("/ws", m.websocketHandler) // 健康检查 m.router.GET("/health", func(c *gin.Context) { c.JSON(200, gin.H{ "status": "ok", "goroutines": len(m.semaphore), "connections": len(m.wsConns), }) } // ==================== 主函数 ==================== func main() { master := NewMasterAgent() master.setupRoutes() // 启动结果广播器 go master.resultBroadcaster() log.Printf("[SERVER] HTTP 服务启动在 :%d", config.HTTPPort) log.Printf("[SERVER] WebSocket 服务启动在 :%d", config.WSPort) // 启动服务器(HTTP 和 WebSocket 在同一个端口) if err := master.router.Run(fmt.Sprintf(":%d", config.HTTPPort)); err != nil { log.Fatal(err) } } /* =================== 静态文件 (static/index.html) ==================== <!DOCTYPE html> <html> <head> <title>Subagent Go 实现</title> <style> body { font-family: Arial, sans-serif; max-width: 1200px; margin: 50px auto; padding: 20px; } .container { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; } .box { border: 1px solid #ddd; border-radius: 8px; padding: 20px; } .box h3 { margin-top: 0; color: #333; } textarea { width: 100%; height: 100px; margin: 10px 0; } button { background: #4CAF50; color: white; padding: 10px 20px; border: none; border-radius: 4px; cursor: pointer; } button:hover { background: #45a049; } #status { margin-top: 10px; padding: 10px; border-radius: 4px; } #status.success { background: #d4edda; } #status.error { background: #f8d7da; } #result { background: #f5f5f5; padding: 15px; border-radius: 4px; white-space: pre-wrap; word-break: break-all; } .metrics { margin-top: 20px; padding: 10px; background: #e7f3ff; border-radius: 4px; } </style> </head> <body> <h1>🚀 Subagent 系统(Go 实现)</h1> <div class="container"> <div class="box"> <h3>📝 提交任务</h3> <textarea id="taskInput" placeholder="输入任务..."></textarea> <select id="subagentSelect" style="margin: 10px 0; padding: 5px;"> <option value="code_agent">💻 代码 Agent (GPT-4o-mini)</option> <option value="math_agent">🔢 数学 Agent (本地计算)</option> <option value="research_agent">🔍 研究 Agent (5秒)</option> </select> <div style="margin-top: 10px;"> <button onclick="submitViaHTTP()">通过 HTTP 提交</button> <button onclick="submitViaWS()" style="background: #2196F3;">通过 WebSocket 提交</button> </div> </div> <div class="box"> <h3>📊 系统状态</h3> <div id="status" class="success">就绪</div> <div class="metrics"> <div>活动 Goroutines: <span id="goroutines">0</span></div> <div>WebSocket 连接: <span id="connections">0</span></div> </div> </div> </div> <div class="box" style="grid-column: 1 / -1;"> <h3>📤 结果</h3> <div id="result">等待任务提交...</div> </div> <script> let ws = null; let currentTaskId = null; function connectWS() { ws = new WebSocket("ws://localhost:8000/ws"); ws.onopen = () => { document.getElementById("status").className = "success"; document.getElementById("status").textContent = "✅ WebSocket 已连接"; updateMetrics(); }; ws.onmessage = (event) => { const msg = JSON.parse(event.data); switch(msg.type) { case "task_accepted": currentTaskId = msg.data.task_id; document.getElementById("status").className = "success"; document.getElementById("status").textContent = `✅ 任务已接受: ${msg.data.task_id}`; break; case "task_result": handleResult(msg.data); break; case "heartbeat": // 心跳,可以忽略或显示连接状态 break; } }; ws.onclose = () => { document.getElementById("status").className = "error"; document.getElementById("status").textContent = "❌ WebSocket 已断开"; }; ws.onerror = (error) => { document.getElementById("status").className = "error"; document.getElementById("status").textContent = "❌ WebSocket 错误"; }; } function submitViaHTTP() { const task = document.getElementById("taskInput").value; const subagent = document.getElementById("subagentSelect").value; fetch("http://localhost:8000/api/task/submit", { method: "POST", headers: {"Content-Type": "application/json"}, body: JSON.stringify({subagent, task}) }) .then(res => res.json()) .then(data => { currentTaskId = data.task_id; document.getElementById("status").textContent = `✅ 任务已提交: ${data.task_id}`; // 如果使用 HTTP,需要手动连接 WS 接收结果 if (!ws || ws.readyState !== WebSocket.OPEN) { connectWS(); } }) .catch(err => { document.getElementById("status").className = "error"; document.getElementById("status").textContent = "❌ 提交失败"; }); } function submitViaWS() { if (!ws || ws.readyState !== WebSocket.OPEN) { connectWS(); } const task = document.getElementById("taskInput").value; const subagent = document.getElementById("subagentSelect").value; ws.send(JSON.stringify({ type: "submit_task", data: {subagent, task} })); } function handleResult(result) { const statusDiv = document.getElementById("status"); const resultDiv = document.getElementById("result"); if (result.status === "success") { statusDiv.className = "success"; statusDiv.textContent = `✅ 任务完成 (${result.duration_ms}ms)`; resultDiv.textContent = result.result; } else if (result.status === "timeout") { statusDiv.className = "error"; statusDiv.textContent = "⏰ 任务超时"; resultDiv.textContent = result.error; } else { statusDiv.className = "error"; statusDiv.textContent = "❌ 任务失败"; resultDiv.textContent = result.error; } } function updateMetrics() { fetch("http://localhost:8000/health") .then(res => res.json()) .then(data => { document.getElementById("goroutines").textContent = data.goroutines; document.getElementById("connections").textContent = data.connections; }); } // 页面加载时连接 WebSocket window.onload = () => { connectWS(); setInterval(updateMetrics, 5000); }; </script> </body> </html> */ /* =================== 启动方式 ==================== # 1. 启动 Redis docker run -d -p 6379:6379 redis # 2. 启动服务(Go) go run main.go # 3. 测试 浏览器访问 http://localhost:8000 =================== Go 方案的核心优势 ==================== ✅ 原生并发:goroutine(轻量级线程) ✅ Channel 通信:类型安全,内置同步 ✅ WaitGroup:等待一组 goroutine 完成 ✅ Semaphore:控制并发数量 ✅ Context:超时控制和取消 ✅ 无需中间件:不需要 Redis Queue ✅ 性能:单进程处理大量并发 =================== 数据流 ==================== 方案 1:HTTP 提交 Browser HTTP POST → Master → (Semaphore 控制) → Goroutine → 执行任务 → Channel → 结果推送到 Redis Pub → Master 订阅 → 找到 WS 连接 → 推送结果 方案 2:WS 提交 Browser WS → Master → (Semaphore 控制) → Goroutine → 执行任务 → Channel → 直接推送到 WS(无需 Redis Pub) */
七、Python 实现方案(asyncio 模式)
方案特点
- 并发模型: asyncio + 协程
- 适用场景: I/O 密集型任务
- 任务时长: < 30 秒
架构图
┌────────────┐
│ Browser │
│ │
│ WS/HTTP │
└─────┬─────┘
↓
┌──────────────────────────┐
│ FastAPI (asyncio) │
│ │
│ ┌────────────────────┐ │
│ │ Task Coordinator │ │
│ │ + WS Manager │ │
│ └────────┬───────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ Async Task Pool │ │
│ │ - asyncio tasks │ │
│ └──────────────────────┘ │
└───────────────┼──────────────┘
│
↓
(可选) Redis Pub/Sub
完整代码实现
python# main.py from fastapi import FastAPI, WebSocket, HTTPException from fastapi.responses import HTMLResponse from fastapi.concurrency import asyncio from typing import Dict, Optional, Set import uuid import time import os app = FastAPI() # ==================== 配置 ==================== class Settings: WS_HOST: str = "0.0.0.0" WS_PORT: int = 8000 MAX_CONCURRENT_TASKS: int = 50 # 最大并发任务数 TASK_TIMEOUT: int = 300 # 5 分钟 settings = Settings() # ==================== 数据结构 ==================== from pydantic import BaseModel class TaskRequest(BaseModel): subagent: str task: str timeout: Optional[int] = None class TaskResult(BaseModel): task_id: str status: str # success, failed, timeout result: Optional[str] = None error: Optional[str] = None duration_ms: int class WSMessage(BaseModel): type: str # task_result, status_update, heartbeat data: dict # ==================== Async Master Agent ==================== class AsyncMasterAgent: def __init__(self): # 活跃的 WebSocket 连接 self.active_connections: Set[WebSocket] = set() # 任务结果存储(task_id -> result) self.task_results: Dict[str, TaskResult] = {} # 并发控制(Semaphore) self.semaphore = asyncio.Semaphore(settings.MAX_CONCURRENT_TASKS) # 子任务方法映射 self.subagents = { "code_agent": self.code_agent, "math_agent": self.math_agent, "research_agent": self.research_agent, } async def submit_task(self, task_req: TaskRequest, ws: Optional[WebSocket] = None) -> str: """提交任务并返回 task_id""" task_id = str(uuid.uuid4()) print(f"[MASTER] 任务提交: {task_id} -> {task_req.subagent}") # 保存任务结果(初始状态) self.task_results[task_id] = TaskResult( task_id=task_id, status="pending" ) # 如果有 WS 连接,发送确认 if ws: await ws.send_json(WSMessage( type="task_accepted", data={"task_id": task_id, "status": "queued"} )) # 异步执行任务(带并发控制) asyncio.create_task(self._execute_with_semaphore(task_id, task_req)) return task_id async def _execute_with_semaphore(self, task_id: str, task_req: TaskRequest): """使用 semaphore 控制并发""" async with self.semaphore: try: # 执行任务(带超时) result = await asyncio.wait_for( self._run_subagent(task_req.subagent, task_req.task), timeout=settings.TASK_TIMEOUT ) # 保存结果 self.task_results[task_id] = result print(f"[MASTER] 任务完成: {task_id}") # 通知所有监听该结果的 WS await self._notify_result(task_id, result) except asyncio.TimeoutError: result = TaskResult( task_id=task_id, status="timeout", error="任务执行超时" ) self.task_results[task_id] = result await self._notify_result(task_id, result) except Exception as e: result = TaskResult( task_id=task_id, status="failed", error=str(e) ) self.task_results[task_id] = result await self._notify_result(task_id, result) async def _run_subagent(self, subagent_name: str, task: str) -> TaskResult: """运行具体的 subagent""" print(f"[SUBAGENT-{subagent_name}] 开始执行") start_time = time.time() # 调用对应的子任务 method = self.subagents.get(subagent_name) if method: if asyncio.iscoroutinefunction(method): result = await method(task) else: result = method(task) # 同步函数 else: result = TaskResult( status="failed", error=f"Unknown subagent: {subagent_name}" ) duration_ms = int((time.time() - start_time) * 1000) result.task_id = subagent_name # 临时 result.duration_ms = duration_ms print(f"[SUBAGENT-{subagent_name}] 完成 ({duration_ms}ms)") return result async def _notify_result(self, task_id: str, result: TaskResult): """通知所有监听该任务的 WebSocket 连接""" # 向所有活跃连接广播结果 # 实际生产环境应该只发送给相关的连接 message = WSMessage( type="task_result", data=result.dict() ) # 并发发送给所有连接 if self.active_connections: await asyncio.gather(*[ ws.send_json(message) for ws in self.active_connections ], return_exceptions=True) # ==================== Subagent 实现 ==================== async def code_agent(self, task: str) -> TaskResult: """代码 Subagent(调用 OpenAI)""" # 根据任务复杂度选择模型 if len(task) < 200: model = "gpt-4o-mini" # 便宜 else: model = "gpt-4o" # 强力 print(f"[CODE-AGENT] 使用模型: {model}") # 调用 OpenAI API # import openai # client = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) # response = await client.chat.completions.create( # model=model, # messages=[{"role": "user", "content": task}] # ) # result = response.choices[0].message.content # 模拟 await asyncio.sleep(2) return TaskResult( status="success", result=f"生成的代码(模型: {model}): {task}" ) def math_agent(self, task: str) -> TaskResult: """数学 Subagent(同步计算)""" import ast try: result = ast.literal_eval(task) return TaskResult( status="success", result=str(result) ) except: return TaskResult( status="failed", error="Invalid math expression" ) async def research_agent(self, task: str) -> TaskResult: """研究 Subagent(模拟长时间任务)""" print(f"[RESEARCH-AGENT] 开始研究: {task}") await asyncio.sleep(5) return TaskResult( status="success", result=f"研究结果: {task}" ) # ==================== 全局 Master 实例 ==================== master = AsyncMasterAgent() # ==================== HTTP 端点 ==================== @app.post("/api/task/submit") async def submit_task(task_req: TaskRequest): """HTTP 任务提交端点""" task_id = await master.submit_task(task_req) return { "task_id": task_id, "status": "submitted", "message": "任务已提交" } @app.get("/api/task/{task_id}") async def get_task_result(task_id: str): """查询任务结果""" result = master.task_results.get(task_id) if not result: raise HTTPException(status_code=404, detail="Task not found") return result # ==================== WebSocket 端点 ==================== @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket 端点""" await websocket.accept() master.active_connections.add(websocket) client_id = str(uuid.uuid4()) print(f"[WS] 客户端连接: {client_id}") try: while True: # 接收消息 data = await websocket.receive_json() msg_type = data.get("type") if msg_type == "submit_task": # WebSocket 提交任务 task_data = data.get("data", {}) task_req = TaskRequest(**task_data) task_id = await master.submit_task(task_req, websocket) elif msg_type == "ping": # 心跳 await websocket.send_json({"type": "pong"}) elif msg_type == "get_result": # 查询任务结果 task_id = data.get("task_id") result = master.task_results.get(task_id) if result: await websocket.send_json(WSMessage( type="task_result", data=result.dict() )) except WebSocketDisconnect: print(f"[WS] 客户端断开: {client_id}") finally: master.active_connections.remove(websocket) # ==================== 静态文件 ==================== @app.get("/") async def get_homepage(): """返回测试页面""" return HTMLResponse(content=""" <!DOCTYPE html> <html> <head> <title>Subagent Python (asyncio)</title> <style> body { font-family: Arial, sans-serif; max-width: 1200px; margin: 50px auto; padding: 20px; } .container { display: grid; grid-template-columns: 1fr 1fr; gap: 20px; } .box { border: 1px solid #ddd; border-radius: 8px; padding: 20px; } textarea { width: 100%%; height: 100px; margin: 10px 0; } button { background: #4CAF50; color: white; padding: 10px 20px; border: none; border-radius: 4px; cursor: pointer; margin: 5px; } select { padding: 5px; margin: 10px 0; } #result { background: #f5f5f5; padding: 15px; border-radius: 4px; white-space: pre-wrap; min-height: 100px; } </style> </head> <body> <h1>🚀 Subagent 系统(Python asyncio)</h1> <div class="container"> <div class="box"> <h3>📝 提交任务</h3> <textarea id="taskInput" placeholder="输入任务..."></textarea> <select id="subagentSelect"> <option value="code_agent">💻 代码 Agent</option> <option value="math_agent">🔢 数学 Agent</option> <option value="research_agent">🔍 研究 Agent</option> </select> <div> <button onclick="submitViaHTTP()">通过 HTTP 提交</button> <button onclick="submitViaWS()" style="background: #2196F3;">通过 WS 提交</button> </div> </div> <div class="box"> <h3>📤 结果</h3> <div id="result">等待任务提交...</div> </div> </div> <script> let ws = null; function connectWS() { ws = new WebSocket("ws://localhost:8000/ws"); ws.onopen = () => { console.log("WebSocket 已连接"); }; ws.onmessage = (event) => { const msg = JSON.parse(event.data); console.log("收到消息:", msg); if (msg.type === "task_result") { handleResult(msg.data); } else if (msg.type === "task_accepted") { document.getElementById("result").textContent = `任务已接受: ${msg.data.task_id}`; } }; ws.onclose = () => { console.log("WebSocket 已断开"); }; } function submitViaHTTP() { const task = document.getElementById("taskInput").value; const subagent = document.getElementById("subagentSelect").value; fetch("http://localhost:8000/api/task/submit", { method: "POST", headers: {"Content-Type": "application/json"}, body: JSON.stringify({subagent, task}) }) .then(res => res.json()) .then(data => { console.log("任务已提交:", data); // 连接 WS 接收结果 if (!ws || ws.readyState !== WebSocket.OPEN) { connectWS(); } document.getElementById("result").textContent = `任务已提交: ${data.task_id}`; }) .catch(err => { console.error("提交失败:", err); document.getElementById("result").textContent = "提交失败: " + err; }); } function submitViaWS() { if (!ws || ws.readyState !== WebSocket.OPEN) { connectWS(); } const task = document.getElementById("taskInput").value; const subagent = document.getElementById("subagentSelect").value; ws.send(JSON.stringify({ type: "submit_task", data: {subagent, task} })); } function handleResult(result) { const resultDiv = document.getElementById("result"); if (result.status === "success") { resultDiv.textContent = `✅ 完成 (${result.duration_ms}ms)\\n\\n${result.result}`; } else { resultDiv.textContent = `❌ ${result.status}\\n\\n${result.error || result.result}`; } } // 页面加载时连接 window.onload = () => connectWS(); </script> </body> </html> """) # ==================== 启动 ==================== if __name__ == "__main__": import uvicorn print("[SERVER] 启动在 http://0.0.0.0:8000") uvicorn.run(app, host="0.0.0.0", port=8000) /* =================== Python asyncio 方案的核心优势 ==================== ✅ 原生异步:asyncio(Python 3.5+) ✅ 并发控制:Semaphore(限制并发数) ✅ 类型安全:完整的类型提示支持 ✅ 超时控制:asyncio.wait_for ✅ 生态系统:丰富的异步库(aiohttp, aiomysql 等) ✅ 内存效率:比多进程更轻量 =================== 数据流 ==================== WS 提交: Browser WS → Master → asyncio.Task → Subagent → Channel → Master → WS 推送 HTTP 提交: Browser HTTP → Master → asyncio.Task → Subagent → Channel → Master → (需要 WS) → WS 推送 */
八、Hyperf 实现方案(协程模式)
方案特点
- 并发模型: Swoole 协程
- 适用场景: PHP 高并发应用
- 任务时长: < 30 秒
架构图
┌────────────┐
│ Browser │
│ │
│ WS/HTTP │
└─────┬─────┘
↓
┌──────────────────────────┐
│ Hyperf Server/Task │
│ │
│ ┌────────────────────┐ │
│ │ Task Worker Pool │ │
│ │ - 协程 1 │ │
│ │ - 协程 2 │ │
│ │ - 协程 3 │ │
│ └──────────────────────┘ │
└───────────────┼──────────────┘
│
↓
(通过内存 channel 推送回 WS)
完整代码实现
php<?php declare(strict_types=1); namespace App\Service; use Hyperf\Di\Annotation\Inject; use Hyperf\Context\Context; use Hyperf\WebSocketServer\Socket; use Hyperf\WebSocketServer\Message; use function Hyperf\Engine\coroutine; class SubagentService { #[Inject] private \Hyperf\Contract\StdoutLoggerInterface $logger; private array $taskResults = []; private array $activeConnections = []; /** * 提交任务到 Task Worker */ public function submitTask(string $subagent, string $task, string $taskId): void { $this->logger->info(sprintf("[MASTER] 任务提交: %s -> %s", $taskId, $subagent)); // 使用 defer 并行执行(不阻塞主线程) coroutine(function () use ($subagent, $task, $taskId) { $startTime = microtime(true); try { // 执行具体的 subagent $result = $this->runSubagent($subagent, $task); $duration = (microtime(true) - $startTime) * 1000; // 保存结果 $this->taskResults[$taskId] = [ 'task_id' => $taskId, 'status' => $result['status'], 'result' => $result['result'] ?? null, 'error' => $result['error'] ?? null, 'duration_ms' => $duration, ]; // 推送结果到所有连接的 WebSocket $this->broadcastResult($taskId, $this->taskResults[$taskId]); } catch (\Throwable $e) { $this->logger->error(sprintf("[MASTER] 任务失败: %s - %s", $taskId, $e->getMessage())); $this->taskResults[$taskId] = [ 'task_id' => $taskId, 'status' => 'failed', 'error' => $e->getMessage(), ]; } }); } /** * 运行具体的 Subagent */ private function runSubagent(string $subagent, string $task): array { $this->logger->info(sprintf("[SUBAGENT-%s] 开始执行", $subagent)); switch ($subagent) { case 'code_agent': return $this->codeAgent($task); case 'math_agent': return $this->mathAgent($task); case 'research_agent': return $this->researchAgent($task); default: return [ 'status' => 'failed', 'error' => sprintf("Unknown subagent: %s", $subagent) ]; } } /** * 代码 Subagent */ private function codeAgent(string $task): array { // 调用 OpenAI API(使用 Hyperf 的 HTTP 客户端) // $client = make(OpenAIClient::class); // $response = $client->chat($task); // 模拟 API 调用 coroutine(function () use ($task) { \Swoole\Coroutine::sleep(2); // 2秒 }); $model = strlen($task) < 200 ? 'gpt-4o-mini' : 'gpt-4o'; return [ 'status' => 'success', 'result' => sprintf("生成的代码(%s 模型): %s", $model, $task) ]; } /** * 数学 Subagent */ private function mathAgent(string $task): array { // 本地计算 if (!preg_match('/^[\d\s\+\-\*\/\(\)\.]+$/', $task)) { return [ 'status' => 'failed', 'error' => 'Invalid math expression' ]; } // 安全的数学计算(生产环境应该用专门的库) try { $result = eval("return $task;"); return [ 'status' => 'success', 'result' => (string)$result ]; } catch (\Throwable $e) { return [ 'status' => 'failed', 'error' => $e->getMessage() ]; } } /** * 研究 Subagent */ private function researchAgent(string $task): array { $this->logger->info(sprintf("[RESEARCH-AGENT] 开始研究: %s", $task)); // 模拟研究(5秒) \Swoole\Coroutine::sleep(5); return [ 'status' => 'success', 'result' => sprintf("研究结果: %s", $task) ]; } /** * 广播结果到所有 WebSocket 连接 */ private function broadcastResult(string $taskId, array $result): void { $this->logger->info(sprintf("[BROADCAST] 广播结果: %s", $taskId)); // 在实际 Hyperf 中,应该遍历所有连接并发送 // 这里简化实现 // 使用内存 channel 或者直接推送到连接 foreach ($this->activeConnections as $conn) { $conn->push(json_encode([ 'type' => 'task_result', 'data' => $result ])); } } } // ==================== WebSocket 控制器 ==================== declare(strict_types=1); namespace App\Controller; use Hyperf\Di\Annotation\Inject; use Hyperf\WebSocketServer\Socket; use Hyperf\Contract\StdoutLoggerInterface; use App\Service\SubagentService; class WebSocketController { #[Inject] protected SubagentService $subagentService; #[Inject] protected \Hyperf\Contract\StdoutLoggerInterface $logger; /** * WebSocket 端点 */ public function onMessage(Socket $socket, Message $message): void { $this->logger->info(sprintf("[WS] 收到消息: %s", $message->getData())); $data = json_decode($message->getData(), true); if (isset($data['type']) && $data['type'] === 'submit_task') { $taskData = $data['data'] ?? []; // 生成唯一 task ID $taskId = uniqid('', more_entropy=true); // 提交任务 $this->subagentService->submitTask( $taskData['subagent'] ?? '', $taskData['task'] ?? '', $taskId ); // 发送确认 $socket->push(json_encode([ 'type' => 'task_accepted', 'data' => [ 'task_id' => $taskId, 'status' => 'queued' ] ])); } elseif (isset($data['type']) && $data['type'] === 'ping') { // 心跳 $socket->push(json_encode(['type' => 'pong'])); } } public function OnOpen(Socket $socket): void { $this->logger->info("[WS] 客户端连接"); } public function OnClose(Socket $socket): void { $this->logger->info("[WS] 客户端断开"); } } /* =================== 配置文件 ==================== // config/autoload/server.php return [ 'mode' => SWOOLE_PROCESS_BASE, 'servers' => [ [ 'type' => Server::SERVER_WEBSOCKET, 'host' => '0.0.0.0', 'port' => 9501, 'settings' => [ 'open_websocket_ping_interval' => 30, // 心跳间隔 'open_websocket_ping_timeout' => 60, ], ], ], ]; // config/autoload/dependencies.php return [ 'dependencies' => [ App\Service\SubagentService::class, ], ]; =================== 启动方式 ==================== # 1. 启动 Hyperf Server php bin/hyperf.php start # 2. 测试 浏览器访问 http://localhost:9501 =================== Hyperf 方案的核心优势 ==================== ✅ 协程:Swoole 协程(用户态并发) ✅ Task Worker:内置任务调度 ✅ 并发性能:比 PHP-FPM 高很多 ✅ 内存管理:协程栈轻量 ✅ 兼容性:支持现有 PHP 生态 =================== 数据流 ==================== Browser WS → Hyperf Task Worker → 协程执行 Subagent → 结果通过内存 Channel → Hyperf WS → Browser 推送 */
九、方案选型总结
Go/Python/Hyperf 协程方案对比
| 维度 | Go goroutine | Python asyncio | Hyperf 协程 |
|---|---|---|---|
| 并发性能 | 最高(2KB 栈) | 高 | 高 |
| 内存效率 | 最高 | 中 | 高 |
| 开发效率 | 中 | 高 | 中 |
| 生态成熟度 | 中 | 最高(异步库丰富) | 高 |
| 适用团队 | Go 团队 | Python 团队 | PHP 团队 |
选型决策树
开始
│
├─ 单次 LLM 调用(通常 < 5秒)?
│ └─ 是 → 考虑同步调用(开发最简单)
│
├─ 需要任务持久化?
│ └─ 是 → 建议队列系统(Celery/Redis Queue)
│
├─ 需要分布式部署?
│ └─ 是 → 建议队列系统
│
├─ 需要自动重试/死信队列?
│ └─ 是 → 建议队列系统
│
├─ 任务时长通常 > 30秒?
│ └─ 是 → 建议队列系统(可靠性优先)
│
└─ 中等时长(约 5-30秒)+ 单机?
└─ 是 → 考虑协程/异步方案(性能优先)
典型场景推荐
| 场景 | 建议方案 | 理由 |
|---|---|---|
| 个人项目/演示 | 同步调用 | 最快开发,单次 LLM 调用 |
| 内部工具(通常 < 10 人) | 协程 | 性能通常足够,运维简单 |
| SaaS 产品(通常 > 100 人) | 队列系统 | 可靠性通常优先 |
| 金融/支付系统 | 队列系统 + 监控 | 通常需要完整保障 |
| 实时聊天/Games | 协程 + WS | 通常优先考虑低延迟 |
| 复杂工作流(多步骤) | 队列系统 | 通常需要持久化和重试 |
| 维度 | Go 方案 | Python asyncio | Hyperf |
|---|---|---|---|
| 并发模型 | goroutine | asyncio | Swoole 协程 |
| 调度控制 | WaitGroup + Channel | asyncio | Task Worker |
| 内存效率 | 最高(2KB/栈) | 中 | 高 |
| 性能 | 最高 | 高 | 高 |
| 开发效率 | 中 | 高 | 中 |
| 学习曲线 | 陡 | 中 | 中 |
| 生态成熟度 | 中 | 最高 | 高 |
| 团队技能 | Go 团队 | Python 团队 | PHP 团队 |
十、通用最佳实践
1. WebSocket 心跳
所有语言都应该实现心跳机制:
javascript// 客户端 setInterval(() => { ws.send(JSON.stringify({type: "ping"})); }, 30000); // 每 30 秒 // 服务端 if (msg.type === "ping") { ws.send(JSON.stringify({type: "pong"})); }
2. 并发控制
Go: Semaphore + Channel Python: asyncio.Semaphore Hyperf: Task Worker 数量限制
3. 超时处理
Go: context.WithTimeout Python: asyncio.wait_for Hyperf: Task Worker timeout 配置
4. 错误处理
go// 所有语言都应该 1. 捕获 panic/exception 2. 记录详细日志 3. 返回用户友好的错误信息 4. 不泄露内部状态
5. 监控指标
go// 应该监控的指标 - 当前活跃 goroutines/协程数 - 任务队列长度 - 平均任务执行时间 - 超时率 - 错误率 - WebSocket 连接数
文档版本:5.1 | 更新时间:2026-02-13