跳转至

Item 10: Use Agent Pipeline to Chain Multiple Agents Together

问题

你想让多个 Agent 协作完成复杂任务,但不知道怎么做:

# 你想做:翻译 → 总结 → 分析
translator = Agent(client=client, name="Translator", ...)
summarizer = Agent(client=client, name="Summarizer", ...)
analyzer = Agent(client=client, name="Analyzer", ...)

# 但你怎么让它们顺序执行?
result1 = await translator.run(text)        # 第一步
result2 = await summarizer.run(result1.text) # 第二步
result3 = await analyzer.run(result2.text)   # 第三步
# 这样写可以,但不够优雅,错误处理麻烦

或者你想让一个 Agent 判断下一步该调用哪个 Agent:

# 你想让 Router Agent 自动决定调用哪个 Worker
router = Agent(client=client, name="Router", ...)

# 但怎么让 Router 调用其他 Agent?
# Agent 不能直接调用其他 Agent

深入解释

Agent Pipeline = 多个 Agent 按顺序/条件协作

┌─────────────────────────────────────────────────────────────┐
│              单 Agent(无法处理复杂任务)                       │
├─────────────────────────────────────────────────────────────┤
│  User Input → Agent → Response                              │
│                                                              │
│  适合:简单、独立的任务                                        │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│              Agent Pipeline(多 Agent 协作)                  │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  User Input → Router → [Translator] → [Summarizer] → ...   │
│                      ↓                                       │
│                   Worker 1                                   │
│                      ↓                                       │
│                   Worker 2                                   │
│                      ↓                                       │
│                   Response                                    │
│                                                              │
│  适合:复杂任务、需要多角色协作                               │
└─────────────────────────────────────────────────────────────┘

心智模型:Pipeline 就像工厂流水线——原材料(User Input)经过多个工序(Agent)加工,最终产出成品(Response),每个工序专注做一件事。

推荐做法

顺序 Pipeline

async def pipeline_sequential(user_input: str) -> str:
    """顺序执行多个 Agent"""

    # Step 1: 翻译
    translator = Agent(
        client=client,
        name="Translator",
        instructions="将文本翻译成英文,保持专业术语准确。"
    )
    translated = await translator.run(user_input)

    # Step 2: 总结
    summarizer = Agent(
        client=client,
        name="Summarizer",
        instructions="用简洁的语言总结文本要点,不超过 3 句话。"
    )
    summarized = await summarizer.run(translated.text)

    # Step 3: 分析
    analyzer = Agent(
        client=client,
        name="Analyzer",
        instructions="分析文本,给出专业评价和建议。"
    )
    analyzed = await analyzer.run(summarized.text)

    return analyzed.text


# 使用
result = await pipeline_sequential("你的长文本内容...")

带 Session 共享的 Pipeline

async def pipeline_with_session(user_input: str) -> str:
    """Pipeline 共享 Session,让 Agent 感知完整上下文"""

    # 创建共享 Session
    session = AgentSession()

    translator = Agent(client=client, name="Translator", instructions="翻译成英文...")
    summarizer = Agent(client=client, name="Summarizer", instructions="总结上文...")
    analyzer = Agent(client=client, name="Analyzer", instructions="分析上文...")

    # 同一个 Session,所有 Agent 共享历史
    t_result = await translator.run(user_input, session=session)
    s_result = await summarizer.run(t_result.text, session=session)
    a_result = await analyzer.run(s_result.text, session=session)

    return a_result.text

Router Agent(条件路由)

from enum import Enum

class RequestType(Enum):
    TRANSLATION = "translation"
    SUMMARY = "summary"
    ANALYSIS = "analysis"
    OTHER = "other"

async def router_pipeline(user_input: str) -> str:
    """Router Agent 判断请求类型,路由到对应 Worker"""

    # Router 负责分类
    router = Agent(
        client=client,
        name="Router",
        instructions="""
        分析用户输入,判断属于哪种类型:
        - translation: 需要翻译
        - summary: 需要总结
        - analysis: 需要分析
        - other: 其他

        只输出一个词:translation/summary/analysis/other
        """
    )

    request_type_text = await router.run(user_input)
    request_type = request_type_text.text.strip().lower()

    # 根据类型路由到不同 Worker
    if request_type == "translation":
        worker = Agent(client=client, name="Translator", instructions="翻译成英文...")
        result = await worker.run(user_input)
    elif request_type == "summary":
        worker = Agent(client=client, name="Summarizer", instructions="总结要点...")
        result = await worker.run(user_input)
    elif request_type == "analysis":
        worker = Agent(client=client, name="Analyzer", instructions="深度分析...")
        result = await worker.run(user_input)
    else:
        default_agent = Agent(client=client, name="Default", instructions="通用回答...")
        result = await default_agent.run(user_input)

    return result.text

并行 Pipeline

async def pipeline_parallel(user_input: str) -> str:
    """多个 Agent 并行处理,返回组合结果"""

    # 三个 Agent 并行执行
    translator = Agent(client=client, name="Translator", instructions="翻译成英文...")
    summarizer = Agent(client=client, name="Summarizer", instructions="总结...")
    sentiment = Agent(client=client, name="Sentiment", instructions="分析情感...")

    # 并行启动
    t_task = asyncio.create_task(translator.run(user_input))
    s_task = asyncio.create_task(summarizer.run(user_input))
    se_task = asyncio.create_task(sentiment.run(user_input))

    # 等待所有完成
    t_result, s_result, se_result = await asyncio.gather(t_task, s_task, se_task)

    # 组合结果
    combiner = Agent(
        client=client,
        name="Combiner",
        instructions="整合多个分析结果为连贯的报告。"
    )

    combined = await combiner.run(
        f"翻译:{t_result.text}\n\n总结:{s_result.text}\n\n情感分析:{se_result.text}"
    )

    return combined.text

好 vs 坏对比

# 坏做法:硬编码顺序,无错误处理
result1 = await translator.run(text)
result2 = await summarizer.run(result1.text)
result3 = await analyzer.run(result2.text)
# 中间任何一步失败,整个 Pipeline 就断了

# 好做法:Pipeline 封装 + 错误处理
async def safe_pipeline(user_input: str) -> str:
    try:
        session = AgentSession()
        translator = Agent(client=client, name="Translator", ...)
        summarizer = Agent(client=client, name="Summarizer", ...)

        result = await translator.run(user_input, session=session)
        if not result.text:
            return "翻译失败"

        summarized = await summarizer.run(result.text, session=session)
        if not summarized.text:
            return "总结失败"

        return summarized.text
    except Exception as e:
        logger.error(f"Pipeline error: {e}")
        return f"处理失败: {e}"

扩展讨论

Pipeline 模式对比

模式 特点 适用场景
Sequential 顺序执行,上一步输出下一步输入 强依赖的步骤
Parallel 并行执行,汇总结果 独立子任务
Router 条件路由到不同分支 多类型请求
Fan-out/Fan-in 分发到多个,结果汇总 批量处理

Pipeline 中间件

class PipelineLoggingMiddleware(Middleware):
    async def on_agent_start(self, agent, input_data, context):
        print(f"[Pipeline] {agent.name} started")

    async def on_agent_end(self, agent, output, context):
        print(f"[Pipeline] {agent.name} ended: {output.text[:50]}...")

    async def on_error(self, agent, error, context):
        print(f"[Pipeline] {agent.name} error: {error}")
        raise error  # Pipeline 通常应该 fail-fast

超时和重试

async def pipeline_with_timeout(user_input: str) -> str:
    try:
        result = await asyncio.wait_for(
            sequential_pipeline(user_input),
            timeout=30.0
        )
        return result
    except asyncio.TimeoutError:
        return "处理超时,请稍后重试"

async def pipeline_with_retry(user_input: str, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            return await sequential_pipeline(user_input)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            logger.warning(f"Attempt {attempt + 1} failed: {e}")
            await asyncio.sleep(1 * (attempt + 1))  # 指数退避

企业级考虑

# 1. Pipeline 配置外部化
class PipelineConfig:
    def __init__(self, config_path: str):
        with open(config_path) as f:
            self.config = yaml.safe_load(f)

    def get_pipeline(self, name: str) -> list[dict]:
        return self.config["pipelines"][name]

# pipelines.yaml
# my_pipeline:
#   - agent: translator
#     timeout: 10
#   - agent: summarizer
#     timeout: 5

# 2. Pipeline 监控
class PipelineMetrics:
    def record_pipeline_duration(self, pipeline_name: str, duration: float):
        # 发送到 Prometheus
        pipeline_duration.labels(pipeline=pipeline_name).observe(duration)

    def record_pipeline_step(self, pipeline_name: str, step: str, success: bool):
        pipeline_steps.labels(pipeline=pipeline_name, step=step, success=success).inc()

Things to Remember

  • Pipeline = 多个 Agent 顺序/条件协作,解决复杂任务
  • Sequential Pipeline:强依赖步骤,上一步输出是下一步输入
  • Parallel Pipeline:独立子任务并行执行,最后汇总
  • Router Pipeline:条件路由,根据输入决定调用哪个 Agent
  • 共享 Session 让多个 Agent 能感知完整上下文
  • Pipeline 需要错误处理(try/except)和超时控制
  • 每个 Agent 在 Pipeline 中应该职责单一
  • Pipeline 失败时应该快速失败(fail-fast),不要传播错误到下游
  • 复杂 Pipeline 应该配置外部化,而不是硬编码