AI Agent 教程

第7章:流式输出 —— 让 Agent 实时汇报进度

一句话:掌握流式和单次两种输出模式,让 Agent 的工作过程可视化。

本章目标

前置知识


7.1 流式 vs 单次 —— 看直播还是看录播?

先讲个故事

想象你请了一个装修工人来装修你的房子。

方式一:看直播 你搬把椅子坐在旁边看。工人砸了一面墙,你马上看到了;工人刷了一面漆,你马上看到了;工人铺了地砖歪了,你马上指出来了。这就是 流式输出(Streaming)

方式二:看录播 你跟工人说"你干完了给我打电话",然后你就出去逛街了。三天后工人打电话说"搞定了",你回来一看 —— 好的坏的,一次性全看到了。这就是 单次输出(Single)

技术上的区别

对比项 流式输出(Streaming) 单次输出(Single)
数据到达方式 一条一条,边产生边发给你 全部完成后,一次性发给你
用户体验 像看直播,能看到"Agent 在想什么" 像等快递,只有"已完成"这一个状态
首字节时间 快!几毫秒就能看到第一个字 慢,要等 Agent 全部跑完
适合场景 交互式 UI、聊天界面、终端工具 后台任务、批处理、API 服务
实现复杂度 稍复杂,要处理多种消息类型 简单,收集完处理就行
类比 看厨师做菜的直播 看菜做完的照片

什么时候用哪个?

经验法则很简单:

好了,概念讲完了,下面来写代码。


7.2 流式输出详解 —— 一边干活一边汇报

query() 返回的是什么?

还记得第4章学的 query() 函数吗?它返回的是一个 异步迭代器(AsyncGenerator)

什么意思呢?就是说,它不会一次性给你所有结果,而是"一条一条往外吐"。你用 for await(TypeScript)或 async for(Python)来逐条接收。

就像自来水管 —— 水是一直在流的,你拿杯子接一杯是一杯,不用等水管里所有的水都出来。

最基础的流式处理

先看 TypeScript 版本:

import { query, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-agent-sdk";

async function streamingDemo() {
  for await (const message of query({
    prompt: "帮我写一首关于编程的打油诗",
    options: {
      allowedTools: [],
      maxTurns: 1,
    },
  })) {
    // 每收到一条消息,就处理一条
    if (message instanceof AssistantMessage) {
      // Agent 说的话
      console.log("[Agent 说]:", message.content);
    } else if (message instanceof ResultMessage) {
      // 最终结果
      console.log("[完成]:", message.subtype);
    }
  }
}

streamingDemo();

Python 版本:

import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage

async def streaming_demo():
    async for message in query(
        prompt="帮我写一首关于编程的打油诗",
        options=ClaudeAgentOptions(
            allowed_tools=[],
            max_turns=1,
        ),
    ):
        if isinstance(message, AssistantMessage):
            print(f"[Agent 说]: {message.content}")
        elif isinstance(message, ResultMessage):
            print(f"[完成]: {message.subtype}")

asyncio.run(streaming_demo())

消息流的顺序

Agent 干活的过程中,消息是按一定顺序流过来的。就像工厂的流水线,有固定的工序:

┌─────────────────────────────────────────────────────┐
│                                                     │
│  1. SystemMessage (init)                            │
│     ↓  "大家好,我是 Agent,已就位"                    │
│                                                     │
│  2. AssistantMessage                                │
│     ↓  "让我想想...我觉得应该先用 Bash 看看文件"       │
│                                                     │
│  3. AssistantMessage (tool_use)                     │
│     ↓  "我要调用 Bash 工具了,参数是 ls -la"           │
│                                                     │
│  4. AssistantMessage (tool_result)                  │
│     ↓  "工具执行结果:total 42..."                    │
│                                                     │
│  5. AssistantMessage                                │
│     ↓  "好的,我看到了文件列表,现在来整理..."          │
│                                                     │
│  6. ResultMessage (success)                         │
│     ↓  "任务完成!"                                   │
│                                                     │
└─────────────────────────────────────────────────────┘

注意中间的 2→3→4→5 可能会重复多次 —— Agent 每调用一次工具就走一轮"思考→调用→拿结果→再思考"的循环。

三种消息类型详解

让我们把三种消息掰开了看:

1. SystemMessage(启动消息)

# SystemMessage 长这样
{
    "type": "system",
    "subtype": "init",
    "session_id": "sess_abc123",        # 会话 ID,后面恢复会话要用
    "tools": ["Read", "Write", "Bash"], # Agent 可以用的工具列表
    "model": "claude-sonnet-4-20250514",  # 使用的模型
}

这条消息在最开始出现,而且只出现一次。就像开会前先做自我介绍:"我是谁、我能干什么、我的工号是多少"。

你应该从这条消息里拿到 session_id,保存起来以备后用(第8章会讲怎么恢复会话)。

2. AssistantMessage(Agent 的回复)

这是最"热闹"的消息类型,Agent 说的话、调的工具、拿到的结果,全在这里。

# 纯文本回复
{
    "type": "assistant",
    "content": [
        {
            "type": "text",
            "text": "让我来帮你分析一下这个代码文件..."
        }
    ],
    "stop_reason": "end_turn",  # 或 "tool_use"
}

# 工具调用
{
    "type": "assistant",
    "content": [
        {
            "type": "text",
            "text": "我需要先看看这个文件的内容。"
        },
        {
            "type": "tool_use",
            "id": "toolu_abc123",
            "name": "Read",
            "input": {
                "file_path": "/path/to/file.py"
            }
        }
    ],
    "stop_reason": "tool_use",  # 注意这里是 tool_use,说明 Agent 还没完
}

注意 stop_reason 字段:

3. ResultMessage(最终结果)

# 成功完成
{
    "type": "result",
    "subtype": "success",
    "stop_reason": "end_turn",
    "cost": {
        "input_tokens": 1523,
        "output_tokens": 456,
    },
    "duration_ms": 3200,
    "session_id": "sess_abc123",
}

# 出错了
{
    "type": "result",
    "subtype": "error",
    "error": "Max turns exceeded",
    "stop_reason": "max_turns",
}

这是最后一条消息,告诉你 Agent 是成功了还是失败了,花了多少 Token、用了多长时间。

完整的流式处理模板

把上面三种消息放在一起,下面是一个你可以直接拿去用的模板:

TypeScript 版本:

import {
  query,
  SystemMessage,
  AssistantMessage,
  ResultMessage,
} from "@anthropic-ai/claude-agent-sdk";

async function processStreaming(prompt: string) {
  let sessionId: string | undefined;

  for await (const message of query({
    prompt,
    options: {
      allowedTools: ["Read", "Bash", "Glob", "Grep"],
      maxTurns: 10,
    },
  })) {
    // 第一类:启动消息
    if (message instanceof SystemMessage) {
      sessionId = message.sessionId;
      console.log(`[启动] 会话 ID: ${sessionId}`);
      console.log(`[启动] 可用工具: ${message.tools.join(", ")}`);
    }

    // 第二类:Agent 的回复(这里面内容最丰富)
    else if (message instanceof AssistantMessage) {
      for (const block of message.content) {
        if (block.type === "text") {
          // Agent 说了一段话
          console.log(`[思考] ${block.text}`);
        } else if (block.type === "tool_use") {
          // Agent 要调用工具
          console.log(`[工具] ${block.name}(${JSON.stringify(block.input)})`);
        } else if (block.type === "tool_result") {
          // 工具返回了结果
          console.log(`[结果] ${block.content?.substring(0, 200)}...`);
        }
      }
    }

    // 第三类:最终结果
    else if (message instanceof ResultMessage) {
      if (message.subtype === "success") {
        console.log(`[完成] 耗时 ${message.durationMs}ms`);
        console.log(`[费用] 输入 ${message.cost.inputTokens} tokens, 输出 ${message.cost.outputTokens} tokens`);
      } else {
        console.log(`[失败] ${message.error}`);
      }
    }
  }

  return sessionId;
}

processStreaming("帮我看看当前目录有哪些 TypeScript 文件");

Python 版本:

import asyncio
import json
from claude_agent_sdk import (
    query,
    ClaudeAgentOptions,
    SystemMessage,
    AssistantMessage,
    ResultMessage,
)

async def process_streaming(prompt: str) -> str | None:
    session_id = None

    async for message in query(
        prompt=prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Bash", "Glob", "Grep"],
            max_turns=10,
        ),
    ):
        # 第一类:启动消息
        if isinstance(message, SystemMessage):
            session_id = message.session_id
            print(f"[启动] 会话 ID: {session_id}")
            print(f"[启动] 可用工具: {', '.join(message.tools)}")

        # 第二类:Agent 的回复
        elif isinstance(message, AssistantMessage):
            for block in message.content:
                if block["type"] == "text":
                    print(f"[思考] {block['text']}")
                elif block["type"] == "tool_use":
                    print(f"[工具] {block['name']}({json.dumps(block['input'], ensure_ascii=False)})")
                elif block["type"] == "tool_result":
                    content = block.get("content", "")
                    print(f"[结果] {content[:200]}...")

        # 第三类:最终结果
        elif isinstance(message, ResultMessage):
            if message.subtype == "success":
                print(f"[完成] 耗时 {message.duration_ms}ms")
                print(f"[费用] 输入 {message.cost['input_tokens']} tokens, 输出 {message.cost['output_tokens']} tokens")
            else:
                print(f"[失败] {message.error}")

    return session_id

asyncio.run(process_streaming("帮我看看当前目录有哪些 Python 文件"))

7.3 单次输出详解 —— 干完了再说

什么时候用单次模式?

流式模式虽然体验好,但不是所有场景都需要。以下场景用单次模式更合适:

  1. 后台批处理 —— 比如用 Agent 批量分析100个文件,你不需要盯着每个文件的分析过程
  2. API 服务 —— 你的服务接收请求,调用 Agent,返回最终结果给客户端
  3. 定时任务 —— 每天凌晨3点让 Agent 生成报告,你又不在电脑前看
  4. 管道处理 —— Agent 的输出是另一个程序的输入,中间过程没人看

实现单次模式

思路很简单:把流式消息全部收集起来,最后一起返回。

TypeScript 版本:

import {
  query,
  AssistantMessage,
  ResultMessage,
} from "@anthropic-ai/claude-agent-sdk";

// 辅助函数:把流式消息收集成一个结果
async function queryOnce(prompt: string): Promise<{
  response: string;
  success: boolean;
  cost: { inputTokens: number; outputTokens: number };
}> {
  const assistantTexts: string[] = [];
  let result: ResultMessage | null = null;

  for await (const message of query({
    prompt,
    options: {
      allowedTools: ["Read", "Bash", "Glob", "Grep"],
      maxTurns: 10,
    },
  })) {
    if (message instanceof AssistantMessage) {
      // 只收集文本内容,工具调用的过程我们不关心
      for (const block of message.content) {
        if (block.type === "text") {
          assistantTexts.push(block.text);
        }
      }
    } else if (message instanceof ResultMessage) {
      result = message;
    }
  }

  return {
    response: assistantTexts.join("\n"),
    success: result?.subtype === "success",
    cost: {
      inputTokens: result?.cost?.inputTokens ?? 0,
      outputTokens: result?.cost?.outputTokens ?? 0,
    },
  };
}

// 使用起来超简单
async function main() {
  const { response, success, cost } = await queryOnce(
    "分析一下 package.json 中的依赖,哪些可以升级?"
  );

  if (success) {
    console.log("分析结果:", response);
    console.log(`花费:${cost.inputTokens + cost.outputTokens} tokens`);
  } else {
    console.log("Agent 执行失败了");
  }
}

main();

Python 版本:

import asyncio
from dataclasses import dataclass
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage

@dataclass
class QueryResult:
    response: str
    success: bool
    input_tokens: int
    output_tokens: int

async def query_once(prompt: str) -> QueryResult:
    """把流式消息收集成一个单次结果"""
    assistant_texts: list[str] = []
    result = None

    async for message in query(
        prompt=prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Bash", "Glob", "Grep"],
            max_turns=10,
        ),
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if block["type"] == "text":
                    assistant_texts.append(block["text"])
        elif isinstance(message, ResultMessage):
            result = message

    return QueryResult(
        response="\n".join(assistant_texts),
        success=result.subtype == "success" if result else False,
        input_tokens=result.cost.get("input_tokens", 0) if result else 0,
        output_tokens=result.cost.get("output_tokens", 0) if result else 0,
    )

# 使用
async def main():
    result = await query_once("分析一下 package.json 中的依赖,哪些可以升级?")

    if result.success:
        print(f"分析结果:{result.response}")
        print(f"花费:{result.input_tokens + result.output_tokens} tokens")
    else:
        print("Agent 执行失败了")

asyncio.run(main())

批量处理示例

单次模式的一大优势就是做批处理特别方便。来看一个实际场景 —— 批量分析多个文件的代码质量:

import asyncio
import json
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage

async def analyze_file(file_path: str) -> dict:
    """分析单个文件的代码质量"""
    texts = []
    async for message in query(
        prompt=f"请分析文件 {file_path} 的代码质量,给出评分(1-10)和改进建议。请用 JSON 格式回复。",
        options=ClaudeAgentOptions(
            allowed_tools=["Read"],
            max_turns=3,
        ),
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if block["type"] == "text":
                    texts.append(block["text"])

    return {"file": file_path, "analysis": "\n".join(texts)}

async def batch_analyze(files: list[str]) -> list[dict]:
    """并行分析多个文件"""
    # 用 asyncio.gather 并行处理,效率翻倍
    tasks = [analyze_file(f) for f in files]
    results = await asyncio.gather(*tasks)
    return list(results)

# 使用
async def main():
    files = [
        "src/index.ts",
        "src/utils.ts",
        "src/config.ts",
        "src/router.ts",
    ]

    print("开始批量分析...")
    results = await batch_analyze(files)

    print("\n=== 分析报告 ===")
    for r in results:
        print(f"\n文件:{r['file']}")
        print(f"分析:{r['analysis'][:300]}...")

asyncio.run(main())

这里用了 asyncio.gather() 来并行分析多个文件,速度会快很多。这就是单次模式的优势 —— 代码简单、容易组合、适合批处理。


7.4 实时显示 Agent 工作过程 —— 打造你的 Agent Monitor

上面讲的流式处理只是把消息打印出来,不够酷。现在来做一个真正实用的 "Agent Monitor"—— 在终端里实时展示 Agent 的思考、工具调用和执行结果,就像你在 Claude Code 里看到的那样。

启用部分消息(Partial Messages)

默认情况下,流式输出给你的是"完整的消息" —— Agent 一个完整的回复说完了,你才收到。这就好比直播虽然是实时的,但画面是一帧一帧发的,不是一个像素一个像素发的。

如果你想要"逐字显示"Agent 正在打字的效果(就像 ChatGPT 那样一个字一个字往外蹦),需要启用 partial messages(部分消息)

// TypeScript
const options = {
  includePartialMessages: true,  // 开启逐字流式
  allowedTools: ["Read", "Bash"],
};
# Python
options = ClaudeAgentOptions(
    include_partial_messages=True,  # 开启逐字流式
    allowed_tools=["Read", "Bash"],
)

启用后,你会额外收到一种特殊的消息类型 —— StreamEvent。这是 Claude API 原始的流式事件,包含最细粒度的数据。

StreamEvent 的事件类型

StreamEvent 包装了 Claude API 的原始流式事件,常见的事件类型有:

事件类型 含义 你能拿到什么
message_start 开始新一轮回复 模型信息、usage 信息
content_block_start 开始一个新内容块 块的类型(text 或 tool_use)、工具名
content_block_delta 内容块的增量更新 一小段文字、一段 JSON 片段
content_block_stop 一个内容块结束了
message_delta 消息级别的更新 stop_reason
message_stop 本轮回复结束

消息流的顺序是这样的:

message_start
  → content_block_start (text)
    → content_block_delta (文字片段1)
    → content_block_delta (文字片段2)
    → content_block_delta (文字片段3)
  → content_block_stop
  → content_block_start (tool_use)
    → content_block_delta (JSON 片段1)
    → content_block_delta (JSON 片段2)
  → content_block_stop
→ message_delta (stop_reason: "tool_use")
→ message_stop

... Agent 执行工具,拿到结果,开始新一轮 ...

message_start
  → content_block_start (text)
    → content_block_delta (最终回复片段1)
    → content_block_delta (最终回复片段2)
  → content_block_stop
→ message_delta (stop_reason: "end_turn")
→ message_stop

完整的 Agent Monitor 示例

下面来做一个实际可用的终端 Agent Monitor。它能做到:

  1. 逐字显示 Agent 说的话(像 ChatGPT 的打字效果)
  2. 显示 Agent 正在调用什么工具
  3. 显示工具执行的结果(截断过长内容)
  4. 用不同的前缀区分不同类型的信息

Python 版本(完整可运行):

import asyncio
import sys
import json
from claude_agent_sdk import (
    query,
    ClaudeAgentOptions,
    SystemMessage,
    AssistantMessage,
    ResultMessage,
)
from claude_agent_sdk.types import StreamEvent


class AgentMonitor:
    """终端 Agent 监控器 —— 实时显示 Agent 的工作过程"""

    def __init__(self):
        self.in_tool = False
        self.current_tool_name = ""
        self.turn_count = 0

    def print_header(self, text: str):
        """打印分隔线标题"""
        width = 60
        print(f"\n{'─' * width}")
        print(f"  {text}")
        print(f"{'─' * width}")

    def print_status(self, label: str, text: str):
        """打印状态信息"""
        print(f"  [{label}] {text}")

    async def run(self, prompt: str):
        """启动 Agent 并实时显示输出"""
        self.print_header(f"Agent Monitor - 任务开始")
        print(f"  提示词: {prompt[:80]}{'...' if len(prompt) > 80 else ''}")

        options = ClaudeAgentOptions(
            include_partial_messages=True,
            allowed_tools=["Read", "Write", "Bash", "Glob", "Grep"],
            max_turns=10,
        )

        async for message in query(prompt=prompt, options=options):
            self._handle_message(message)

        self.print_header("Agent Monitor - 任务结束")

    def _handle_message(self, message):
        """根据消息类型分发处理"""
        if isinstance(message, SystemMessage):
            self._handle_system(message)
        elif isinstance(message, StreamEvent):
            self._handle_stream_event(message)
        elif isinstance(message, AssistantMessage):
            self._handle_assistant(message)
        elif isinstance(message, ResultMessage):
            self._handle_result(message)

    def _handle_system(self, message: SystemMessage):
        """处理启动消息"""
        self.print_status("系统", f"会话已创建: {message.session_id[:16]}...")
        self.print_status("系统", f"模型: {message.model}")
        self.print_status("系统", f"工具: {', '.join(message.tools)}")
        print()

    def _handle_stream_event(self, message: StreamEvent):
        """处理流式事件 —— 这是实现逐字显示的关键"""
        event = message.event
        event_type = event.get("type")

        if event_type == "content_block_start":
            content_block = event.get("content_block", {})
            if content_block.get("type") == "tool_use":
                # 工具调用开始
                self.current_tool_name = content_block.get("name", "未知工具")
                self.in_tool = True
                sys.stdout.write(f"\n  [工具调用] {self.current_tool_name}")
                sys.stdout.flush()
            elif content_block.get("type") == "text":
                # 文字内容开始
                if not self.in_tool:
                    sys.stdout.write("\n  [Agent] ")
                    sys.stdout.flush()

        elif event_type == "content_block_delta":
            delta = event.get("delta", {})
            if delta.get("type") == "text_delta" and not self.in_tool:
                # 逐字输出 Agent 说的话
                text = delta.get("text", "")
                sys.stdout.write(text)
                sys.stdout.flush()

        elif event_type == "content_block_stop":
            if self.in_tool:
                sys.stdout.write(" ... 执行中\n")
                sys.stdout.flush()
                self.in_tool = False

        elif event_type == "message_delta":
            # 一轮回复结束
            stop_reason = event.get("delta", {}).get("stop_reason")
            if stop_reason == "tool_use":
                self.turn_count += 1
                print(f"\n  [循环] 第 {self.turn_count} 轮工具调用")

    def _handle_assistant(self, message: AssistantMessage):
        """处理完整的 Assistant 消息(工具结果等)"""
        for block in message.content:
            if block.get("type") == "tool_result":
                content = block.get("content", "")
                if isinstance(content, str) and len(content) > 200:
                    content = content[:200] + "... (已截断)"
                self.print_status("工具结果", str(content)[:300])

    def _handle_result(self, message: ResultMessage):
        """处理最终结果"""
        print()
        if message.subtype == "success":
            self.print_status("完成", "任务成功完成!")
            if hasattr(message, "cost") and message.cost:
                input_t = message.cost.get("input_tokens", 0)
                output_t = message.cost.get("output_tokens", 0)
                self.print_status("费用", f"输入 {input_t} tokens, 输出 {output_t} tokens")
            if hasattr(message, "duration_ms") and message.duration_ms:
                self.print_status("耗时", f"{message.duration_ms}ms")
        else:
            self.print_status("失败", f"错误: {message.error}")


# 运行
async def main():
    monitor = AgentMonitor()
    await monitor.run("帮我看看当前目录的文件结构,然后找到所有 Python 文件并统计行数")

asyncio.run(main())

运行效果大概长这样:

────────────────────────────────────────────────────────────
  Agent Monitor - 任务开始
────────────────────────────────────────────────────────────
  提示词: 帮我看看当前目录的文件结构,然后找到所有 Python 文件并统计行数
  [系统] 会话已创建: sess_7f3a2b1c...
  [系统] 模型: claude-sonnet-4-20250514
  [系统] 工具: Read, Write, Bash, Glob, Grep

  [Agent] 好的,让我先看看当前目录的文件结构。
  [工具调用] Bash ... 执行中
  [循环] 第 1 轮工具调用
  [工具结果] total 42\ndrwxr-xr-x  5 user  staff  160 Feb 25 src/...

  [Agent] 我看到了项目结构。现在让我找到所有 Python 文件并统计行数。
  [工具调用] Glob ... 执行中
  [工具调用] Bash ... 执行中
  [循环] 第 2 轮工具调用
  [工具结果] 12 files found...
  [工具结果] 总计:1,234 行...

  [Agent] 分析完成!当前目录下有 12 个 Python 文件,总共 1,234 行代码...

  [完成] 任务成功完成!
  [费用] 输入 2341 tokens, 输出 567 tokens
  [耗时] 4521ms
────────────────────────────────────────────────────────────
  Agent Monitor - 任务结束
────────────────────────────────────────────────────────────

TypeScript 版本的 Agent Monitor

import {
  query,
  SystemMessage,
  AssistantMessage,
  ResultMessage,
  StreamEvent,
} from "@anthropic-ai/claude-agent-sdk";

class AgentMonitor {
  private inTool = false;
  private turnCount = 0;

  async run(prompt: string) {
    console.log("\n" + "─".repeat(60));
    console.log(`  Agent Monitor - 任务开始`);
    console.log("─".repeat(60));
    console.log(`  提示词: ${prompt.substring(0, 80)}`);

    for await (const message of query({
      prompt,
      options: {
        includePartialMessages: true,
        allowedTools: ["Read", "Write", "Bash", "Glob", "Grep"],
        maxTurns: 10,
      },
    })) {
      if (message instanceof SystemMessage) {
        console.log(`  [系统] 会话: ${message.sessionId.substring(0, 16)}...`);
        console.log(`  [系统] 模型: ${message.model}`);
      } else if (message instanceof StreamEvent) {
        this.handleStreamEvent(message);
      } else if (message instanceof AssistantMessage) {
        this.handleAssistant(message);
      } else if (message instanceof ResultMessage) {
        this.handleResult(message);
      }
    }

    console.log("\n" + "─".repeat(60));
    console.log("  Agent Monitor - 任务结束");
    console.log("─".repeat(60));
  }

  private handleStreamEvent(message: StreamEvent) {
    const event = message.event;

    switch (event.type) {
      case "content_block_start":
        if (event.content_block?.type === "tool_use") {
          this.inTool = true;
          process.stdout.write(`\n  [工具调用] ${event.content_block.name}`);
        } else if (!this.inTool) {
          process.stdout.write("\n  [Agent] ");
        }
        break;

      case "content_block_delta":
        if (event.delta?.type === "text_delta" && !this.inTool) {
          process.stdout.write(event.delta.text || "");
        }
        break;

      case "content_block_stop":
        if (this.inTool) {
          process.stdout.write(" ... 执行中\n");
          this.inTool = false;
        }
        break;

      case "message_delta":
        if (event.delta?.stop_reason === "tool_use") {
          this.turnCount++;
          console.log(`\n  [循环] 第 ${this.turnCount} 轮工具调用`);
        }
        break;
    }
  }

  private handleAssistant(message: AssistantMessage) {
    for (const block of message.content) {
      if (block.type === "tool_result") {
        const content = typeof block.content === "string"
          ? block.content.substring(0, 200)
          : JSON.stringify(block.content).substring(0, 200);
        console.log(`  [工具结果] ${content}...`);
      }
    }
  }

  private handleResult(message: ResultMessage) {
    console.log();
    if (message.subtype === "success") {
      console.log("  [完成] 任务成功完成!");
      if (message.cost) {
        console.log(`  [费用] 输入 ${message.cost.inputTokens} tokens, 输出 ${message.cost.outputTokens} tokens`);
      }
    } else {
      console.log(`  [失败] ${message.error}`);
    }
  }
}

// 运行
const monitor = new AgentMonitor();
monitor.run("帮我看看当前目录的文件结构");

给终端加点颜色(可选)

如果你想让终端输出更好看,可以用 ANSI 转义码加颜色。这是纯终端的做法,不需要安装任何库:

# ANSI 颜色常量
class Color:
    RESET = "\033[0m"
    BOLD = "\033[1m"
    DIM = "\033[2m"

    RED = "\033[31m"
    GREEN = "\033[32m"
    YELLOW = "\033[33m"
    BLUE = "\033[34m"
    MAGENTA = "\033[35m"
    CYAN = "\033[36m"

# 用法示例
def print_colored(label: str, text: str, color: str):
    print(f"  {color}{Color.BOLD}[{label}]{Color.RESET} {text}")

# 效果:
# [系统] 会话已创建     ← 蓝色
# [Agent] 让我想想...   ← 绿色
# [工具调用] Bash       ← 黄色
# [工具结果] total 42   ← 青色
# [完成] 任务成功!       ← 绿色
# [失败] 出错了          ← 红色

print_colored("系统", "会话已创建", Color.BLUE)
print_colored("Agent", "让我想想...", Color.GREEN)
print_colored("工具调用", "Bash(ls -la)", Color.YELLOW)
print_colored("完成", "任务成功!", Color.GREEN)
print_colored("失败", "出错了", Color.RED)

7.5 Stop Reasons —— Agent 为什么停下来了?

Agent 不会永远跑下去(那就成死循环了)。它总会停下来,而 stop_reason 就是告诉你"它为什么停了"。

这就像你问一个快递员"你为什么停下来了":

各种 Stop Reason 详解

1. end_turn —— 正常完成

# 这是最理想的情况:Agent 觉得任务完成了,主动停下来
{
    "stop_reason": "end_turn"
}

出现场景: Agent 认为它已经回答了你的问题,或者完成了你交代的任务。

你需要做什么: 什么都不用做,一切正常。

2. max_tokens —— 回复太长,被截断了

# Agent 话还没说完,但是输出长度已经到上限了
{
    "stop_reason": "max_tokens"
}

出现场景: Agent 要说的话太多了。比如你让它写一篇很长的文章,或者分析一个很大的文件。

你需要做什么: 考虑以下几种处理方式:

# 处理 max_tokens 的示例
async for message in query(prompt=prompt, options=options):
    if isinstance(message, ResultMessage):
        if message.stop_reason == "max_tokens":
            print("Agent 的回复被截断了,你可能需要:")
            print("1. 让任务更具体,减少输出量")
            print("2. 恢复会话让 Agent 继续说")

3. tool_use —— 需要用工具

# Agent 话说到一半,发现需要调工具
{
    "stop_reason": "tool_use"
}

出现场景: Agent 在回复过程中决定要调用一个工具。这是 Agent 循环的正常一环。

你需要做什么: 通常你什么都不用做 —— SDK 会自动处理。Agent 调完工具,拿到结果,会自己继续下一轮循环。这个 stop_reason 你一般在 AssistantMessage 里看到,在 ResultMessage 里很少看到。

4. stop_sequence —— 遇到了停止词

# Agent 输出到某个特定字符串时被强制停了
{
    "stop_reason": "stop_sequence"
}

出现场景: 你在配置中设置了 stop_sequences(停止序列),Agent 的输出恰好包含了这个序列。

你需要做什么: 检查输出是否完整。停止词是你自己设的,所以你应该知道为什么 Agent 会停在这里。

5. refusal —— Agent 拒绝了

# Agent 不想干这个活
{
    "stop_reason": "refusal"
}

出现场景: 你让 Agent 做一些它认为不合适的事情。比如生成有害内容、帮忙做违法的事等。

你需要做什么: 尊重 Agent 的拒绝。重新考虑你的请求是否合理。

完整的 Stop Reason 处理模板

下面是一个"生产级"的 Stop Reason 处理代码,你可以直接用在自己的项目里:

Python 版本:

import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage, AssistantMessage

async def robust_query(prompt: str, max_retries: int = 2) -> str:
    """
    健壮的 Agent 查询函数,优雅处理各种停止原因。

    特性:
    - max_tokens 时自动提示
    - refusal 时给出说明
    - 其他异常情况有兜底处理
    """
    texts: list[str] = []

    async for message in query(
        prompt=prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Bash", "Glob", "Grep"],
            max_turns=15,
        ),
    ):
        # 收集 Agent 的文本输出
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if block["type"] == "text":
                    texts.append(block["text"])

        # 处理最终结果
        elif isinstance(message, ResultMessage):
            stop_reason = message.stop_reason

            if message.subtype == "success":
                # 正常完成
                if stop_reason == "end_turn":
                    return "\n".join(texts)

                elif stop_reason == "max_tokens":
                    # 被截断了,返回已有内容并附加提示
                    truncated = "\n".join(texts)
                    return (
                        truncated
                        + "\n\n[注意:Agent 的回复因长度限制被截断。"
                        + "你可以恢复会话继续获取剩余内容。]"
                    )

                elif stop_reason == "stop_sequence":
                    # 遇到停止词,正常返回
                    return "\n".join(texts)

                else:
                    # 其他成功情况
                    return "\n".join(texts)

            else:
                # 失败情况
                if stop_reason == "refusal":
                    return "[Agent 拒绝了这个请求。请检查你的提示词是否包含不当内容。]"

                elif "max_turns" in str(message.error or ""):
                    return (
                        "\n".join(texts)
                        + "\n\n[注意:Agent 达到了最大循环次数限制。"
                        + "任务可能未完成,请增加 max_turns 或简化任务。]"
                    )

                else:
                    error_msg = message.error or "未知错误"
                    return f"[Agent 执行失败:{error_msg}]"

    # 如果走到这里,说明根本没有 ResultMessage
    return "[异常:Agent 没有返回结果消息]"


# 使用
async def main():
    # 正常任务
    result = await robust_query("当前目录有哪些文件?")
    print(result)

    # 可能被截断的长任务
    result = await robust_query("详细分析整个项目的代码架构")
    print(result)

asyncio.run(main())

TypeScript 版本:

import {
  query,
  AssistantMessage,
  ResultMessage,
} from "@anthropic-ai/claude-agent-sdk";

async function robustQuery(prompt: string): Promise<string> {
  const texts: string[] = [];

  for await (const message of query({
    prompt,
    options: {
      allowedTools: ["Read", "Bash", "Glob", "Grep"],
      maxTurns: 15,
    },
  })) {
    if (message instanceof AssistantMessage) {
      for (const block of message.content) {
        if (block.type === "text") {
          texts.push(block.text);
        }
      }
    } else if (message instanceof ResultMessage) {
      const stopReason = message.stopReason;

      if (message.subtype === "success") {
        switch (stopReason) {
          case "end_turn":
            return texts.join("\n");

          case "max_tokens":
            return texts.join("\n")
              + "\n\n[注意:Agent 的回复因长度限制被截断。]";

          default:
            return texts.join("\n");
        }
      } else {
        if (stopReason === "refusal") {
          return "[Agent 拒绝了这个请求]";
        }
        return `[Agent 执行失败:${message.error || "未知错误"}]`;
      }
    }
  }

  return "[异常:没有收到结果]";
}

7.6 流式 vs 单次 —— 选择指南

对比总结表

维度 流式(Streaming) 单次(Single)
用户体验 好,能看到实时进度 差,只有"加载中"和"完成"
首字节延迟 低(毫秒级) 高(需等全部完成)
代码复杂度 中等,要处理多种事件 低,收集完一次处理
错误处理 可以中途发现问题 只能最后才知道
适合长任务 是,用户不会觉得卡死 否,长时间无反馈
适合批处理 否,增加不必要的复杂度 是,代码简单易组合
资源占用 持续占用连接 可以释放后再读取
可中断性 好,随时可以停 差,要么等完,要么全部取消

决策流程图

你的 Agent 输出给谁看?
│
├─→ 给人看(终端/网页/聊天)
│   │
│   ├─→ 任务可能超过 5 秒?
│   │   ├─→ 是 → 【用流式】(不然用户以为卡死了)
│   │   └─→ 否 → 【都行】(快的话差别不大)
│   │
│   └─→ 需要逐字显示效果?
│       ├─→ 是 → 【用流式 + partial messages】
│       └─→ 否 → 【用流式】(普通消息级别就够)
│
└─→ 给程序看(API/管道/批处理)
    │
    ├─→ 需要中途干预?
    │   ├─→ 是 → 【用流式】(可以监控 + 中断)
    │   └─→ 否 → 【用单次】(简单省事)
    │
    └─→ 多个任务并行?
        ├─→ 是 → 【用单次 + asyncio.gather】
        └─→ 否 → 【用单次】

性能考量

  1. 流式模式不会更快 —— Agent 干活的总时间是一样的。流式只是让你"更早看到中间结果",并不会让 Agent 跑得更快。

  2. 流式模式略微增加网络开销 —— 因为数据是一小块一小块发的,HTTP 层面的开销会多一点。但这个差异在绝大多数场景下可以忽略。

  3. partial messages 有额外开销 —— 启用 include_partial_messages 后,消息数量会大幅增加(从几条变成几十上百条)。如果你不需要逐字效果,不要开这个选项。

  4. 批处理用单次模式更高效 —— 因为代码更简单,可以方便地用 asyncio.gather() 并行多个任务,反而比流式逐个处理快得多。

一个混合模式的实战示例

有时候你两种都需要 —— 在终端给用户看流式进度,同时把结果收集起来存到数据库。下面是一个混合模式的例子:

import asyncio
from claude_agent_sdk import (
    query,
    ClaudeAgentOptions,
    SystemMessage,
    AssistantMessage,
    ResultMessage,
)

async def hybrid_query(prompt: str) -> dict:
    """
    混合模式:一边流式显示进度,一边收集完整结果。
    返回结构化的结果,同时终端能看到实时进度。
    """
    collected_texts: list[str] = []
    tool_calls: list[dict] = []
    session_id = None
    cost_info = None

    async for message in query(
        prompt=prompt,
        options=ClaudeAgentOptions(
            allowed_tools=["Read", "Bash", "Glob", "Grep"],
            max_turns=10,
        ),
    ):
        if isinstance(message, SystemMessage):
            session_id = message.session_id
            # 流式显示
            print(f"[Agent 已就绪] 会话: {session_id[:12]}...")

        elif isinstance(message, AssistantMessage):
            for block in message.content:
                if block["type"] == "text":
                    text = block["text"]
                    collected_texts.append(text)
                    # 流式显示
                    print(f"[Agent] {text[:100]}{'...' if len(text) > 100 else ''}")

                elif block["type"] == "tool_use":
                    tool_info = {
                        "name": block["name"],
                        "input": block["input"],
                    }
                    tool_calls.append(tool_info)
                    # 流式显示
                    print(f"[工具] {block['name']}({str(block['input'])[:80]})")

                elif block["type"] == "tool_result":
                    # 流式显示
                    content = str(block.get("content", ""))[:150]
                    print(f"[结果] {content}")

        elif isinstance(message, ResultMessage):
            cost_info = message.cost if hasattr(message, "cost") else None
            if message.subtype == "success":
                print("[完成] 任务成功!")
            else:
                print(f"[失败] {message.error}")

    # 返回收集好的结构化结果
    return {
        "session_id": session_id,
        "response": "\n".join(collected_texts),
        "tool_calls": tool_calls,
        "tool_call_count": len(tool_calls),
        "cost": cost_info,
    }


async def main():
    # 终端能看到实时进度
    result = await hybrid_query("分析当前目录的 README.md 文件")

    # 同时也拿到了完整结果,可以存数据库或做后续处理
    print(f"\n=== 汇总 ===")
    print(f"总共调用了 {result['tool_call_count']} 次工具")
    print(f"回复长度: {len(result['response'])} 字符")
    # save_to_database(result)  # 存数据库

asyncio.run(main())

注意事项与限制

使用流式输出时,有几个点需要注意:

  1. 结构化输出(Structured Output)和流式 partial messages 不兼容 —— 如果你用了 outputFormat 来要求 JSON 输出,就不能同时开 include_partial_messages。这两个功能互斥。

  2. StreamEvent 的格式可能随 API 版本变化 —— StreamEvent.event 里的内容是 Claude API 原始的流式事件格式。如果 Anthropic 更新了 API,事件格式可能会变。建议做好兼容处理。

  3. subagent 的流式事件带有 parent_tool_use_id —— 如果你的 Agent 有子 Agent(第13章会讲),子 Agent 的流式事件会带一个 parent_tool_use_id 字段,让你知道这是哪个子 Agent 产生的。

  4. 网络断开的处理 —— 流式连接是长连接,如果网络波动断了,你需要做好重连逻辑。单次模式在这方面更健壮,因为它是一次性请求。


动手练习

练习1:打造终端 Agent UI

目标: 做一个漂亮的终端 Agent 界面,能实时显示 Agent 的思考和行动。

要求:

  1. 用不同颜色区分不同类型的消息(思考=绿色,工具=黄色,结果=青色,错误=红色)
  2. Agent 的文字要有"逐字打字"效果
  3. 工具调用时显示一个简单的旋转动画(比如 | / - \
  4. 在任务结束时显示统计信息(耗时、Token 用量、工具调用次数)

提示:

import asyncio
import sys

# 旋转动画
spinner_chars = ["|", "/", "-", "\\"]

async def show_spinner(message: str):
    """显示旋转动画,直到被取消"""
    i = 0
    try:
        while True:
            sys.stdout.write(f"\r  {spinner_chars[i % 4]} {message}")
            sys.stdout.flush()
            i += 1
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        sys.stdout.write(f"\r  ✓ {message} 完成\n")
        sys.stdout.flush()

# 使用方式
# spinner_task = asyncio.create_task(show_spinner("正在执行 Bash..."))
# ... 等工具执行完 ...
# spinner_task.cancel()

骨架代码:

import asyncio
import sys
import time
from claude_agent_sdk import query, ClaudeAgentOptions
from claude_agent_sdk.types import StreamEvent

class TerminalAgentUI:
    def __init__(self):
        self.tool_count = 0
        self.start_time = None
        # 你的代码...

    async def run(self, prompt: str):
        self.start_time = time.time()
        # 实现流式处理...
        # 用颜色、旋转动画、逐字效果让输出好看

    def show_stats(self, result):
        # 显示统计信息
        elapsed = time.time() - self.start_time
        # 你的代码...

async def main():
    ui = TerminalAgentUI()
    await ui.run("帮我看看当前项目的 Git 提交历史,总结最近5次提交")

asyncio.run(main())

练习2:批量文件处理器

目标: 用单次模式批量处理多个文件,生成一份汇总报告。

要求:

  1. 接收一个目录路径,找到其中所有的 .py.ts 文件
  2. 用 Agent 逐个分析每个文件(并行处理,最多5个并发)
  3. 分析内容包括:文件用途、代码行数、主要函数/类、潜在问题
  4. 所有文件分析完后,生成一份 Markdown 格式的汇总报告

提示:

import asyncio

async def batch_process_with_limit(tasks, max_concurrent=5):
    """带并发限制的批量处理"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*[limited_task(t) for t in tasks])

骨架代码:

import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage

async def analyze_single_file(file_path: str) -> dict:
    """用 Agent 分析单个文件"""
    texts = []
    async for message in query(
        prompt=f"请分析文件 {file_path},告诉我:1. 文件用途 2. 代码行数 3. 主要函数/类 4. 潜在问题",
        options=ClaudeAgentOptions(
            allowed_tools=["Read"],
            max_turns=3,
        ),
    ):
        if isinstance(message, AssistantMessage):
            for block in message.content:
                if block["type"] == "text":
                    texts.append(block["text"])
    return {"file": file_path, "analysis": "\n".join(texts)}

async def generate_report(results: list[dict]) -> str:
    """生成 Markdown 汇总报告"""
    # 你的代码...
    report = "# 代码分析报告\n\n"
    for r in results:
        report += f"## {r['file']}\n\n{r['analysis']}\n\n---\n\n"
    return report

async def main():
    # 1. 找到所有目标文件(可以用 Glob 工具或 os.walk)
    files = [...]  # 你的代码

    # 2. 并行分析(最多5个并发)
    # 你的代码...

    # 3. 生成报告
    # 你的代码...

asyncio.run(main())

本章小结

这一章我们深入学习了 Agent 的两种输出模式,来回顾一下关键知识点:

  1. 流式 vs 单次:流式像看直播,一边干一边看;单次像看录播,干完一次性给你。交互式场景用流式,后台任务用单次。

  2. 三种消息类型

    • SystemMessage:开场白,包含会话 ID
    • AssistantMessage:Agent 说的话和调的工具,是最丰富的消息类型
    • ResultMessage:终场总结,包含成功/失败状态和费用信息
  3. Partial Messages:开启 include_partial_messages 后,你能拿到 StreamEvent,实现逐字打字效果。事件类型包括 content_block_start/delta/stopmessage_start/delta/stop

  4. Stop Reasons

    • end_turn:正常完成
    • max_tokens:输出太长被截断
    • tool_use:需要调工具(循环中的正常状态)
    • stop_sequence:遇到停止词
    • refusal:Agent 拒绝执行
  5. 混合模式:可以一边流式显示给用户看,一边收集结果做后续处理。

  6. 性能提示:流式不会让 Agent 跑得更快,但能让用户"感觉"更快。批处理场景用单次模式 + 并行执行效率更高。


下一章预告

下一章我们学 会话管理 —— 让 Agent 记住上下文。

你有没有过这种体验:跟客服聊了半天,结果网页刷新了,要从头再来?Agent 也有这个问题。下一章我们学的 Session(会话)机制,就是让 Agent 能"保存进度"、"恢复对话",甚至"开分支试不同方案"。

具体来说,你将学到:

翻到下一页 → 第8章:会话管理

← 上一章6. 权限控制