跳转至

Item 13: Think of Workflows as Directed Graphs with Superstep Synchronization

Workflow 是有向图,不是流水线。理解 superstep(超步)同步语义是关键。

核心心智模型

Superstep 1:  所有节点并行执行
              A: set_state("x", 1)  ─┐
              B: get_state("x") = ?  ─┼─ 本 superstep 内互相看不到
              C: get_state("x") = ?  ─┘

Superstep 边界:状态提交

Superstep 2:  所有节点看到提交的值
              A: get_state("x") = 1
              B: get_state("x") = 1
              C: get_state("x") = 1

构建工作流

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler, executor

class UpperCase(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message(text.upper())

@executor(id="reverse")
async def reverse(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.yield_output(text[::-1])

workflow = (
    WorkflowBuilder(start_executor=UpperCase())
    .add_edge(UpperCase(), reverse)
    .build()
)

result = await workflow.run("hello")

三种边类型

边类型 含义 场景
SingleEdge 一对一 管道式处理
FanOutEdgeGroup 一对多 广播任务
FanInEdgeGroup 多对一 收集结果
SwitchCaseEdgeGroup 条件路由 分支处理

Things to Remember

  • Workflow = Executor 图 + Edge 连接
  • Superstep 内 set_state() 的值,同 superstep 内其他节点看不到
  • ctx.send_message() 发给下游,ctx.yield_output() 产出最终结果
  • Edge 决定数据流向,不是执行顺序