第7章:流式输出 —— 让 Agent 实时汇报进度
一句话:掌握流式和单次两种输出模式,让 Agent 的工作过程可视化。
本章目标
- 理解流式输出和单次输出的本质区别
- 学会用
query()的异步迭代器处理流式消息 - 掌握三种消息类型的处理方法
- 能在终端中实时显示 Agent 的工作过程
- 理解各种 Stop Reason 的含义和处理方式
- 能根据场景选择合适的输出模式
前置知识
- 需要先看完第4章(query() 函数基础)
- 需要先看完第5章(内置工具)
- 需要先看完第6章(权限控制)
7.1 流式 vs 单次 —— 看直播还是看录播?
先讲个故事
想象你请了一个装修工人来装修你的房子。
方式一:看直播 你搬把椅子坐在旁边看。工人砸了一面墙,你马上看到了;工人刷了一面漆,你马上看到了;工人铺了地砖歪了,你马上指出来了。这就是 流式输出(Streaming)。
方式二:看录播 你跟工人说"你干完了给我打电话",然后你就出去逛街了。三天后工人打电话说"搞定了",你回来一看 —— 好的坏的,一次性全看到了。这就是 单次输出(Single)。
技术上的区别
| 对比项 | 流式输出(Streaming) | 单次输出(Single) |
|---|---|---|
| 数据到达方式 | 一条一条,边产生边发给你 | 全部完成后,一次性发给你 |
| 用户体验 | 像看直播,能看到"Agent 在想什么" | 像等快递,只有"已完成"这一个状态 |
| 首字节时间 | 快!几毫秒就能看到第一个字 | 慢,要等 Agent 全部跑完 |
| 适合场景 | 交互式 UI、聊天界面、终端工具 | 后台任务、批处理、API 服务 |
| 实现复杂度 | 稍复杂,要处理多种消息类型 | 简单,收集完处理就行 |
| 类比 | 看厨师做菜的直播 | 看菜做完的照片 |
什么时候用哪个?
经验法则很简单:
- 有人在盯着屏幕等? → 用流式。不然用户会以为你的程序卡死了。
- 没人看,后台默默跑? → 用单次。代码简单,不容易出 bug。
- 需要中途干预? → 用流式。你能看到 Agent 在干嘛,不对劲可以及时叫停。
- 只关心最终结果? → 用单次。管它过程是什么,给我结果就行。
好了,概念讲完了,下面来写代码。
7.2 流式输出详解 —— 一边干活一边汇报
query() 返回的是什么?
还记得第4章学的 query() 函数吗?它返回的是一个 异步迭代器(AsyncGenerator)。
什么意思呢?就是说,它不会一次性给你所有结果,而是"一条一条往外吐"。你用 for await(TypeScript)或 async for(Python)来逐条接收。
就像自来水管 —— 水是一直在流的,你拿杯子接一杯是一杯,不用等水管里所有的水都出来。
最基础的流式处理
先看 TypeScript 版本:
import { query, AssistantMessage, ResultMessage } from "@anthropic-ai/claude-agent-sdk";
async function streamingDemo() {
for await (const message of query({
prompt: "帮我写一首关于编程的打油诗",
options: {
allowedTools: [],
maxTurns: 1,
},
})) {
// 每收到一条消息,就处理一条
if (message instanceof AssistantMessage) {
// Agent 说的话
console.log("[Agent 说]:", message.content);
} else if (message instanceof ResultMessage) {
// 最终结果
console.log("[完成]:", message.subtype);
}
}
}
streamingDemo();
Python 版本:
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage
async def streaming_demo():
async for message in query(
prompt="帮我写一首关于编程的打油诗",
options=ClaudeAgentOptions(
allowed_tools=[],
max_turns=1,
),
):
if isinstance(message, AssistantMessage):
print(f"[Agent 说]: {message.content}")
elif isinstance(message, ResultMessage):
print(f"[完成]: {message.subtype}")
asyncio.run(streaming_demo())
消息流的顺序
Agent 干活的过程中,消息是按一定顺序流过来的。就像工厂的流水线,有固定的工序:
┌─────────────────────────────────────────────────────┐
│ │
│ 1. SystemMessage (init) │
│ ↓ "大家好,我是 Agent,已就位" │
│ │
│ 2. AssistantMessage │
│ ↓ "让我想想...我觉得应该先用 Bash 看看文件" │
│ │
│ 3. AssistantMessage (tool_use) │
│ ↓ "我要调用 Bash 工具了,参数是 ls -la" │
│ │
│ 4. AssistantMessage (tool_result) │
│ ↓ "工具执行结果:total 42..." │
│ │
│ 5. AssistantMessage │
│ ↓ "好的,我看到了文件列表,现在来整理..." │
│ │
│ 6. ResultMessage (success) │
│ ↓ "任务完成!" │
│ │
└─────────────────────────────────────────────────────┘
注意中间的 2→3→4→5 可能会重复多次 —— Agent 每调用一次工具就走一轮"思考→调用→拿结果→再思考"的循环。
三种消息类型详解
让我们把三种消息掰开了看:
1. SystemMessage(启动消息)
# SystemMessage 长这样
{
"type": "system",
"subtype": "init",
"session_id": "sess_abc123", # 会话 ID,后面恢复会话要用
"tools": ["Read", "Write", "Bash"], # Agent 可以用的工具列表
"model": "claude-sonnet-4-20250514", # 使用的模型
}
这条消息在最开始出现,而且只出现一次。就像开会前先做自我介绍:"我是谁、我能干什么、我的工号是多少"。
你应该从这条消息里拿到 session_id,保存起来以备后用(第8章会讲怎么恢复会话)。
2. AssistantMessage(Agent 的回复)
这是最"热闹"的消息类型,Agent 说的话、调的工具、拿到的结果,全在这里。
# 纯文本回复
{
"type": "assistant",
"content": [
{
"type": "text",
"text": "让我来帮你分析一下这个代码文件..."
}
],
"stop_reason": "end_turn", # 或 "tool_use"
}
# 工具调用
{
"type": "assistant",
"content": [
{
"type": "text",
"text": "我需要先看看这个文件的内容。"
},
{
"type": "tool_use",
"id": "toolu_abc123",
"name": "Read",
"input": {
"file_path": "/path/to/file.py"
}
}
],
"stop_reason": "tool_use", # 注意这里是 tool_use,说明 Agent 还没完
}
注意 stop_reason 字段:
"end_turn"→ Agent 说完了这一段,不需要调工具"tool_use"→ Agent 要用工具了,循环还要继续转
3. ResultMessage(最终结果)
# 成功完成
{
"type": "result",
"subtype": "success",
"stop_reason": "end_turn",
"cost": {
"input_tokens": 1523,
"output_tokens": 456,
},
"duration_ms": 3200,
"session_id": "sess_abc123",
}
# 出错了
{
"type": "result",
"subtype": "error",
"error": "Max turns exceeded",
"stop_reason": "max_turns",
}
这是最后一条消息,告诉你 Agent 是成功了还是失败了,花了多少 Token、用了多长时间。
完整的流式处理模板
把上面三种消息放在一起,下面是一个你可以直接拿去用的模板:
TypeScript 版本:
import {
query,
SystemMessage,
AssistantMessage,
ResultMessage,
} from "@anthropic-ai/claude-agent-sdk";
async function processStreaming(prompt: string) {
let sessionId: string | undefined;
for await (const message of query({
prompt,
options: {
allowedTools: ["Read", "Bash", "Glob", "Grep"],
maxTurns: 10,
},
})) {
// 第一类:启动消息
if (message instanceof SystemMessage) {
sessionId = message.sessionId;
console.log(`[启动] 会话 ID: ${sessionId}`);
console.log(`[启动] 可用工具: ${message.tools.join(", ")}`);
}
// 第二类:Agent 的回复(这里面内容最丰富)
else if (message instanceof AssistantMessage) {
for (const block of message.content) {
if (block.type === "text") {
// Agent 说了一段话
console.log(`[思考] ${block.text}`);
} else if (block.type === "tool_use") {
// Agent 要调用工具
console.log(`[工具] ${block.name}(${JSON.stringify(block.input)})`);
} else if (block.type === "tool_result") {
// 工具返回了结果
console.log(`[结果] ${block.content?.substring(0, 200)}...`);
}
}
}
// 第三类:最终结果
else if (message instanceof ResultMessage) {
if (message.subtype === "success") {
console.log(`[完成] 耗时 ${message.durationMs}ms`);
console.log(`[费用] 输入 ${message.cost.inputTokens} tokens, 输出 ${message.cost.outputTokens} tokens`);
} else {
console.log(`[失败] ${message.error}`);
}
}
}
return sessionId;
}
processStreaming("帮我看看当前目录有哪些 TypeScript 文件");
Python 版本:
import asyncio
import json
from claude_agent_sdk import (
query,
ClaudeAgentOptions,
SystemMessage,
AssistantMessage,
ResultMessage,
)
async def process_streaming(prompt: str) -> str | None:
session_id = None
async for message in query(
prompt=prompt,
options=ClaudeAgentOptions(
allowed_tools=["Read", "Bash", "Glob", "Grep"],
max_turns=10,
),
):
# 第一类:启动消息
if isinstance(message, SystemMessage):
session_id = message.session_id
print(f"[启动] 会话 ID: {session_id}")
print(f"[启动] 可用工具: {', '.join(message.tools)}")
# 第二类:Agent 的回复
elif isinstance(message, AssistantMessage):
for block in message.content:
if block["type"] == "text":
print(f"[思考] {block['text']}")
elif block["type"] == "tool_use":
print(f"[工具] {block['name']}({json.dumps(block['input'], ensure_ascii=False)})")
elif block["type"] == "tool_result":
content = block.get("content", "")
print(f"[结果] {content[:200]}...")
# 第三类:最终结果
elif isinstance(message, ResultMessage):
if message.subtype == "success":
print(f"[完成] 耗时 {message.duration_ms}ms")
print(f"[费用] 输入 {message.cost['input_tokens']} tokens, 输出 {message.cost['output_tokens']} tokens")
else:
print(f"[失败] {message.error}")
return session_id
asyncio.run(process_streaming("帮我看看当前目录有哪些 Python 文件"))
7.3 单次输出详解 —— 干完了再说
什么时候用单次模式?
流式模式虽然体验好,但不是所有场景都需要。以下场景用单次模式更合适:
- 后台批处理 —— 比如用 Agent 批量分析100个文件,你不需要盯着每个文件的分析过程
- API 服务 —— 你的服务接收请求,调用 Agent,返回最终结果给客户端
- 定时任务 —— 每天凌晨3点让 Agent 生成报告,你又不在电脑前看
- 管道处理 —— Agent 的输出是另一个程序的输入,中间过程没人看
实现单次模式
思路很简单:把流式消息全部收集起来,最后一起返回。
TypeScript 版本:
import {
query,
AssistantMessage,
ResultMessage,
} from "@anthropic-ai/claude-agent-sdk";
// 辅助函数:把流式消息收集成一个结果
async function queryOnce(prompt: string): Promise<{
response: string;
success: boolean;
cost: { inputTokens: number; outputTokens: number };
}> {
const assistantTexts: string[] = [];
let result: ResultMessage | null = null;
for await (const message of query({
prompt,
options: {
allowedTools: ["Read", "Bash", "Glob", "Grep"],
maxTurns: 10,
},
})) {
if (message instanceof AssistantMessage) {
// 只收集文本内容,工具调用的过程我们不关心
for (const block of message.content) {
if (block.type === "text") {
assistantTexts.push(block.text);
}
}
} else if (message instanceof ResultMessage) {
result = message;
}
}
return {
response: assistantTexts.join("\n"),
success: result?.subtype === "success",
cost: {
inputTokens: result?.cost?.inputTokens ?? 0,
outputTokens: result?.cost?.outputTokens ?? 0,
},
};
}
// 使用起来超简单
async function main() {
const { response, success, cost } = await queryOnce(
"分析一下 package.json 中的依赖,哪些可以升级?"
);
if (success) {
console.log("分析结果:", response);
console.log(`花费:${cost.inputTokens + cost.outputTokens} tokens`);
} else {
console.log("Agent 执行失败了");
}
}
main();
Python 版本:
import asyncio
from dataclasses import dataclass
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage
@dataclass
class QueryResult:
response: str
success: bool
input_tokens: int
output_tokens: int
async def query_once(prompt: str) -> QueryResult:
"""把流式消息收集成一个单次结果"""
assistant_texts: list[str] = []
result = None
async for message in query(
prompt=prompt,
options=ClaudeAgentOptions(
allowed_tools=["Read", "Bash", "Glob", "Grep"],
max_turns=10,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if block["type"] == "text":
assistant_texts.append(block["text"])
elif isinstance(message, ResultMessage):
result = message
return QueryResult(
response="\n".join(assistant_texts),
success=result.subtype == "success" if result else False,
input_tokens=result.cost.get("input_tokens", 0) if result else 0,
output_tokens=result.cost.get("output_tokens", 0) if result else 0,
)
# 使用
async def main():
result = await query_once("分析一下 package.json 中的依赖,哪些可以升级?")
if result.success:
print(f"分析结果:{result.response}")
print(f"花费:{result.input_tokens + result.output_tokens} tokens")
else:
print("Agent 执行失败了")
asyncio.run(main())
批量处理示例
单次模式的一大优势就是做批处理特别方便。来看一个实际场景 —— 批量分析多个文件的代码质量:
import asyncio
import json
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage
async def analyze_file(file_path: str) -> dict:
"""分析单个文件的代码质量"""
texts = []
async for message in query(
prompt=f"请分析文件 {file_path} 的代码质量,给出评分(1-10)和改进建议。请用 JSON 格式回复。",
options=ClaudeAgentOptions(
allowed_tools=["Read"],
max_turns=3,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if block["type"] == "text":
texts.append(block["text"])
return {"file": file_path, "analysis": "\n".join(texts)}
async def batch_analyze(files: list[str]) -> list[dict]:
"""并行分析多个文件"""
# 用 asyncio.gather 并行处理,效率翻倍
tasks = [analyze_file(f) for f in files]
results = await asyncio.gather(*tasks)
return list(results)
# 使用
async def main():
files = [
"src/index.ts",
"src/utils.ts",
"src/config.ts",
"src/router.ts",
]
print("开始批量分析...")
results = await batch_analyze(files)
print("\n=== 分析报告 ===")
for r in results:
print(f"\n文件:{r['file']}")
print(f"分析:{r['analysis'][:300]}...")
asyncio.run(main())
这里用了 asyncio.gather() 来并行分析多个文件,速度会快很多。这就是单次模式的优势 —— 代码简单、容易组合、适合批处理。
7.4 实时显示 Agent 工作过程 —— 打造你的 Agent Monitor
上面讲的流式处理只是把消息打印出来,不够酷。现在来做一个真正实用的 "Agent Monitor"—— 在终端里实时展示 Agent 的思考、工具调用和执行结果,就像你在 Claude Code 里看到的那样。
启用部分消息(Partial Messages)
默认情况下,流式输出给你的是"完整的消息" —— Agent 一个完整的回复说完了,你才收到。这就好比直播虽然是实时的,但画面是一帧一帧发的,不是一个像素一个像素发的。
如果你想要"逐字显示"Agent 正在打字的效果(就像 ChatGPT 那样一个字一个字往外蹦),需要启用 partial messages(部分消息):
// TypeScript
const options = {
includePartialMessages: true, // 开启逐字流式
allowedTools: ["Read", "Bash"],
};
# Python
options = ClaudeAgentOptions(
include_partial_messages=True, # 开启逐字流式
allowed_tools=["Read", "Bash"],
)
启用后,你会额外收到一种特殊的消息类型 —— StreamEvent。这是 Claude API 原始的流式事件,包含最细粒度的数据。
StreamEvent 的事件类型
StreamEvent 包装了 Claude API 的原始流式事件,常见的事件类型有:
| 事件类型 | 含义 | 你能拿到什么 |
|---|---|---|
message_start |
开始新一轮回复 | 模型信息、usage 信息 |
content_block_start |
开始一个新内容块 | 块的类型(text 或 tool_use)、工具名 |
content_block_delta |
内容块的增量更新 | 一小段文字、一段 JSON 片段 |
content_block_stop |
一个内容块结束了 | 无 |
message_delta |
消息级别的更新 | stop_reason |
message_stop |
本轮回复结束 | 无 |
消息流的顺序是这样的:
message_start
→ content_block_start (text)
→ content_block_delta (文字片段1)
→ content_block_delta (文字片段2)
→ content_block_delta (文字片段3)
→ content_block_stop
→ content_block_start (tool_use)
→ content_block_delta (JSON 片段1)
→ content_block_delta (JSON 片段2)
→ content_block_stop
→ message_delta (stop_reason: "tool_use")
→ message_stop
... Agent 执行工具,拿到结果,开始新一轮 ...
message_start
→ content_block_start (text)
→ content_block_delta (最终回复片段1)
→ content_block_delta (最终回复片段2)
→ content_block_stop
→ message_delta (stop_reason: "end_turn")
→ message_stop
完整的 Agent Monitor 示例
下面来做一个实际可用的终端 Agent Monitor。它能做到:
- 逐字显示 Agent 说的话(像 ChatGPT 的打字效果)
- 显示 Agent 正在调用什么工具
- 显示工具执行的结果(截断过长内容)
- 用不同的前缀区分不同类型的信息
Python 版本(完整可运行):
import asyncio
import sys
import json
from claude_agent_sdk import (
query,
ClaudeAgentOptions,
SystemMessage,
AssistantMessage,
ResultMessage,
)
from claude_agent_sdk.types import StreamEvent
class AgentMonitor:
"""终端 Agent 监控器 —— 实时显示 Agent 的工作过程"""
def __init__(self):
self.in_tool = False
self.current_tool_name = ""
self.turn_count = 0
def print_header(self, text: str):
"""打印分隔线标题"""
width = 60
print(f"\n{'─' * width}")
print(f" {text}")
print(f"{'─' * width}")
def print_status(self, label: str, text: str):
"""打印状态信息"""
print(f" [{label}] {text}")
async def run(self, prompt: str):
"""启动 Agent 并实时显示输出"""
self.print_header(f"Agent Monitor - 任务开始")
print(f" 提示词: {prompt[:80]}{'...' if len(prompt) > 80 else ''}")
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Read", "Write", "Bash", "Glob", "Grep"],
max_turns=10,
)
async for message in query(prompt=prompt, options=options):
self._handle_message(message)
self.print_header("Agent Monitor - 任务结束")
def _handle_message(self, message):
"""根据消息类型分发处理"""
if isinstance(message, SystemMessage):
self._handle_system(message)
elif isinstance(message, StreamEvent):
self._handle_stream_event(message)
elif isinstance(message, AssistantMessage):
self._handle_assistant(message)
elif isinstance(message, ResultMessage):
self._handle_result(message)
def _handle_system(self, message: SystemMessage):
"""处理启动消息"""
self.print_status("系统", f"会话已创建: {message.session_id[:16]}...")
self.print_status("系统", f"模型: {message.model}")
self.print_status("系统", f"工具: {', '.join(message.tools)}")
print()
def _handle_stream_event(self, message: StreamEvent):
"""处理流式事件 —— 这是实现逐字显示的关键"""
event = message.event
event_type = event.get("type")
if event_type == "content_block_start":
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
# 工具调用开始
self.current_tool_name = content_block.get("name", "未知工具")
self.in_tool = True
sys.stdout.write(f"\n [工具调用] {self.current_tool_name}")
sys.stdout.flush()
elif content_block.get("type") == "text":
# 文字内容开始
if not self.in_tool:
sys.stdout.write("\n [Agent] ")
sys.stdout.flush()
elif event_type == "content_block_delta":
delta = event.get("delta", {})
if delta.get("type") == "text_delta" and not self.in_tool:
# 逐字输出 Agent 说的话
text = delta.get("text", "")
sys.stdout.write(text)
sys.stdout.flush()
elif event_type == "content_block_stop":
if self.in_tool:
sys.stdout.write(" ... 执行中\n")
sys.stdout.flush()
self.in_tool = False
elif event_type == "message_delta":
# 一轮回复结束
stop_reason = event.get("delta", {}).get("stop_reason")
if stop_reason == "tool_use":
self.turn_count += 1
print(f"\n [循环] 第 {self.turn_count} 轮工具调用")
def _handle_assistant(self, message: AssistantMessage):
"""处理完整的 Assistant 消息(工具结果等)"""
for block in message.content:
if block.get("type") == "tool_result":
content = block.get("content", "")
if isinstance(content, str) and len(content) > 200:
content = content[:200] + "... (已截断)"
self.print_status("工具结果", str(content)[:300])
def _handle_result(self, message: ResultMessage):
"""处理最终结果"""
print()
if message.subtype == "success":
self.print_status("完成", "任务成功完成!")
if hasattr(message, "cost") and message.cost:
input_t = message.cost.get("input_tokens", 0)
output_t = message.cost.get("output_tokens", 0)
self.print_status("费用", f"输入 {input_t} tokens, 输出 {output_t} tokens")
if hasattr(message, "duration_ms") and message.duration_ms:
self.print_status("耗时", f"{message.duration_ms}ms")
else:
self.print_status("失败", f"错误: {message.error}")
# 运行
async def main():
monitor = AgentMonitor()
await monitor.run("帮我看看当前目录的文件结构,然后找到所有 Python 文件并统计行数")
asyncio.run(main())
运行效果大概长这样:
────────────────────────────────────────────────────────────
Agent Monitor - 任务开始
────────────────────────────────────────────────────────────
提示词: 帮我看看当前目录的文件结构,然后找到所有 Python 文件并统计行数
[系统] 会话已创建: sess_7f3a2b1c...
[系统] 模型: claude-sonnet-4-20250514
[系统] 工具: Read, Write, Bash, Glob, Grep
[Agent] 好的,让我先看看当前目录的文件结构。
[工具调用] Bash ... 执行中
[循环] 第 1 轮工具调用
[工具结果] total 42\ndrwxr-xr-x 5 user staff 160 Feb 25 src/...
[Agent] 我看到了项目结构。现在让我找到所有 Python 文件并统计行数。
[工具调用] Glob ... 执行中
[工具调用] Bash ... 执行中
[循环] 第 2 轮工具调用
[工具结果] 12 files found...
[工具结果] 总计:1,234 行...
[Agent] 分析完成!当前目录下有 12 个 Python 文件,总共 1,234 行代码...
[完成] 任务成功完成!
[费用] 输入 2341 tokens, 输出 567 tokens
[耗时] 4521ms
────────────────────────────────────────────────────────────
Agent Monitor - 任务结束
────────────────────────────────────────────────────────────
TypeScript 版本的 Agent Monitor
import {
query,
SystemMessage,
AssistantMessage,
ResultMessage,
StreamEvent,
} from "@anthropic-ai/claude-agent-sdk";
class AgentMonitor {
private inTool = false;
private turnCount = 0;
async run(prompt: string) {
console.log("\n" + "─".repeat(60));
console.log(` Agent Monitor - 任务开始`);
console.log("─".repeat(60));
console.log(` 提示词: ${prompt.substring(0, 80)}`);
for await (const message of query({
prompt,
options: {
includePartialMessages: true,
allowedTools: ["Read", "Write", "Bash", "Glob", "Grep"],
maxTurns: 10,
},
})) {
if (message instanceof SystemMessage) {
console.log(` [系统] 会话: ${message.sessionId.substring(0, 16)}...`);
console.log(` [系统] 模型: ${message.model}`);
} else if (message instanceof StreamEvent) {
this.handleStreamEvent(message);
} else if (message instanceof AssistantMessage) {
this.handleAssistant(message);
} else if (message instanceof ResultMessage) {
this.handleResult(message);
}
}
console.log("\n" + "─".repeat(60));
console.log(" Agent Monitor - 任务结束");
console.log("─".repeat(60));
}
private handleStreamEvent(message: StreamEvent) {
const event = message.event;
switch (event.type) {
case "content_block_start":
if (event.content_block?.type === "tool_use") {
this.inTool = true;
process.stdout.write(`\n [工具调用] ${event.content_block.name}`);
} else if (!this.inTool) {
process.stdout.write("\n [Agent] ");
}
break;
case "content_block_delta":
if (event.delta?.type === "text_delta" && !this.inTool) {
process.stdout.write(event.delta.text || "");
}
break;
case "content_block_stop":
if (this.inTool) {
process.stdout.write(" ... 执行中\n");
this.inTool = false;
}
break;
case "message_delta":
if (event.delta?.stop_reason === "tool_use") {
this.turnCount++;
console.log(`\n [循环] 第 ${this.turnCount} 轮工具调用`);
}
break;
}
}
private handleAssistant(message: AssistantMessage) {
for (const block of message.content) {
if (block.type === "tool_result") {
const content = typeof block.content === "string"
? block.content.substring(0, 200)
: JSON.stringify(block.content).substring(0, 200);
console.log(` [工具结果] ${content}...`);
}
}
}
private handleResult(message: ResultMessage) {
console.log();
if (message.subtype === "success") {
console.log(" [完成] 任务成功完成!");
if (message.cost) {
console.log(` [费用] 输入 ${message.cost.inputTokens} tokens, 输出 ${message.cost.outputTokens} tokens`);
}
} else {
console.log(` [失败] ${message.error}`);
}
}
}
// 运行
const monitor = new AgentMonitor();
monitor.run("帮我看看当前目录的文件结构");
给终端加点颜色(可选)
如果你想让终端输出更好看,可以用 ANSI 转义码加颜色。这是纯终端的做法,不需要安装任何库:
# ANSI 颜色常量
class Color:
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
# 用法示例
def print_colored(label: str, text: str, color: str):
print(f" {color}{Color.BOLD}[{label}]{Color.RESET} {text}")
# 效果:
# [系统] 会话已创建 ← 蓝色
# [Agent] 让我想想... ← 绿色
# [工具调用] Bash ← 黄色
# [工具结果] total 42 ← 青色
# [完成] 任务成功! ← 绿色
# [失败] 出错了 ← 红色
print_colored("系统", "会话已创建", Color.BLUE)
print_colored("Agent", "让我想想...", Color.GREEN)
print_colored("工具调用", "Bash(ls -la)", Color.YELLOW)
print_colored("完成", "任务成功!", Color.GREEN)
print_colored("失败", "出错了", Color.RED)
7.5 Stop Reasons —— Agent 为什么停下来了?
Agent 不会永远跑下去(那就成死循环了)。它总会停下来,而 stop_reason 就是告诉你"它为什么停了"。
这就像你问一个快递员"你为什么停下来了":
- "送到了啊" →
end_turn - "字写太多纸不够了" →
max_tokens - "需要去仓库拿个东西" →
tool_use - "遇到了禁止通行的牌子" →
stop_sequence - "我不想送这个包裹" →
refusal
各种 Stop Reason 详解
1. end_turn —— 正常完成
# 这是最理想的情况:Agent 觉得任务完成了,主动停下来
{
"stop_reason": "end_turn"
}
出现场景: Agent 认为它已经回答了你的问题,或者完成了你交代的任务。
你需要做什么: 什么都不用做,一切正常。
2. max_tokens —— 回复太长,被截断了
# Agent 话还没说完,但是输出长度已经到上限了
{
"stop_reason": "max_tokens"
}
出现场景: Agent 要说的话太多了。比如你让它写一篇很长的文章,或者分析一个很大的文件。
你需要做什么: 考虑以下几种处理方式:
- 把任务拆小一点,比如"先分析前50行"
- 增加
max_tokens限制(如果 API 支持) - 让 Agent 继续说(通过恢复会话,第8章会讲)
# 处理 max_tokens 的示例
async for message in query(prompt=prompt, options=options):
if isinstance(message, ResultMessage):
if message.stop_reason == "max_tokens":
print("Agent 的回复被截断了,你可能需要:")
print("1. 让任务更具体,减少输出量")
print("2. 恢复会话让 Agent 继续说")
3. tool_use —— 需要用工具
# Agent 话说到一半,发现需要调工具
{
"stop_reason": "tool_use"
}
出现场景: Agent 在回复过程中决定要调用一个工具。这是 Agent 循环的正常一环。
你需要做什么: 通常你什么都不用做 —— SDK 会自动处理。Agent 调完工具,拿到结果,会自己继续下一轮循环。这个 stop_reason 你一般在 AssistantMessage 里看到,在 ResultMessage 里很少看到。
4. stop_sequence —— 遇到了停止词
# Agent 输出到某个特定字符串时被强制停了
{
"stop_reason": "stop_sequence"
}
出现场景: 你在配置中设置了 stop_sequences(停止序列),Agent 的输出恰好包含了这个序列。
你需要做什么: 检查输出是否完整。停止词是你自己设的,所以你应该知道为什么 Agent 会停在这里。
5. refusal —— Agent 拒绝了
# Agent 不想干这个活
{
"stop_reason": "refusal"
}
出现场景: 你让 Agent 做一些它认为不合适的事情。比如生成有害内容、帮忙做违法的事等。
你需要做什么: 尊重 Agent 的拒绝。重新考虑你的请求是否合理。
完整的 Stop Reason 处理模板
下面是一个"生产级"的 Stop Reason 处理代码,你可以直接用在自己的项目里:
Python 版本:
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage, AssistantMessage
async def robust_query(prompt: str, max_retries: int = 2) -> str:
"""
健壮的 Agent 查询函数,优雅处理各种停止原因。
特性:
- max_tokens 时自动提示
- refusal 时给出说明
- 其他异常情况有兜底处理
"""
texts: list[str] = []
async for message in query(
prompt=prompt,
options=ClaudeAgentOptions(
allowed_tools=["Read", "Bash", "Glob", "Grep"],
max_turns=15,
),
):
# 收集 Agent 的文本输出
if isinstance(message, AssistantMessage):
for block in message.content:
if block["type"] == "text":
texts.append(block["text"])
# 处理最终结果
elif isinstance(message, ResultMessage):
stop_reason = message.stop_reason
if message.subtype == "success":
# 正常完成
if stop_reason == "end_turn":
return "\n".join(texts)
elif stop_reason == "max_tokens":
# 被截断了,返回已有内容并附加提示
truncated = "\n".join(texts)
return (
truncated
+ "\n\n[注意:Agent 的回复因长度限制被截断。"
+ "你可以恢复会话继续获取剩余内容。]"
)
elif stop_reason == "stop_sequence":
# 遇到停止词,正常返回
return "\n".join(texts)
else:
# 其他成功情况
return "\n".join(texts)
else:
# 失败情况
if stop_reason == "refusal":
return "[Agent 拒绝了这个请求。请检查你的提示词是否包含不当内容。]"
elif "max_turns" in str(message.error or ""):
return (
"\n".join(texts)
+ "\n\n[注意:Agent 达到了最大循环次数限制。"
+ "任务可能未完成,请增加 max_turns 或简化任务。]"
)
else:
error_msg = message.error or "未知错误"
return f"[Agent 执行失败:{error_msg}]"
# 如果走到这里,说明根本没有 ResultMessage
return "[异常:Agent 没有返回结果消息]"
# 使用
async def main():
# 正常任务
result = await robust_query("当前目录有哪些文件?")
print(result)
# 可能被截断的长任务
result = await robust_query("详细分析整个项目的代码架构")
print(result)
asyncio.run(main())
TypeScript 版本:
import {
query,
AssistantMessage,
ResultMessage,
} from "@anthropic-ai/claude-agent-sdk";
async function robustQuery(prompt: string): Promise<string> {
const texts: string[] = [];
for await (const message of query({
prompt,
options: {
allowedTools: ["Read", "Bash", "Glob", "Grep"],
maxTurns: 15,
},
})) {
if (message instanceof AssistantMessage) {
for (const block of message.content) {
if (block.type === "text") {
texts.push(block.text);
}
}
} else if (message instanceof ResultMessage) {
const stopReason = message.stopReason;
if (message.subtype === "success") {
switch (stopReason) {
case "end_turn":
return texts.join("\n");
case "max_tokens":
return texts.join("\n")
+ "\n\n[注意:Agent 的回复因长度限制被截断。]";
default:
return texts.join("\n");
}
} else {
if (stopReason === "refusal") {
return "[Agent 拒绝了这个请求]";
}
return `[Agent 执行失败:${message.error || "未知错误"}]`;
}
}
}
return "[异常:没有收到结果]";
}
7.6 流式 vs 单次 —— 选择指南
对比总结表
| 维度 | 流式(Streaming) | 单次(Single) |
|---|---|---|
| 用户体验 | 好,能看到实时进度 | 差,只有"加载中"和"完成" |
| 首字节延迟 | 低(毫秒级) | 高(需等全部完成) |
| 代码复杂度 | 中等,要处理多种事件 | 低,收集完一次处理 |
| 错误处理 | 可以中途发现问题 | 只能最后才知道 |
| 适合长任务 | 是,用户不会觉得卡死 | 否,长时间无反馈 |
| 适合批处理 | 否,增加不必要的复杂度 | 是,代码简单易组合 |
| 资源占用 | 持续占用连接 | 可以释放后再读取 |
| 可中断性 | 好,随时可以停 | 差,要么等完,要么全部取消 |
决策流程图
你的 Agent 输出给谁看?
│
├─→ 给人看(终端/网页/聊天)
│ │
│ ├─→ 任务可能超过 5 秒?
│ │ ├─→ 是 → 【用流式】(不然用户以为卡死了)
│ │ └─→ 否 → 【都行】(快的话差别不大)
│ │
│ └─→ 需要逐字显示效果?
│ ├─→ 是 → 【用流式 + partial messages】
│ └─→ 否 → 【用流式】(普通消息级别就够)
│
└─→ 给程序看(API/管道/批处理)
│
├─→ 需要中途干预?
│ ├─→ 是 → 【用流式】(可以监控 + 中断)
│ └─→ 否 → 【用单次】(简单省事)
│
└─→ 多个任务并行?
├─→ 是 → 【用单次 + asyncio.gather】
└─→ 否 → 【用单次】
性能考量
流式模式不会更快 —— Agent 干活的总时间是一样的。流式只是让你"更早看到中间结果",并不会让 Agent 跑得更快。
流式模式略微增加网络开销 —— 因为数据是一小块一小块发的,HTTP 层面的开销会多一点。但这个差异在绝大多数场景下可以忽略。
partial messages 有额外开销 —— 启用
include_partial_messages后,消息数量会大幅增加(从几条变成几十上百条)。如果你不需要逐字效果,不要开这个选项。批处理用单次模式更高效 —— 因为代码更简单,可以方便地用
asyncio.gather()并行多个任务,反而比流式逐个处理快得多。
一个混合模式的实战示例
有时候你两种都需要 —— 在终端给用户看流式进度,同时把结果收集起来存到数据库。下面是一个混合模式的例子:
import asyncio
from claude_agent_sdk import (
query,
ClaudeAgentOptions,
SystemMessage,
AssistantMessage,
ResultMessage,
)
async def hybrid_query(prompt: str) -> dict:
"""
混合模式:一边流式显示进度,一边收集完整结果。
返回结构化的结果,同时终端能看到实时进度。
"""
collected_texts: list[str] = []
tool_calls: list[dict] = []
session_id = None
cost_info = None
async for message in query(
prompt=prompt,
options=ClaudeAgentOptions(
allowed_tools=["Read", "Bash", "Glob", "Grep"],
max_turns=10,
),
):
if isinstance(message, SystemMessage):
session_id = message.session_id
# 流式显示
print(f"[Agent 已就绪] 会话: {session_id[:12]}...")
elif isinstance(message, AssistantMessage):
for block in message.content:
if block["type"] == "text":
text = block["text"]
collected_texts.append(text)
# 流式显示
print(f"[Agent] {text[:100]}{'...' if len(text) > 100 else ''}")
elif block["type"] == "tool_use":
tool_info = {
"name": block["name"],
"input": block["input"],
}
tool_calls.append(tool_info)
# 流式显示
print(f"[工具] {block['name']}({str(block['input'])[:80]})")
elif block["type"] == "tool_result":
# 流式显示
content = str(block.get("content", ""))[:150]
print(f"[结果] {content}")
elif isinstance(message, ResultMessage):
cost_info = message.cost if hasattr(message, "cost") else None
if message.subtype == "success":
print("[完成] 任务成功!")
else:
print(f"[失败] {message.error}")
# 返回收集好的结构化结果
return {
"session_id": session_id,
"response": "\n".join(collected_texts),
"tool_calls": tool_calls,
"tool_call_count": len(tool_calls),
"cost": cost_info,
}
async def main():
# 终端能看到实时进度
result = await hybrid_query("分析当前目录的 README.md 文件")
# 同时也拿到了完整结果,可以存数据库或做后续处理
print(f"\n=== 汇总 ===")
print(f"总共调用了 {result['tool_call_count']} 次工具")
print(f"回复长度: {len(result['response'])} 字符")
# save_to_database(result) # 存数据库
asyncio.run(main())
注意事项与限制
使用流式输出时,有几个点需要注意:
结构化输出(Structured Output)和流式 partial messages 不兼容 —— 如果你用了
outputFormat来要求 JSON 输出,就不能同时开include_partial_messages。这两个功能互斥。StreamEvent 的格式可能随 API 版本变化 ——
StreamEvent.event里的内容是 Claude API 原始的流式事件格式。如果 Anthropic 更新了 API,事件格式可能会变。建议做好兼容处理。subagent 的流式事件带有 parent_tool_use_id —— 如果你的 Agent 有子 Agent(第13章会讲),子 Agent 的流式事件会带一个
parent_tool_use_id字段,让你知道这是哪个子 Agent 产生的。网络断开的处理 —— 流式连接是长连接,如果网络波动断了,你需要做好重连逻辑。单次模式在这方面更健壮,因为它是一次性请求。
动手练习
练习1:打造终端 Agent UI
目标: 做一个漂亮的终端 Agent 界面,能实时显示 Agent 的思考和行动。
要求:
- 用不同颜色区分不同类型的消息(思考=绿色,工具=黄色,结果=青色,错误=红色)
- Agent 的文字要有"逐字打字"效果
- 工具调用时显示一个简单的旋转动画(比如
|/-\) - 在任务结束时显示统计信息(耗时、Token 用量、工具调用次数)
提示:
import asyncio
import sys
# 旋转动画
spinner_chars = ["|", "/", "-", "\\"]
async def show_spinner(message: str):
"""显示旋转动画,直到被取消"""
i = 0
try:
while True:
sys.stdout.write(f"\r {spinner_chars[i % 4]} {message}")
sys.stdout.flush()
i += 1
await asyncio.sleep(0.1)
except asyncio.CancelledError:
sys.stdout.write(f"\r ✓ {message} 完成\n")
sys.stdout.flush()
# 使用方式
# spinner_task = asyncio.create_task(show_spinner("正在执行 Bash..."))
# ... 等工具执行完 ...
# spinner_task.cancel()
骨架代码:
import asyncio
import sys
import time
from claude_agent_sdk import query, ClaudeAgentOptions
from claude_agent_sdk.types import StreamEvent
class TerminalAgentUI:
def __init__(self):
self.tool_count = 0
self.start_time = None
# 你的代码...
async def run(self, prompt: str):
self.start_time = time.time()
# 实现流式处理...
# 用颜色、旋转动画、逐字效果让输出好看
def show_stats(self, result):
# 显示统计信息
elapsed = time.time() - self.start_time
# 你的代码...
async def main():
ui = TerminalAgentUI()
await ui.run("帮我看看当前项目的 Git 提交历史,总结最近5次提交")
asyncio.run(main())
练习2:批量文件处理器
目标: 用单次模式批量处理多个文件,生成一份汇总报告。
要求:
- 接收一个目录路径,找到其中所有的
.py或.ts文件 - 用 Agent 逐个分析每个文件(并行处理,最多5个并发)
- 分析内容包括:文件用途、代码行数、主要函数/类、潜在问题
- 所有文件分析完后,生成一份 Markdown 格式的汇总报告
提示:
import asyncio
async def batch_process_with_limit(tasks, max_concurrent=5):
"""带并发限制的批量处理"""
semaphore = asyncio.Semaphore(max_concurrent)
async def limited_task(task):
async with semaphore:
return await task
return await asyncio.gather(*[limited_task(t) for t in tasks])
骨架代码:
import asyncio
from claude_agent_sdk import query, ClaudeAgentOptions, AssistantMessage, ResultMessage
async def analyze_single_file(file_path: str) -> dict:
"""用 Agent 分析单个文件"""
texts = []
async for message in query(
prompt=f"请分析文件 {file_path},告诉我:1. 文件用途 2. 代码行数 3. 主要函数/类 4. 潜在问题",
options=ClaudeAgentOptions(
allowed_tools=["Read"],
max_turns=3,
),
):
if isinstance(message, AssistantMessage):
for block in message.content:
if block["type"] == "text":
texts.append(block["text"])
return {"file": file_path, "analysis": "\n".join(texts)}
async def generate_report(results: list[dict]) -> str:
"""生成 Markdown 汇总报告"""
# 你的代码...
report = "# 代码分析报告\n\n"
for r in results:
report += f"## {r['file']}\n\n{r['analysis']}\n\n---\n\n"
return report
async def main():
# 1. 找到所有目标文件(可以用 Glob 工具或 os.walk)
files = [...] # 你的代码
# 2. 并行分析(最多5个并发)
# 你的代码...
# 3. 生成报告
# 你的代码...
asyncio.run(main())
本章小结
这一章我们深入学习了 Agent 的两种输出模式,来回顾一下关键知识点:
流式 vs 单次:流式像看直播,一边干一边看;单次像看录播,干完一次性给你。交互式场景用流式,后台任务用单次。
三种消息类型:
SystemMessage:开场白,包含会话 IDAssistantMessage:Agent 说的话和调的工具,是最丰富的消息类型ResultMessage:终场总结,包含成功/失败状态和费用信息
Partial Messages:开启
include_partial_messages后,你能拿到StreamEvent,实现逐字打字效果。事件类型包括content_block_start/delta/stop和message_start/delta/stop。Stop Reasons:
end_turn:正常完成max_tokens:输出太长被截断tool_use:需要调工具(循环中的正常状态)stop_sequence:遇到停止词refusal:Agent 拒绝执行
混合模式:可以一边流式显示给用户看,一边收集结果做后续处理。
性能提示:流式不会让 Agent 跑得更快,但能让用户"感觉"更快。批处理场景用单次模式 + 并行执行效率更高。
下一章预告
下一章我们学 会话管理 —— 让 Agent 记住上下文。
你有没有过这种体验:跟客服聊了半天,结果网页刷新了,要从头再来?Agent 也有这个问题。下一章我们学的 Session(会话)机制,就是让 Agent 能"保存进度"、"恢复对话",甚至"开分支试不同方案"。
具体来说,你将学到:
- 如何获取和保存 Session ID
- 如何恢复一个中断的会话(resume)
- 如何分叉一个会话来尝试不同方案(fork)
- 会话的生命周期管理
翻到下一页 → 第8章:会话管理