第19章:动手实现 —— 一行一行把 MiniClaw 写出来
一句话:从零编写 MiniClaw 的完整代码,做出一个真正能用的 AI 助手。
本章目标
- 从空文件夹开始,一步步建起整个 MiniClaw 项目
- 每一个文件的每一行代码都有中文注释,看完就能理解
- 代码全部可以直接复制粘贴运行,不需要"脑补"任何缺失的部分
- 完成后你将拥有一个真正能用的 AI 助手 —— 通过 Telegram 或命令行和它对话
前置知识
- 需要先看完第18章(项目设计)
- 需要先看完第4-9章(SDK 核心功能)
- 需要先看完第10章(自定义工具)
- 需要对 TypeScript、Node.js 有基本了解
好了,前面学了那么多章,终于到了最激动人心的时刻 —— 写代码!
这一章是整本教程最长、最重要的一章。我们要从一个空文件夹开始,一行一行地把 MiniClaw 的每个文件写出来。写完之后,你就拥有一个真正能工作的 AI 助手了。
不用紧张,我会把每一行代码都讲清楚。跟着敲就行。
19.1 项目初始化 —— 从零开始
第一步:创建项目目录,初始化 Node.js 项目
打开终端,运行下面的命令:
# 创建项目目录并进入
mkdir miniclaw && cd miniclaw
# 初始化 Node.js 项目(-y 表示全部使用默认设置)
pnpm init
第二步:安装依赖
MiniClaw 用到的库不多,总共就这几个:
# 安装运行时依赖
pnpm add @anthropic-ai/claude-agent-sdk better-sqlite3 grammy dotenv pino zod cron-parser
# 安装开发时依赖
pnpm add -D typescript @types/node @types/better-sqlite3 tsx
每个库是干什么的,解释一下:
| 库 | 干什么的 | 为什么选它 |
|---|---|---|
@anthropic-ai/claude-agent-sdk |
调用 Claude Agent | 本教程的主角 |
better-sqlite3 |
SQLite 数据库 | 轻量级,不需要额外安装数据库服务 |
grammy |
Telegram Bot 框架 | API 友好,文档全,TypeScript 支持好 |
dotenv |
读取 .env 文件 | 管理环境变量的标准方案 |
pino |
日志库 | 性能好,JSON 格式,方便后续分析 |
zod |
数据校验 | TypeScript 类型安全的校验库 |
cron-parser |
解析 cron 表达式 | 定时任务要用到 |
第三步:创建目录结构
# 创建源代码目录
mkdir -p src/channels src/agent
# 创建数据目录
mkdir -p groups/main data
# 创建主群组的 CLAUDE.md(Agent 的"记忆文件")
echo "# 主群组记忆\n\n这里记录重要信息。" > groups/main/CLAUDE.md
创建完后,你的项目结构应该长这样:
miniclaw/
├── src/
│ ├── channels/ # 消息渠道(Telegram、命令行)
│ │ ├── telegram.ts
│ │ └── cli.ts
│ ├── agent/ # Agent 相关
│ │ ├── runner.ts # Agent 运行器(核心!)
│ │ ├── container.ts# 容器管理
│ │ └── tools.ts # 自定义工具
│ ├── config.ts # 配置管理
│ ├── db.ts # 数据库操作
│ ├── router.ts # 消息路由
│ ├── queue.ts # 任务队列
│ ├── scheduler.ts # 定时任务
│ └── index.ts # 主入口
├── groups/ # 每个群的记忆文件
│ └── main/
│ └── CLAUDE.md
├── data/ # 数据存储目录
├── package.json
├── tsconfig.json
├── .env.example
└── Dockerfile
第四步:TypeScript 配置
创建 tsconfig.json:
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"esModuleInterop": true,
"strict": true,
"outDir": "dist",
"rootDir": "src",
"declaration": true,
"sourceMap": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
这里面几个关键配置解释一下:
target: "ES2022"—— 使用较新的 JavaScript 特性,比如for awaitmodule: "ESNext"—— 使用 ES 模块(import/export),不用 CommonJS(require)strict: true—— 开启严格模式,帮你在编译时发现更多错误
第五步:配置 package.json 的 scripts
打开 package.json,在 scripts 里加入以下内容:
{
"name": "miniclaw",
"version": "1.0.0",
"type": "module",
"scripts": {
"dev": "tsx src/index.ts",
"dev:cli": "CHANNEL=cli tsx src/index.ts",
"build": "tsc",
"start": "node dist/index.js"
},
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.1.0",
"better-sqlite3": "^11.0.0",
"cron-parser": "^5.0.0",
"dotenv": "^16.0.0",
"grammy": "^1.30.0",
"pino": "^9.0.0",
"zod": "^3.23.0"
},
"devDependencies": {
"@types/better-sqlite3": "^7.0.0",
"@types/node": "^22.0.0",
"tsx": "^4.0.0",
"typescript": "^5.6.0"
}
}
注意 "type": "module" 这一行 —— 这告诉 Node.js 使用 ES 模块语法。
第六步:环境变量模板
创建 .env.example 文件,告诉使用者需要配置哪些环境变量:
# ===== 必填 =====
# Anthropic API Key —— 在 console.anthropic.com 获取
ANTHROPIC_API_KEY=sk-ant-xxx
# Telegram Bot Token —— 通过 @BotFather 创建 Bot 获取
TELEGRAM_BOT_TOKEN=123456:ABC-DEF
# ===== 选填(有默认值) =====
# 助手名字,也是触发词(默认:mini)
# 在群里发 "@mini 帮我查一下天气" 就会触发
ASSISTANT_NAME=mini
# 消息渠道:telegram 或 cli(默认:telegram)
CHANNEL=telegram
# 最大并发容器数(默认:3)
MAX_CONCURRENT_CONTAINERS=3
# 空闲超时时间,毫秒(默认:300000,即5分钟)
IDLE_TIMEOUT=300000
# 容器运行超时时间,毫秒(默认:600000,即10分钟)
CONTAINER_TIMEOUT=600000
# 数据库文件路径(默认:./data/miniclaw.db)
DB_PATH=./data/miniclaw.db
# 群组目录路径(默认:./groups)
GROUPS_DIR=./groups
# 使用的模型(默认:sonnet)
MODEL=sonnet
然后把它复制一份作为你的实际配置:
cp .env.example .env
# 编辑 .env,填入你的 API Key 和 Telegram Bot Token
好了,项目骨架搭好了。接下来我们一个文件一个文件地写代码。
19.2 配置管理(config.ts)—— 约 50 行
这是最简单的一个文件。它的职责就是:从环境变量读取配置,做好校验,然后导出给其他模块用。
创建 src/config.ts:
/**
* config.ts —— MiniClaw 配置管理
*
* 职责:
* 1. 从 .env 文件和环境变量中读取配置
* 2. 用 Zod 做校验,确保配置都是合法的
* 3. 导出配置对象给其他模块使用
*/
import "dotenv/config"; // 这一行会自动读取项目根目录的 .env 文件
import { z } from "zod";
import path from "path";
// ========== 用 Zod 定义配置的"模板" ==========
// Zod 会帮我们检查每个配置项是不是合法的
// 比如 MAX_CONCURRENT_CONTAINERS 必须是正整数,不能是 "abc"
const ConfigSchema = z.object({
// Telegram Bot Token —— 必填(除非使用 CLI 模式)
TELEGRAM_BOT_TOKEN: z.string().default(""),
// Anthropic API Key —— 必填
ANTHROPIC_API_KEY: z.string().min(1, "必须提供 ANTHROPIC_API_KEY"),
// 助手名字,同时也是触发词
// 在群里发 "@mini 你好" 就会触发 Agent
ASSISTANT_NAME: z.string().default("mini"),
// 消息渠道:telegram 或 cli
CHANNEL: z.enum(["telegram", "cli"]).default("telegram"),
// 最大并发任务数 —— 同时最多跑几个 Agent
// 设太大可能把你的 API 额度吃光,设太小用户要排队
MAX_CONCURRENT_CONTAINERS: z.coerce.number().int().positive().default(3),
// 空闲超时 —— Agent 多久没收到新消息就关闭(毫秒)
IDLE_TIMEOUT: z.coerce.number().int().positive().default(300000), // 5分钟
// 容器运行超时 —— Agent 最多跑多长时间就强制停止(毫秒)
CONTAINER_TIMEOUT: z.coerce.number().int().positive().default(600000), // 10分钟
// 数据库文件路径
DB_PATH: z.string().default("./data/miniclaw.db"),
// 群组记忆文件目录
GROUPS_DIR: z.string().default("./groups"),
// 使用的模型
MODEL: z.enum(["sonnet", "opus", "haiku"]).default("sonnet"),
});
// ========== 解析并校验配置 ==========
// 这里会从 process.env(环境变量)中读取所有配置
// 如果有配置不合法(比如必填项没填),程序会直接报错退出
const parseResult = ConfigSchema.safeParse(process.env);
if (!parseResult.success) {
console.error("配置错误!请检查你的 .env 文件:");
for (const issue of parseResult.error.issues) {
console.error(` - ${issue.path.join(".")}: ${issue.message}`);
}
process.exit(1);
}
// ========== 导出配置 ==========
// 其他文件只需要 import { config } from "./config.js" 就能用
export const config = parseResult.data;
// 把相对路径转成绝对路径,避免后续因为工作目录变化出问题
export const DB_PATH = path.resolve(config.DB_PATH);
export const GROUPS_DIR = path.resolve(config.GROUPS_DIR);
// 触发词的正则表达式
// 比如 ASSISTANT_NAME 是 "mini",那正则就是 /^@mini\b/i
// \b 是单词边界,确保 "@miniature" 不会误触发
// i 是大小写不敏感
function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
export const TRIGGER_PATTERN = new RegExp(
`^@${escapeRegex(config.ASSISTANT_NAME)}\\b`,
"i"
);
// 轮询间隔 —— 多久检查一次新消息(毫秒)
export const POLL_INTERVAL = 2000;
// 定时任务检查间隔 —— 多久检查一次是否有定时任务该执行了
export const SCHEDULER_POLL_INTERVAL = 60000; // 1分钟
代码讲解:
这个文件做了三件事:
- 读取配置:
dotenv/config会自动读取.env文件里的配置,放进process.env - 校验配置:用 Zod 定义每个配置项的类型和约束,如果不对就直接报错
- 导出配置:把处理好的配置导出给其他模块用
为什么用 Zod 而不是直接读 process.env?因为 process.env 里的值全是字符串,你需要自己转换类型(比如把 "3" 变成数字 3)。Zod 的 coerce 功能可以自动帮你做这个转换,还能校验值是否合法。
19.3 数据库层(db.ts)—— 约 110 行
这个文件负责所有数据库操作。MiniClaw 用 SQLite 作为数据库 —— 轻量、不需要额外安装服务、一个文件搞定。
创建 src/db.ts:
/**
* db.ts —— MiniClaw 数据库层
*
* 职责:
* 1. 初始化数据库和表结构
* 2. 提供消息的增删改查操作
* 3. 提供群组信息的存取
* 4. 提供会话(session)的存取
* 5. 提供定时任务的存取
*
* 用 better-sqlite3 操作 SQLite,它是同步的,比异步简单,
* 而且 SQLite 本来就是单线程的,同步操作反而更合适。
*/
import Database from "better-sqlite3";
import fs from "fs";
import path from "path";
import { DB_PATH } from "./config.js";
// 数据库实例 —— 在 initDatabase() 中初始化
let db: Database.Database;
// ========== 定义数据类型 ==========
/** 一条消息 */
export interface Message {
id: string; // 消息唯一ID
chat_id: string; // 聊天(群组)ID
sender: string; // 发送者ID
sender_name: string; // 发送者昵称
content: string; // 消息内容
timestamp: string; // 时间戳(ISO格式)
processed: number; // 是否已处理:0=未处理,1=已处理
}
/** 一个群组 */
export interface Group {
chat_id: string; // 群组ID
name: string; // 群组名称
folder: string; // 对应的文件夹名(在 groups/ 下)
trigger: string; // 触发词
created_at: string; // 创建时间
}
/** 一条定时任务 */
export interface ScheduledTask {
id: string; // 任务ID
group_folder: string; // 所属群组文件夹
chat_id: string; // 要发送结果的聊天ID
prompt: string; // 要执行的提示词
schedule_type: string; // 调度类型:cron / interval / once
schedule_value: string;// 调度表达式(如 "0 9 * * *" 或 "3600000")
next_run: string | null; // 下次执行时间
last_run: string | null; // 上次执行时间
last_result: string | null;// 上次执行结果
status: string; // 状态:active / paused / completed
created_at: string; // 创建时间
}
// ========== 建表语句 ==========
function createSchema(database: Database.Database): void {
database.exec(`
-- 消息表:存储所有聊天消息
CREATE TABLE IF NOT EXISTS messages (
id TEXT,
chat_id TEXT,
sender TEXT,
sender_name TEXT,
content TEXT,
timestamp TEXT,
processed INTEGER DEFAULT 0,
PRIMARY KEY (id, chat_id)
);
-- 按时间戳建索引,方便按时间查询
CREATE INDEX IF NOT EXISTS idx_msg_timestamp ON messages(timestamp);
-- 按处理状态建索引,方便查找未处理的消息
CREATE INDEX IF NOT EXISTS idx_msg_processed ON messages(processed);
-- 群组表:记录注册的群组信息
CREATE TABLE IF NOT EXISTS groups (
chat_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
folder TEXT NOT NULL UNIQUE,
trigger TEXT NOT NULL,
created_at TEXT NOT NULL
);
-- 会话表:记录每个群组的 Agent 会话ID
-- 有了 session_id,下次可以接着上次的对话继续
CREATE TABLE IF NOT EXISTS sessions (
group_folder TEXT PRIMARY KEY,
session_id TEXT NOT NULL
);
-- 定时任务表
CREATE TABLE IF NOT EXISTS scheduled_tasks (
id TEXT PRIMARY KEY,
group_folder TEXT NOT NULL,
chat_id TEXT NOT NULL,
prompt TEXT NOT NULL,
schedule_type TEXT NOT NULL,
schedule_value TEXT NOT NULL,
next_run TEXT,
last_run TEXT,
last_result TEXT,
status TEXT DEFAULT 'active',
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_task_next_run ON scheduled_tasks(next_run);
CREATE INDEX IF NOT EXISTS idx_task_status ON scheduled_tasks(status);
`);
}
// ========== 初始化 ==========
/** 初始化数据库 —— 必须在程序启动时调用一次 */
export function initDatabase(): void {
// 确保数据目录存在
fs.mkdirSync(path.dirname(DB_PATH), { recursive: true });
// 打开(或创建)数据库文件
db = new Database(DB_PATH);
// 开启 WAL 模式 —— 提高并发读写性能
db.pragma("journal_mode = WAL");
// 创建表结构
createSchema(db);
}
// ========== 消息相关操作 ==========
/** 保存一条消息 */
export function saveMessage(msg: Omit<Message, "processed">): void {
db.prepare(`
INSERT OR REPLACE INTO messages (id, chat_id, sender, sender_name, content, timestamp, processed)
VALUES (?, ?, ?, ?, ?, ?, 0)
`).run(msg.id, msg.chat_id, msg.sender, msg.sender_name, msg.content, msg.timestamp);
}
/** 获取某个群组的未处理消息(按时间排序) */
export function getUnprocessedMessages(chatId: string): Message[] {
return db.prepare(`
SELECT * FROM messages
WHERE chat_id = ? AND processed = 0
ORDER BY timestamp ASC
`).all(chatId) as Message[];
}
/** 把消息标记为已处理 */
export function markAsProcessed(chatId: string): void {
db.prepare(`
UPDATE messages SET processed = 1
WHERE chat_id = ? AND processed = 0
`).run(chatId);
}
/** 获取某个群组最近 N 条消息(用于给 Agent 提供上下文) */
export function getRecentMessages(chatId: string, limit: number = 50): Message[] {
return db.prepare(`
SELECT * FROM messages
WHERE chat_id = ?
ORDER BY timestamp DESC
LIMIT ?
`).all(chatId, limit) as Message[];
}
// ========== 群组相关操作 ==========
/** 注册一个群组(或更新已有群组) */
export function saveGroup(group: Group): void {
db.prepare(`
INSERT OR REPLACE INTO groups (chat_id, name, folder, trigger, created_at)
VALUES (?, ?, ?, ?, ?)
`).run(group.chat_id, group.name, group.folder, group.trigger, group.created_at);
}
/** 根据 chat_id 查找群组 */
export function getGroup(chatId: string): Group | undefined {
return db.prepare(`SELECT * FROM groups WHERE chat_id = ?`).get(chatId) as Group | undefined;
}
/** 获取所有已注册的群组 */
export function getAllGroups(): Group[] {
return db.prepare(`SELECT * FROM groups`).all() as Group[];
}
// ========== 会话相关操作 ==========
/** 保存或更新某个群组的会话ID */
export function saveSession(groupFolder: string, sessionId: string): void {
db.prepare(`
INSERT OR REPLACE INTO sessions (group_folder, session_id)
VALUES (?, ?)
`).run(groupFolder, sessionId);
}
/** 获取某个群组的会话ID */
export function getSession(groupFolder: string): string | undefined {
const row = db.prepare(`
SELECT session_id FROM sessions WHERE group_folder = ?
`).get(groupFolder) as { session_id: string } | undefined;
return row?.session_id;
}
// ========== 定时任务相关操作 ==========
/** 创建一条定时任务 */
export function createTask(task: ScheduledTask): void {
db.prepare(`
INSERT INTO scheduled_tasks
(id, group_folder, chat_id, prompt, schedule_type, schedule_value, next_run, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(
task.id, task.group_folder, task.chat_id, task.prompt,
task.schedule_type, task.schedule_value, task.next_run,
task.status, task.created_at
);
}
/** 获取所有到期该执行的任务 */
export function getDueTasks(): ScheduledTask[] {
const now = new Date().toISOString();
return db.prepare(`
SELECT * FROM scheduled_tasks
WHERE status = 'active' AND next_run IS NOT NULL AND next_run <= ?
ORDER BY next_run ASC
`).all(now) as ScheduledTask[];
}
/** 获取某个群组的所有定时任务 */
export function getTasksForGroup(groupFolder: string): ScheduledTask[] {
return db.prepare(`
SELECT * FROM scheduled_tasks
WHERE group_folder = ?
ORDER BY created_at DESC
`).all(groupFolder) as ScheduledTask[];
}
/** 更新任务的执行状态 */
export function updateTaskAfterRun(
taskId: string,
nextRun: string | null,
lastResult: string
): void {
const now = new Date().toISOString();
db.prepare(`
UPDATE scheduled_tasks
SET next_run = ?, last_run = ?, last_result = ?,
status = CASE WHEN ? IS NULL THEN 'completed' ELSE status END
WHERE id = ?
`).run(nextRun, now, lastResult, nextRun, taskId);
}
/** 删除一条定时任务 */
export function deleteTask(taskId: string): void {
db.prepare(`DELETE FROM scheduled_tasks WHERE id = ?`).run(taskId);
}
代码讲解:
数据库层是整个项目的"数据仓库"。它的设计遵循一个原则:对外暴露简单的函数,对内封装 SQL 细节。
几个要点:
- 表设计:四张表对应四种数据 —— 消息、群组、会话、定时任务
- WAL 模式:SQLite 默认用 journal 模式,一次只能一个写操作。开启 WAL(Write-Ahead Logging)后,读和写可以同时进行
- 同步操作:better-sqlite3 是同步的,在 Node.js 里这不是问题,因为 SQLite 的操作本身就很快(通常毫秒级)
19.4 Telegram 接入(channels/telegram.ts)—— 约 120 行
这是 MiniClaw 的"耳朵和嘴巴" —— 通过 Telegram 接收用户消息,把 Agent 的回复发送回去。
创建 src/channels/telegram.ts:
/**
* channels/telegram.ts —— Telegram 消息渠道
*
* 职责:
* 1. 连接 Telegram Bot API
* 2. 监听群组和私聊消息
* 3. 把消息转换成统一格式保存到数据库
* 4. 把 Agent 的回复发送给用户
*
* 使用 grammy 库 —— 一个很好用的 Telegram Bot 框架
*/
import { Bot, Context } from "grammy";
import { config } from "../config.js";
import { saveMessage } from "../db.js";
import pino from "pino";
const logger = pino({ name: "telegram" });
// ========== 类型定义 ==========
/** 消息处理回调 —— 当收到新消息时调用 */
export type OnMessageCallback = (chatId: string, message: {
id: string;
chat_id: string;
sender: string;
sender_name: string;
content: string;
timestamp: string;
}) => void;
// ========== Telegram 渠道类 ==========
export class TelegramChannel {
private bot: Bot;
private onMessage: OnMessageCallback;
constructor(onMessage: OnMessageCallback) {
// 检查 Token 是否配置了
if (!config.TELEGRAM_BOT_TOKEN) {
throw new Error(
"TELEGRAM_BOT_TOKEN 未配置!请在 .env 文件中设置。\n" +
"获取方法:在 Telegram 中找 @BotFather,发送 /newbot 创建一个新 Bot"
);
}
this.bot = new Bot(config.TELEGRAM_BOT_TOKEN);
this.onMessage = onMessage;
// 注册消息处理器
this.setupHandlers();
}
/** 设置消息处理器 */
private setupHandlers(): void {
// 监听所有文本消息
this.bot.on("message:text", async (ctx: Context) => {
// 拿到消息的基本信息
const msg = ctx.message!;
const chatId = String(msg.chat.id);
const senderId = String(msg.from?.id || "unknown");
const senderName = this.getSenderName(msg.from);
const content = msg.text || "";
const timestamp = new Date(msg.date * 1000).toISOString();
const messageId = String(msg.message_id);
// 忽略空消息
if (!content.trim()) return;
logger.info({
chatId,
sender: senderName,
contentLength: content.length,
}, "收到 Telegram 消息");
// 保存到数据库
const message = {
id: messageId,
chat_id: chatId,
sender: senderId,
sender_name: senderName,
content,
timestamp,
};
saveMessage(message);
// 通知主程序有新消息
this.onMessage(chatId, message);
});
// 监听错误
this.bot.catch((err) => {
logger.error({ err }, "Telegram Bot 错误");
});
}
/** 获取发送者的显示名称 */
private getSenderName(from: Context["message"] extends { from?: infer F } ? F : never): string {
if (!from) return "未知用户";
if (from.first_name && from.last_name) {
return `${from.first_name} ${from.last_name}`;
}
return from.first_name || from.username || "未知用户";
}
/** 启动 Bot */
async start(): Promise<void> {
logger.info("Telegram Bot 启动中...");
// 获取 Bot 信息(顺便验证 Token 是否有效)
const me = await this.bot.api.getMe();
logger.info({ botName: me.first_name, botUsername: me.username }, "Telegram Bot 已连接");
// 开始接收消息
// 注意:bot.start() 会阻塞,所以不要 await 它
this.bot.start({
// 允许的更新类型
allowed_updates: ["message"],
// 遇到错误时的重试策略
onStart: (info) => {
logger.info({ username: info.username }, "Bot 开始接收消息");
},
});
}
/** 发送消息到指定聊天 */
async sendMessage(chatId: string, text: string): Promise<void> {
// Telegram 单条消息最大 4096 字符
// 如果 Agent 回复太长,需要分段发送
const MAX_LENGTH = 4000; // 留点余量
if (text.length <= MAX_LENGTH) {
// 短消息直接发
await this.bot.api.sendMessage(chatId, text);
} else {
// 长消息分段发
const chunks = this.splitMessage(text, MAX_LENGTH);
for (const chunk of chunks) {
await this.bot.api.sendMessage(chatId, chunk);
// 每段之间稍微等一下,避免发送太快被限流
await new Promise(resolve => setTimeout(resolve, 200));
}
}
}
/** 把长文本按段落边界切分 */
private splitMessage(text: string, maxLength: number): string[] {
const chunks: string[] = [];
let remaining = text;
while (remaining.length > 0) {
if (remaining.length <= maxLength) {
chunks.push(remaining);
break;
}
// 尽量在换行符处切分,避免把一段话切成两半
let cutPoint = remaining.lastIndexOf("\n", maxLength);
if (cutPoint === -1 || cutPoint < maxLength * 0.5) {
// 如果没有合适的换行符,在空格处切分
cutPoint = remaining.lastIndexOf(" ", maxLength);
}
if (cutPoint === -1 || cutPoint < maxLength * 0.5) {
// 实在没办法,硬切
cutPoint = maxLength;
}
chunks.push(remaining.slice(0, cutPoint));
remaining = remaining.slice(cutPoint).trimStart();
}
return chunks;
}
/** 设置"正在输入"状态 */
async setTyping(chatId: string): Promise<void> {
try {
await this.bot.api.sendChatAction(chatId, "typing");
} catch {
// 设置输入状态失败不影响功能,忽略
}
}
/** 停止 Bot */
async stop(): Promise<void> {
logger.info("Telegram Bot 停止中...");
this.bot.stop();
}
}
代码讲解:
Telegram 渠道做的事情很简单:
- 收消息:
bot.on("message:text", ...)监听所有文本消息,把它转换成统一格式保存到数据库 - 发消息:
sendMessage()把 Agent 的回复发给用户。需要注意 Telegram 单条消息有 4096 字符限制,超长要分段发送 - 分段逻辑:尽量在换行符或空格处切分,而不是把一个词切成两半
有一个需要注意的地方:bot.start() 是一个长时间运行的操作(它会不停地轮询 Telegram 服务器获取新消息),所以我们不 await 它,而是让它在后台跑。
19.5 CLI 接入(channels/cli.ts)—— 约 60 行
开发和测试时不想每次都用 Telegram 怎么办?搞一个命令行接口就行了。直接在终端里和 Agent 对话。
创建 src/channels/cli.ts:
/**
* channels/cli.ts —— 命令行消息渠道
*
* 职责:
* 1. 从终端读取用户输入
* 2. 把输入转换成统一格式的消息
* 3. 在终端显示 Agent 的回复
*
* 主要用于开发和测试 —— 不需要配置 Telegram,
* 直接在终端里就能和 Agent 对话。
*/
import readline from "readline";
import { config } from "../config.js";
import { saveMessage } from "../db.js";
import pino from "pino";
const logger = pino({ name: "cli" });
// CLI 模式下只有一个"聊天",用固定的 chat_id
const CLI_CHAT_ID = "cli:local";
const CLI_SENDER = "local-user";
const CLI_SENDER_NAME = "你";
/** 消息处理回调 */
export type OnMessageCallback = (chatId: string, message: {
id: string;
chat_id: string;
sender: string;
sender_name: string;
content: string;
timestamp: string;
}) => void;
export class CliChannel {
private rl: readline.Interface;
private onMessage: OnMessageCallback;
private messageCounter = 0;
constructor(onMessage: OnMessageCallback) {
this.onMessage = onMessage;
// 创建 readline 接口,从标准输入读取
this.rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
}
/** 启动 CLI 交互 */
async start(): Promise<void> {
logger.info("CLI 模式启动");
console.log("========================================");
console.log(` MiniClaw CLI 模式`);
console.log(` 触发词:@${config.ASSISTANT_NAME}`);
console.log(` 输入消息开始对话,输入 quit 退出`);
console.log("========================================\n");
// 显示提示符,等待用户输入
this.promptUser();
}
/** 显示提示符,等待用户输入 */
private promptUser(): void {
this.rl.question("你: ", (input) => {
const trimmed = input.trim();
// 输入 quit 或 exit 退出
if (trimmed === "quit" || trimmed === "exit") {
console.log("再见!");
process.exit(0);
}
// 忽略空输入
if (!trimmed) {
this.promptUser();
return;
}
// 构造消息对象
this.messageCounter++;
const message = {
id: `cli-${this.messageCounter}`,
chat_id: CLI_CHAT_ID,
sender: CLI_SENDER,
sender_name: CLI_SENDER_NAME,
content: trimmed,
timestamp: new Date().toISOString(),
};
// 保存到数据库
saveMessage(message);
// 通知主程序
this.onMessage(CLI_CHAT_ID, message);
// 继续等待下一条输入
// 注意:Agent 处理消息是异步的,不会阻塞输入
this.promptUser();
});
}
/** 发送消息(在 CLI 模式下就是打印到终端) */
async sendMessage(_chatId: string, text: string): Promise<void> {
console.log(`\n${config.ASSISTANT_NAME}: ${text}\n`);
}
/** 设置输入状态(CLI 模式下显示一个提示) */
async setTyping(_chatId: string): Promise<void> {
process.stdout.write(`[${config.ASSISTANT_NAME} 正在思考...]\r`);
}
/** 停止 CLI */
async stop(): Promise<void> {
this.rl.close();
}
}
代码讲解:
CLI 渠道是 Telegram 渠道的"简化版"。它用 Node.js 内置的 readline 模块,在终端里实现一问一答的交互。
开发时可以用 pnpm run dev:cli 启动 CLI 模式,不需要配置 Telegram Bot,方便调试。
19.6 消息路由(router.ts)—— 约 80 行
路由器是 MiniClaw 的"分拣中心"。收到一条消息后,它决定:这条消息要不要交给 Agent 处理?如果要,怎么把消息和上下文一起打包发给 Agent?
创建 src/router.ts:
/**
* router.ts —— 消息路由器
*
* 职责:
* 1. 判断一条消息是否需要 Agent 处理(触发词检测)
* 2. 把消息和上下文打包成 Agent 能理解的格式
* 3. 加载群组的 CLAUDE.md 记忆文件
* 4. 组装最终的 prompt
*
* 消息格式采用 XML 标签 —— 这是 NanoClaw 的做法,
* 因为 Claude 对 XML 格式的理解特别好。
*/
import fs from "fs";
import path from "path";
import { config, TRIGGER_PATTERN, GROUPS_DIR } from "./config.js";
import { Message, getRecentMessages, getGroup } from "./db.js";
import pino from "pino";
const logger = pino({ name: "router" });
// ========== XML 转义 ==========
/** 转义 XML 特殊字符,防止消息内容破坏 XML 结构 */
function escapeXml(s: string): string {
if (!s) return "";
return s
.replace(/&/g, "&")
.replace(/</g, "<")
.replace(/>/g, ">")
.replace(/"/g, """);
}
// ========== 触发词检测 ==========
/**
* 检查消息是否需要 Agent 处理
*
* 触发条件:
* 1. 消息以触发词开头(如 "@mini")
* 2. 这是一条私聊消息(不需要触发词)
*
* 返回 true 表示"这条消息需要处理"
*/
export function shouldProcess(chatId: string, messages: Message[]): boolean {
// 私聊消息(CLI 模式)总是处理
if (chatId.startsWith("cli:")) return true;
// 群组消息:检查是否有触发词
return messages.some((msg) => TRIGGER_PATTERN.test(msg.content.trim()));
}
// ========== 消息格式化 ==========
/**
* 把消息列表格式化成 XML 格式
*
* 输出示例:
* <messages>
* <message sender="小明" time="2024-01-01T12:00:00Z">@mini 今天天气怎么样?</message>
* <message sender="小红" time="2024-01-01T12:01:00Z">我也想知道</message>
* </messages>
*/
export function formatMessages(messages: Message[]): string {
const lines = messages.map((m) =>
`<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`
);
return `<messages>\n${lines.join("\n")}\n</messages>`;
}
// ========== 加载群组记忆 ==========
/**
* 读取群组的 CLAUDE.md 文件
*
* 每个群组在 groups/ 目录下有自己的文件夹,
* 里面的 CLAUDE.md 就是 Agent 对这个群的"记忆"。
* Agent 可以在对话过程中修改这个文件,实现持久记忆。
*/
export function loadGroupMemory(groupFolder: string): string {
const claudeMdPath = path.join(GROUPS_DIR, groupFolder, "CLAUDE.md");
try {
if (fs.existsSync(claudeMdPath)) {
return fs.readFileSync(claudeMdPath, "utf-8");
}
} catch (err) {
logger.warn({ groupFolder, err }, "读取群组记忆失败");
}
return "";
}
// ========== 组装完整 Prompt ==========
/**
* 组装发送给 Agent 的完整 prompt
*
* 包含三部分信息:
* 1. 群组记忆(CLAUDE.md 的内容)
* 2. 最近的聊天历史(给 Agent 提供上下文)
* 3. 当前待处理的新消息
*
* 这个结构参考了 NanoClaw 的设计 —— 用 XML 标签把不同部分包起来,
* Claude 能很好地理解这种结构。
*/
export function buildPrompt(
chatId: string,
newMessages: Message[],
groupFolder: string
): string {
const parts: string[] = [];
// 第一部分:群组记忆
const memory = loadGroupMemory(groupFolder);
if (memory) {
parts.push(`<group_memory>\n${memory}\n</group_memory>`);
}
// 第二部分:最近的聊天历史
// 取最近 30 条消息作为上下文,帮助 Agent 理解对话背景
const history = getRecentMessages(chatId, 30);
if (history.length > 0) {
// getRecentMessages 返回的是倒序的,翻转成正序
const chronological = [...history].reverse();
parts.push(`<recent_history>\n${formatMessages(chronological)}\n</recent_history>`);
}
// 第三部分:当前待处理的新消息
parts.push(`<new_messages>\n${formatMessages(newMessages)}\n</new_messages>`);
return parts.join("\n\n");
}
/**
* 去掉 Agent 回复中的内部标签
*
* Agent 有时会用 <internal>...</internal> 来做内部推理,
* 这些内容不应该发给用户看。
*/
export function stripInternalTags(text: string): string {
return text.replace(/<internal>[\s\S]*?<\/internal>/g, "").trim();
}
代码讲解:
路由器的核心是 buildPrompt() 函数。它把三种信息组装在一起:
- 群组记忆:Agent 对这个群的长期记忆(CLAUDE.md),比如群规、重要事项
- 聊天历史:最近的对话,帮 Agent 理解当前语境
- 新消息:需要 Agent 处理的消息
为什么用 XML 标签?因为 Claude 模型对 XML 的理解特别好。用 <group_memory> 和 <new_messages> 这样的标签把不同内容分开,Agent 就能清楚地区分"这是记忆"和"这是用户的新消息"。
19.7 任务队列(queue.ts)—— 约 70 行
任务队列解决两个问题:
- 同一个群组的消息要串行处理 —— 如果 Agent 还在处理上一条消息,新消息要排队等着
- 全局并发要控制 —— 不能同时跑太多 Agent,否则 API 额度很快就用完了
创建 src/queue.ts:
/**
* queue.ts —— 任务队列
*
* 职责:
* 1. 每个群组一个队列,同一群组的任务串行执行
* 2. 全局并发限制,防止同时运行太多 Agent
* 3. 支持任务入队、出队、执行
*
* 设计思路:
* - 用 Map 维护每个群组的队列状态
* - 用计数器控制全局并发数
* - 任务完成后自动执行队列中的下一个任务
*/
import { config } from "./config.js";
import pino from "pino";
const logger = pino({ name: "queue" });
/** 任务函数类型 —— 返回 Promise,完成时 resolve */
type TaskFn = () => Promise<void>;
/** 每个群组的队列状态 */
interface GroupQueueState {
isRunning: boolean; // 当前是否有任务在执行
pendingTasks: TaskFn[]; // 等待执行的任务列表
}
export class TaskQueue {
// 每个群组(chatId)一个队列
private groups = new Map<string, GroupQueueState>();
// 当前全局正在运行的任务数
private activeCount = 0;
// 因为全局并发限制而等待的任务
private waitingForSlot: Array<{ chatId: string; task: TaskFn }> = [];
/** 获取或创建某个群组的队列状态 */
private getGroupState(chatId: string): GroupQueueState {
let state = this.groups.get(chatId);
if (!state) {
state = { isRunning: false, pendingTasks: [] };
this.groups.set(chatId, state);
}
return state;
}
/**
* 往队列里加一个任务
*
* 逻辑:
* 1. 如果这个群组当前没有任务在跑 → 直接执行
* 2. 如果这个群组有任务在跑 → 排队等着
* 3. 如果全局并发数已满 → 等有空位再执行
*/
enqueue(chatId: string, task: TaskFn): void {
const state = this.getGroupState(chatId);
if (state.isRunning) {
// 该群组有任务在跑,排队
state.pendingTasks.push(task);
logger.debug({ chatId, queueLength: state.pendingTasks.length }, "任务已排队(群组内)");
return;
}
if (this.activeCount >= config.MAX_CONCURRENT_CONTAINERS) {
// 全局并发数已满,等有空位
this.waitingForSlot.push({ chatId, task });
logger.debug({
chatId,
activeCount: this.activeCount,
waitingCount: this.waitingForSlot.length,
}, "任务已排队(等待空位)");
return;
}
// 可以直接执行
this.executeTask(chatId, task);
}
/** 执行一个任务 */
private async executeTask(chatId: string, task: TaskFn): Promise<void> {
const state = this.getGroupState(chatId);
state.isRunning = true;
this.activeCount++;
logger.debug({
chatId,
activeCount: this.activeCount,
}, "开始执行任务");
try {
await task();
} catch (err) {
logger.error({ chatId, err }, "任务执行出错");
} finally {
state.isRunning = false;
this.activeCount--;
logger.debug({
chatId,
activeCount: this.activeCount,
}, "任务执行完毕");
// 任务完成了,看看有没有后续任务
this.drainNext(chatId);
}
}
/** 任务完成后,看看有没有下一个要执行的 */
private drainNext(chatId: string): void {
const state = this.getGroupState(chatId);
// 优先执行同群组的下一个任务
if (state.pendingTasks.length > 0) {
const nextTask = state.pendingTasks.shift()!;
this.executeTask(chatId, nextTask);
return;
}
// 该群组没有排队的了,看看有没有其他群组在等空位
if (this.waitingForSlot.length > 0) {
const next = this.waitingForSlot.shift()!;
this.executeTask(next.chatId, next.task);
}
}
/** 获取当前队列状态(用于调试和监控) */
getStatus(): { active: number; waiting: number; groups: number } {
return {
active: this.activeCount,
waiting: this.waitingForSlot.length,
groups: this.groups.size,
};
}
}
代码讲解:
任务队列的核心逻辑可以用一句话概括:同一群组串行,全局限并发。
想象一个场景:你有三个群组同时在用 MiniClaw,MAX_CONCURRENT_CONTAINERS 设为 3。
- 群A 发来消息 → 开始处理(活跃数:1)
- 群B 发来消息 → 开始处理(活跃数:2)
- 群C 发来消息 → 开始处理(活跃数:3,已满)
- 群A 又发来消息 → 排队等群A上一个任务完成
- 群D 发来消息 → 等待空位
这样就不会因为消息洪水把你的 API 额度吃光。
19.8 容器管理(agent/container.ts)—— 约 100 行
在生产环境中,我们希望 Agent 在 Docker 容器里运行 —— 这样即使 Agent 执行了危险命令(比如 rm -rf /),也只会影响容器内部,不会伤害你的宿主机。
但开发时每次都启动 Docker 太麻烦了。所以我们提供两种模式:
- 容器模式(生产用):Agent 在 Docker 容器中运行
- 直连模式(开发用):Agent 直接在当前进程中运行
创建 src/agent/container.ts:
/**
* agent/container.ts —— 容器管理
*
* 职责:
* 1. 创建 Docker 容器,设置资源限制和文件挂载
* 2. 在容器内启动 Agent
* 3. 管理容器的生命周期(创建、启动、停止、销毁)
* 4. 提供"直连模式"作为开发替代方案
*
* 容器的作用就像一个"沙箱" ——
* Agent 在里面随便折腾,都不会影响外面的世界。
*/
import { execSync, spawn, ChildProcess } from "child_process";
import path from "path";
import { config, GROUPS_DIR } from "../config.js";
import pino from "pino";
const logger = pino({ name: "container" });
// ========== Docker 工具函数 ==========
/** 检查 Docker 是否可用 */
export function isDockerAvailable(): boolean {
try {
execSync("docker info", { stdio: "ignore" });
return true;
} catch {
return false;
}
}
// ========== 容器选项 ==========
export interface ContainerOptions {
/** 群组文件夹名 —— 用于文件挂载 */
groupFolder: string;
/** 容器名称前缀 */
namePrefix?: string;
/** CPU 限制(核心数,比如 1.0 表示最多用 1 个核心) */
cpuLimit?: number;
/** 内存限制(比如 "512m") */
memoryLimit?: string;
}
// ========== Docker 容器模式 ==========
/**
* 在 Docker 容器中运行命令
*
* 容器设置:
* - 挂载群组目录(读写),让 Agent 能修改 CLAUDE.md
* - 挂载项目目录(只读),让 Agent 能看到项目代码
* - 设置资源限制,防止 Agent 吃光 CPU 和内存
* - 设置网络隔离,只允许访问必要的地址
* - 容器运行结束后自动删除(--rm)
*/
export function createContainer(
command: string[],
options: ContainerOptions,
env: Record<string, string> = {}
): ChildProcess {
const containerName = `${options.namePrefix || "miniclaw"}-${options.groupFolder}-${Date.now()}`;
const groupDir = path.resolve(GROUPS_DIR, options.groupFolder);
// 组装 docker run 命令的参数
const args: string[] = [
"run",
"--rm", // 容器退出后自动删除
"--name", containerName, // 容器名称
"--cpus", String(options.cpuLimit || 1), // CPU 限制
"--memory", options.memoryLimit || "512m", // 内存限制
"-v", `${groupDir}:/workspace/group:rw`, // 挂载群组目录(可读写)
"-w", "/workspace/group", // 工作目录设为群组目录
];
// 注入环境变量
for (const [key, value] of Object.entries(env)) {
args.push("-e", `${key}=${value}`);
}
// 使用 Node.js 镜像
args.push("node:20-slim");
// 运行的命令
args.push(...command);
logger.info({ containerName, groupFolder: options.groupFolder }, "创建 Docker 容器");
const proc = spawn("docker", args, {
stdio: ["pipe", "pipe", "pipe"],
});
// 设置超时 —— 防止容器运行太久
const timeout = setTimeout(() => {
logger.warn({ containerName }, "容器运行超时,强制停止");
try {
execSync(`docker stop ${containerName}`, { timeout: 10000 });
} catch {
execSync(`docker kill ${containerName}`, { timeout: 5000 }).toString();
}
}, config.CONTAINER_TIMEOUT);
// 容器退出时清除超时定时器
proc.on("exit", () => {
clearTimeout(timeout);
logger.info({ containerName }, "Docker 容器已退出");
});
return proc;
}
// ========== 直连模式(开发用) ==========
/**
* 不用 Docker,直接在当前 Node.js 进程中运行
*
* 优点:启动快,调试方便
* 缺点:没有隔离,Agent 执行的命令会直接影响宿主机
*
* 仅用于开发和测试!生产环境请使用 Docker 容器模式。
*/
export function runDirect(
command: string[],
cwd: string,
env: Record<string, string> = {}
): ChildProcess {
logger.info({ cwd }, "直连模式运行(无容器隔离)");
const proc = spawn(command[0], command.slice(1), {
cwd,
env: { ...process.env, ...env },
stdio: ["pipe", "pipe", "pipe"],
});
return proc;
}
/**
* 获取群组的工作目录
*
* 在容器模式下是 /workspace/group
* 在直连模式下是 groups/{groupFolder} 的绝对路径
*/
export function getWorkDir(groupFolder: string, useContainer: boolean): string {
if (useContainer) {
return "/workspace/group";
}
return path.resolve(GROUPS_DIR, groupFolder);
}
代码讲解:
容器管理提供了两种运行模式:
- Docker 容器模式:安全但稍慢,Agent 在隔离环境中运行。适合生产环境
- 直连模式:快速但不安全,Agent 直接在当前进程中运行。适合开发调试
容器模式的关键是文件挂载:
- 群组目录挂载为可读写 → Agent 可以修改 CLAUDE.md 来更新记忆
- 项目目录挂载为只读 → Agent 能看到代码但不能修改
19.9 Agent 运行器(agent/runner.ts)—— 核心!约 130 行
这是整个 MiniClaw 最核心的文件。 它调用 Claude Agent SDK 的 query() 函数,让 Agent 真正"活"起来。
创建 src/agent/runner.ts:
/**
* agent/runner.ts —— Agent 运行器
*
* 这是 MiniClaw 的心脏!
*
* 职责:
* 1. 调用 Claude Agent SDK 的 query() 函数启动 Agent
* 2. 配置 Agent 的能力(工具、权限、模型等)
* 3. 收集 Agent 的输出
* 4. 管理 Agent 会话(支持恢复上次对话)
* 5. 错误处理和超时控制
*
* 整个 MiniClaw 最重要的代码就在这里了。
* 前面所有的准备工作 —— 配置、数据库、消息渠道、路由、队列 ——
* 都是为了把数据送到这里,然后把 Agent 的回复送出去。
*/
import { query } from "@anthropic-ai/claude-agent-sdk";
import fs from "fs";
import path from "path";
import { config, GROUPS_DIR } from "../config.js";
import { saveSession, getSession } from "../db.js";
import pino from "pino";
const logger = pino({ name: "agent-runner" });
// ========== 默认的 System Prompt ==========
/**
* Agent 的"人设" —— 告诉 Agent 它是谁、该怎么做事
*
* 这是整个 MiniClaw 最重要的一段文本。
* 它决定了 Agent 的行为方式、说话风格、安全边界。
*
* 你可以根据自己的需求修改这段 prompt。
* 比如如果你想让 Agent 说英文,就把这里改成英文。
*/
const DEFAULT_SYSTEM_PROMPT = `你是 ${config.ASSISTANT_NAME},一个运行在群组聊天中的 AI 助手。
## 你的身份
- 你是一个乐于助人的 AI 助手,通过聊天群和用户交流
- 你的名字是 ${config.ASSISTANT_NAME}
- 用户通过在消息开头加 @${config.ASSISTANT_NAME} 来呼唤你
## 你的能力
- 你可以读写文件、执行命令、搜索网页
- 你可以帮用户写代码、分析问题、查找信息
- 你可以修改当前目录下的 CLAUDE.md 来记住重要信息
- 你可以创建定时任务(通过 schedule_task 工具)
## 行为准则
1. 回复简洁明了,不要过于冗长
2. 使用和用户相同的语言回复(如果用户说中文,你就用中文)
3. 如果不确定,就问用户,不要瞎猜
4. 执行危险操作前要确认(比如删除文件)
5. 始终保持友好和专业
## 记忆系统
- 当前目录有一个 CLAUDE.md 文件,这是你的"记忆"
- 需要记住的重要信息,请写入 CLAUDE.md
- 每次对话开始时,CLAUDE.md 的内容会自动加载
## 消息格式
- 用户的消息用 XML 标签包裹,格式为 <message sender="用户名" time="时间">内容</message>
- 如果有多条消息,它们按时间顺序排列
- <recent_history> 是最近的聊天历史,用于了解上下文
- <new_messages> 是新收到的、需要你处理的消息
- <group_memory> 是群组记忆(CLAUDE.md 的内容)
## 安全规则
- 不要执行可能损坏系统的命令(如 rm -rf /)
- 不要泄露 API Key 或其他敏感信息
- 不要访问不该访问的文件和目录
`;
// ========== Agent 运行器 ==========
/** Agent 运行选项 */
export interface RunAgentOptions {
/** 要发送给 Agent 的 prompt */
prompt: string;
/** 群组文件夹名 */
groupFolder: string;
/** Agent 的工作目录 */
cwd: string;
/** 自定义 system prompt(可选,默认用上面的) */
systemPrompt?: string;
/** 是否为定时任务触发(可选) */
isScheduledTask?: boolean;
}
/** Agent 运行结果 */
export interface RunAgentResult {
/** Agent 的回复文本 */
response: string;
/** 会话ID(用于下次恢复对话) */
sessionId?: string;
/** 是否成功 */
success: boolean;
/** 错误信息(如果失败) */
error?: string;
}
/**
* 运行 Agent —— 这是最核心的函数!
*
* 做三件事:
* 1. 调用 query() 启动 Agent 循环
* 2. 收集 Agent 的输出
* 3. 保存会话ID(下次可以接着聊)
*/
export async function runAgent(options: RunAgentOptions): Promise<RunAgentResult> {
const { prompt, groupFolder, cwd, systemPrompt, isScheduledTask } = options;
// 尝试获取之前的会话ID,实现"接着上次聊"
const existingSessionId = getSession(groupFolder);
logger.info({
groupFolder,
hasSession: !!existingSessionId,
promptLength: prompt.length,
isScheduledTask,
}, "启动 Agent");
// 确保群组工作目录存在
fs.mkdirSync(cwd, { recursive: true });
// 收集 Agent 的所有文本输出
const results: string[] = [];
// 新的会话ID(如果创建了新会话的话)
let newSessionId: string | undefined;
// 计时开始
const startTime = Date.now();
try {
// ========== 核心:调用 query() ==========
// 这里就是整个 MiniClaw 的"灵魂"所在
// query() 返回一个异步迭代器,每次 yield 一条消息
for await (const message of query({
prompt: isScheduledTask
? `[定时任务] ${prompt}`
: prompt,
options: {
// System Prompt —— Agent 的"人设"
systemPrompt: systemPrompt || DEFAULT_SYSTEM_PROMPT,
// 允许 Agent 使用的工具
// 这里给了 Agent 相当多的能力,你可以根据需要限制
allowedTools: [
"Read", // 读文件
"Write", // 写文件
"Edit", // 编辑文件
"Bash", // 执行命令
"Glob", // 查找文件
"Grep", // 搜索文件内容
"WebSearch", // 搜索网页
"WebFetch", // 获取网页内容
"Task", // 创建子任务
],
// 权限模式 —— 绕过所有权限检查
// 因为 Agent 已经在容器里了,有容器隔离保底
// 如果你没有用容器,建议改成 "default"
permissionMode: "bypassPermissions",
// Agent 的工作目录
cwd,
// 最多执行多少轮
// 每一轮是 Agent 的一次"思考 → 行动"
// 20 轮对大部分任务够用了
maxTurns: 20,
// 使用的模型
model: config.MODEL,
// 恢复上一次的会话(如果有的话)
resume: existingSessionId,
// 传递环境变量(不包含敏感信息)
env: {
// 只传必要的环境变量,不要传 API Key
HOME: process.env.HOME || "/root",
PATH: process.env.PATH || "/usr/local/bin:/usr/bin:/bin",
},
},
})) {
// ========== 处理 Agent 返回的消息 ==========
// 系统消息 —— 包含会话初始化信息
if (message.type === "system" && message.subtype === "init") {
newSessionId = message.session_id;
logger.debug({ sessionId: newSessionId }, "Agent 会话已创建");
}
// Assistant 消息 —— Agent 的回复
if (message.type === "assistant") {
for (const block of message.message.content) {
if (block.type === "text" && block.text) {
results.push(block.text);
}
}
}
// 结果消息 —— Agent 完成了一个阶段的工作
if (message.type === "result") {
if (message.subtype === "error_max_turns") {
logger.warn({ groupFolder }, "Agent 达到最大轮次限制");
}
}
}
// ========== 收尾工作 ==========
const elapsed = Date.now() - startTime;
logger.info({ groupFolder, elapsed, resultCount: results.length }, "Agent 运行完毕");
// 保存会话ID,下次可以接着聊
if (newSessionId) {
saveSession(groupFolder, newSessionId);
}
// 合并所有文本输出
const response = results.join("\n").trim();
return {
response: response || "(Agent 没有回复内容)",
sessionId: newSessionId,
success: true,
};
} catch (err) {
const elapsed = Date.now() - startTime;
const errorMessage = err instanceof Error ? err.message : String(err);
logger.error({ groupFolder, elapsed, error: errorMessage }, "Agent 运行出错");
return {
response: `抱歉,处理你的消息时出了点问题:${errorMessage}`,
success: false,
error: errorMessage,
};
}
}
代码讲解:
这个文件是整个 MiniClaw 的灵魂。我们来仔细看看 query() 函数的调用:
for await (const message of query({
prompt: "用户的消息",
options: {
systemPrompt: "Agent 的人设",
allowedTools: ["Read", "Write", ...],
permissionMode: "bypassPermissions",
cwd: "/workspace/group",
maxTurns: 20,
model: "sonnet",
resume: existingSessionId,
}
})) {
// 处理每一条消息
}
query() 返回一个异步迭代器。每次循环会拿到 Agent 的一条消息,可能是:
- system (init):Agent 启动了,给你一个 session_id
- assistant:Agent 的回复文本或工具调用
- result:Agent 的一个阶段结束了
我们收集所有 assistant 消息中的文本,最后合并成完整的回复发给用户。
关于 System Prompt:
DEFAULT_SYSTEM_PROMPT 这段文本非常关键。它定义了 Agent 的行为方式。你可以把它理解为新员工的"入职手册" —— Agent 看了这个手册,就知道自己该怎么做事。
关于会话恢复:
通过 resume: existingSessionId,Agent 可以"接着上次聊"。这意味着它能记住之前的对话内容,不用每次都从头开始。
19.10 自定义工具(agent/tools.ts)—— 约 80 行
除了 SDK 自带的工具(Read、Write、Bash 等),我们还可以给 Agent 添加自定义工具。MiniClaw 需要三个自定义工具:
- schedule_task —— 创建定时任务
- list_tasks —— 查看定时任务列表
- send_message —— 给指定群组发消息
创建 src/agent/tools.ts:
/**
* agent/tools.ts —— 自定义工具
*
* 职责:
* 1. 定义 MiniClaw 特有的工具
* 2. 创建 MCP Server 供 Agent 调用
*
* 为什么需要自定义工具?
* - SDK 自带的工具只能操作文件和执行命令
* - 但 Agent 还需要"安排定时任务"、"给其他群发消息"这样的能力
* - 自定义工具就是给 Agent 装上这些新技能
*/
import { z } from "zod";
import crypto from "crypto";
import { CronExpressionParser } from "cron-parser";
import { createTask, getTasksForGroup, deleteTask, ScheduledTask } from "../db.js";
import pino from "pino";
const logger = pino({ name: "tools" });
// ========== 工具定义 ==========
/**
* 工具1:创建定时任务
*
* Agent 可以用这个工具安排定时执行的任务,比如:
* - "每天早上 9 点给我发天气预报"
* - "每小时检查一次服务器状态"
* - "5 分钟后提醒我开会"
*/
export interface ScheduleTaskInput {
/** 任务描述 / 要执行的 prompt */
prompt: string;
/** 调度类型:cron(周期性)/ interval(固定间隔)/ once(执行一次) */
schedule_type: "cron" | "interval" | "once";
/** 调度表达式:cron 表达式 / 毫秒数 / ISO 时间字符串 */
schedule_value: string;
}
/**
* 创建一条定时任务
*
* @param input 任务参数
* @param groupFolder 所属群组
* @param chatId 要发送结果的聊天ID
* @returns 创建结果
*/
export function handleScheduleTask(
input: ScheduleTaskInput,
groupFolder: string,
chatId: string
): string {
// 生成唯一的任务ID
const taskId = `task-${crypto.randomUUID().slice(0, 8)}`;
// 计算下次执行时间
let nextRun: string;
try {
if (input.schedule_type === "cron") {
// 解析 cron 表达式,计算下次执行时间
const interval = CronExpressionParser.parse(input.schedule_value);
nextRun = interval.next().toISOString();
} else if (input.schedule_type === "interval") {
// 固定间隔:当前时间 + 间隔毫秒数
const ms = parseInt(input.schedule_value, 10);
if (isNaN(ms) || ms <= 0) {
return "错误:interval 的值必须是正整数(毫秒数)";
}
nextRun = new Date(Date.now() + ms).toISOString();
} else {
// 执行一次:直接用给定的时间
nextRun = new Date(input.schedule_value).toISOString();
}
} catch (err) {
return `错误:无法解析调度表达式 "${input.schedule_value}" —— ${err instanceof Error ? err.message : String(err)}`;
}
// 保存到数据库
const task: ScheduledTask = {
id: taskId,
group_folder: groupFolder,
chat_id: chatId,
prompt: input.prompt,
schedule_type: input.schedule_type,
schedule_value: input.schedule_value,
next_run: nextRun,
last_run: null,
last_result: null,
status: "active",
created_at: new Date().toISOString(),
};
createTask(task);
logger.info({ taskId, groupFolder, scheduleType: input.schedule_type }, "定时任务已创建");
return `定时任务已创建!
- 任务ID: ${taskId}
- 类型: ${input.schedule_type}
- 表达式: ${input.schedule_value}
- 下次执行: ${nextRun}
- 任务内容: ${input.prompt.slice(0, 100)}${input.prompt.length > 100 ? "..." : ""}`;
}
/**
* 工具2:查看定时任务列表
*/
export function handleListTasks(groupFolder: string): string {
const tasks = getTasksForGroup(groupFolder);
if (tasks.length === 0) {
return "当前没有定时任务。";
}
const lines = tasks.map((t) => {
return `- [${t.status}] ${t.id}: "${t.prompt.slice(0, 60)}${t.prompt.length > 60 ? "..." : ""}"
类型: ${t.schedule_type} (${t.schedule_value})
下次执行: ${t.next_run || "无"}
上次执行: ${t.last_run || "从未执行"}`;
});
return `定时任务列表(共 ${tasks.length} 个):\n${lines.join("\n\n")}`;
}
/**
* 工具3:删除定时任务
*/
export function handleDeleteTask(taskId: string): string {
try {
deleteTask(taskId);
logger.info({ taskId }, "定时任务已删除");
return `已删除任务 ${taskId}`;
} catch (err) {
return `删除失败:${err instanceof Error ? err.message : String(err)}`;
}
}
// ========== 工具的 JSON Schema 定义 ==========
// 这些 Schema 告诉 Agent 每个工具需要什么参数
export const toolSchemas = {
schedule_task: {
name: "schedule_task",
description: "创建一个定时任务。支持 cron 表达式(周期性执行)、interval(固定间隔)和 once(执行一次)。",
input_schema: {
type: "object" as const,
properties: {
prompt: {
type: "string",
description: "任务要执行的内容描述。Agent 会在指定时间收到这个 prompt 并执行。",
},
schedule_type: {
type: "string",
enum: ["cron", "interval", "once"],
description: "调度类型。cron: 使用 cron 表达式(如 '0 9 * * *' 表示每天9点);interval: 固定间隔(毫秒);once: 执行一次",
},
schedule_value: {
type: "string",
description: "调度表达式。cron 类型填 cron 表达式;interval 类型填毫秒数;once 类型填 ISO 时间字符串",
},
},
required: ["prompt", "schedule_type", "schedule_value"],
},
},
list_tasks: {
name: "list_tasks",
description: "查看当前群组的所有定时任务列表",
input_schema: {
type: "object" as const,
properties: {},
},
},
delete_task: {
name: "delete_task",
description: "删除一个定时任务",
input_schema: {
type: "object" as const,
properties: {
task_id: {
type: "string",
description: "要删除的任务ID",
},
},
required: ["task_id"],
},
},
};
代码讲解:
自定义工具让 Agent 能做超出 SDK 内置能力的事情。每个工具需要三样东西:
- 名字和描述 —— Agent 根据描述来决定什么时候用这个工具
- 参数 Schema —— 告诉 Agent 需要传什么参数
- 执行函数 —— 实际干活的代码
比如 schedule_task 工具,Agent 可以这样调用它:
Agent 思考:用户说"每天早上 9 点提醒我锻炼身体",我应该创建一个定时任务
Agent 调用:schedule_task({
prompt: "提醒用户:该锻炼身体了!保持健康很重要。",
schedule_type: "cron",
schedule_value: "0 9 * * *"
})
19.11 定时任务调度器(scheduler.ts)—— 约 70 行
调度器负责定时检查有没有任务该执行了。如果有,就把任务交给 Agent 去做。
创建 src/scheduler.ts:
/**
* scheduler.ts —— 定时任务调度器
*
* 职责:
* 1. 定时检查数据库里有没有到期的任务
* 2. 如果有,把任务交给 Agent 执行
* 3. 执行完毕后更新任务状态和下次执行时间
* 4. 把执行结果发送回对应的群组
*
* 工作方式很简单:每隔一段时间(默认 1 分钟)查一次数据库,
* 看看有没有 next_run 已经过了当前时间的任务。
*/
import { CronExpressionParser } from "cron-parser";
import { getDueTasks, updateTaskAfterRun } from "./db.js";
import { runAgent } from "./agent/runner.js";
import { TaskQueue } from "./queue.js";
import { GROUPS_DIR, SCHEDULER_POLL_INTERVAL } from "./config.js";
import path from "path";
import pino from "pino";
const logger = pino({ name: "scheduler" });
// ========== 类型定义 ==========
/** 发送消息的回调函数 */
type SendMessageFn = (chatId: string, text: string) => Promise<void>;
// ========== 调度器 ==========
/**
* 启动定时任务调度循环
*
* 每隔 SCHEDULER_POLL_INTERVAL(默认 60 秒)检查一次,
* 有到期任务就执行。
*
* @param queue 任务队列(用于控制并发)
* @param sendMessage 发送消息的函数
*/
export function startScheduler(
queue: TaskQueue,
sendMessage: SendMessageFn
): void {
logger.info(
{ pollInterval: SCHEDULER_POLL_INTERVAL },
"定时任务调度器启动"
);
// 调度循环
const loop = async () => {
try {
// 从数据库查询到期的任务
const dueTasks = getDueTasks();
if (dueTasks.length > 0) {
logger.info({ count: dueTasks.length }, "发现到期任务");
}
for (const task of dueTasks) {
// 把每个到期任务加入队列
// 队列会控制并发,不会同时跑太多 Agent
queue.enqueue(task.chat_id, async () => {
logger.info({ taskId: task.id, prompt: task.prompt.slice(0, 80) }, "执行定时任务");
const startTime = Date.now();
try {
// 运行 Agent 执行任务
const result = await runAgent({
prompt: task.prompt,
groupFolder: task.group_folder,
cwd: path.resolve(GROUPS_DIR, task.group_folder),
isScheduledTask: true,
});
const elapsed = Date.now() - startTime;
// 把结果发送给用户
if (result.response) {
await sendMessage(task.chat_id, `[定时任务结果]\n${result.response}`);
}
// 计算下次执行时间
let nextRun: string | null = null;
if (task.schedule_type === "cron") {
// cron 类型:用 cron 表达式计算下一次
const interval = CronExpressionParser.parse(task.schedule_value);
nextRun = interval.next().toISOString();
} else if (task.schedule_type === "interval") {
// interval 类型:当前时间 + 间隔
const ms = parseInt(task.schedule_value, 10);
nextRun = new Date(Date.now() + ms).toISOString();
}
// once 类型:nextRun 为 null,任务会被标记为 completed
// 更新数据库
updateTaskAfterRun(
task.id,
nextRun,
result.response.slice(0, 200) // 只保存前 200 字作为摘要
);
logger.info({ taskId: task.id, elapsed, nextRun }, "定时任务执行完毕");
} catch (err) {
const elapsed = Date.now() - startTime;
const errorMsg = err instanceof Error ? err.message : String(err);
logger.error({ taskId: task.id, elapsed, error: errorMsg }, "定时任务执行失败");
// 即使失败也要更新状态,否则同一任务会被反复执行
let nextRun: string | null = null;
if (task.schedule_type === "cron") {
try {
const interval = CronExpressionParser.parse(task.schedule_value);
nextRun = interval.next().toISOString();
} catch { /* ignore parse error */ }
} else if (task.schedule_type === "interval") {
const ms = parseInt(task.schedule_value, 10);
if (!isNaN(ms)) {
nextRun = new Date(Date.now() + ms).toISOString();
}
}
updateTaskAfterRun(task.id, nextRun, `错误: ${errorMsg}`);
}
});
}
} catch (err) {
logger.error({ err }, "调度器循环出错");
}
// 等待下一次检查
setTimeout(loop, SCHEDULER_POLL_INTERVAL);
};
// 启动循环(不阻塞主线程)
loop();
}
代码讲解:
调度器的工作原理非常简单:
每隔 1 分钟:
1. 查数据库:有没有 next_run <= 当前时间 的任务?
2. 有 → 把任务加入队列
3. 队列执行任务 → 调用 runAgent() → 把结果发给用户
4. 计算下次执行时间 → 更新数据库
三种调度类型的区别:
- cron:标准 cron 表达式,比如
0 9 * * *表示每天早上 9 点 - interval:固定间隔,比如
3600000表示每小时(3600000 毫秒) - once:只执行一次,执行完后状态变为
completed
19.12 主入口(index.ts)—— 约 80 行
最后一个文件 —— 把所有组件串起来!
创建 src/index.ts:
/**
* index.ts —— MiniClaw 主入口
*
* 职责:
* 1. 初始化所有组件(数据库、消息渠道、任务队列)
* 2. 连接各组件之间的"管道"
* 3. 启动消息轮询循环
* 4. 启动定时任务调度器
* 5. 处理优雅退出
*
* 这个文件就像一个"总指挥" —— 它不干具体的活,
* 但它负责把所有干活的人组织在一起。
*/
import { config, GROUPS_DIR, TRIGGER_PATTERN, POLL_INTERVAL } from "./config.js";
import { initDatabase, saveGroup, getGroup, getAllGroups, getUnprocessedMessages, markAsProcessed } from "./db.js";
import { TelegramChannel } from "./channels/telegram.js";
import { CliChannel } from "./channels/cli.js";
import { shouldProcess, buildPrompt, stripInternalTags } from "./router.js";
import { TaskQueue } from "./queue.js";
import { runAgent } from "./agent/runner.js";
import { startScheduler } from "./scheduler.js";
import fs from "fs";
import path from "path";
import pino from "pino";
const logger = pino({ name: "main" });
// ========== 全局状态 ==========
const queue = new TaskQueue();
// 消息渠道(Telegram 或 CLI)的引用
let channel: {
sendMessage: (chatId: string, text: string) => Promise<void>;
setTyping: (chatId: string) => Promise<void>;
stop: () => Promise<void>;
};
// ========== 消息处理 ==========
/**
* 处理新消息
*
* 当消息渠道收到新消息时,这个函数会被调用。
* 它决定是否需要让 Agent 处理,如果需要就加入队列。
*/
function handleNewMessage(chatId: string, _message: unknown): void {
// 检查这个聊天是否已注册为群组
let group = getGroup(chatId);
if (!group) {
// 如果是 CLI 模式,自动注册为 main 群组
if (chatId.startsWith("cli:")) {
group = {
chat_id: chatId,
name: "CLI",
folder: "main",
trigger: config.ASSISTANT_NAME,
created_at: new Date().toISOString(),
};
saveGroup(group);
logger.info("CLI 群组已自动注册");
} else {
// 未注册的群组,先自动注册
// 用 chatId 的后 6 位作为文件夹名
const folder = `group-${chatId.slice(-6)}`;
group = {
chat_id: chatId,
name: `Group ${chatId}`,
folder,
trigger: config.ASSISTANT_NAME,
created_at: new Date().toISOString(),
};
saveGroup(group);
// 创建群组目录和默认的 CLAUDE.md
const groupDir = path.resolve(GROUPS_DIR, folder);
fs.mkdirSync(groupDir, { recursive: true });
const claudeMdPath = path.join(groupDir, "CLAUDE.md");
if (!fs.existsSync(claudeMdPath)) {
fs.writeFileSync(claudeMdPath, `# ${group.name} 记忆\n\n这里记录重要信息。\n`);
}
logger.info({ chatId, folder }, "新群组已自动注册");
}
}
// 获取该群组的未处理消息
const unprocessed = getUnprocessedMessages(chatId);
if (unprocessed.length === 0) return;
// 检查是否需要 Agent 处理(触发词检测)
if (!shouldProcess(chatId, unprocessed)) return;
// 加入任务队列
queue.enqueue(chatId, async () => {
logger.info({ chatId, messageCount: unprocessed.length }, "开始处理消息");
// 设置"正在输入"状态
await channel.setTyping(chatId);
// 组装 prompt
const prompt = buildPrompt(chatId, unprocessed, group!.folder);
const cwd = path.resolve(GROUPS_DIR, group!.folder);
// 运行 Agent
const result = await runAgent({
prompt,
groupFolder: group!.folder,
cwd,
});
// 标记消息为已处理
markAsProcessed(chatId);
// 发送回复
if (result.response) {
const cleanResponse = stripInternalTags(result.response);
if (cleanResponse) {
await channel.sendMessage(chatId, cleanResponse);
}
}
});
}
// ========== 启动 ==========
async function main(): Promise<void> {
logger.info("MiniClaw 启动中...");
// 第1步:初始化数据库
initDatabase();
logger.info("数据库初始化完成");
// 第2步:确保必要目录存在
fs.mkdirSync(GROUPS_DIR, { recursive: true });
fs.mkdirSync(path.resolve(GROUPS_DIR, "main"), { recursive: true });
// 第3步:创建消息渠道
if (config.CHANNEL === "cli") {
// CLI 模式
const cli = new CliChannel(handleNewMessage);
channel = cli;
await cli.start();
} else {
// Telegram 模式
const telegram = new TelegramChannel(handleNewMessage);
channel = telegram;
await telegram.start();
}
logger.info({ channel: config.CHANNEL }, "消息渠道已启动");
// 第4步:启动定时任务调度器
startScheduler(queue, async (chatId: string, text: string) => {
await channel.sendMessage(chatId, text);
});
logger.info("定时任务调度器已启动");
// 第5步:注册优雅退出处理
const shutdown = async (signal: string) => {
logger.info({ signal }, "收到退出信号,正在关闭...");
try {
await channel.stop();
logger.info("消息渠道已关闭");
} catch (err) {
logger.error({ err }, "关闭消息渠道时出错");
}
logger.info("MiniClaw 已关闭,再见!");
process.exit(0);
};
process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
// 启动完毕!
logger.info({
assistant: config.ASSISTANT_NAME,
channel: config.CHANNEL,
model: config.MODEL,
maxConcurrent: config.MAX_CONCURRENT_CONTAINERS,
}, "MiniClaw 启动完成!准备接收消息...");
}
// ========== 启动入口 ==========
main().catch((err) => {
logger.error({ err }, "MiniClaw 启动失败");
process.exit(1);
});
代码讲解:
index.ts 是整个项目的"总指挥"。它按顺序做这几件事:
1. 初始化数据库 → 建表、准备好存储
2. 创建消息渠道 → 连接 Telegram 或启动 CLI
3. 启动调度器 → 定时检查有没有到期任务
4. 注册退出处理 → Ctrl+C 时优雅关闭
消息的完整流转路径是这样的:
用户发消息 → Telegram/CLI 收到
→ handleNewMessage() 被调用
→ shouldProcess() 检查触发词
→ queue.enqueue() 加入队列
→ buildPrompt() 组装 prompt
→ runAgent() 调用 Claude
→ channel.sendMessage() 发送回复
19.13 Dockerfile —— 容器化部署
如果你想把 MiniClaw 部署到服务器上,用 Docker 是最方便的。
创建 Dockerfile:
# ===== 第一阶段:构建 =====
FROM node:20-slim AS builder
# 安装 pnpm
RUN npm install -g pnpm
# 设置工作目录
WORKDIR /app
# 先复制依赖文件(利用 Docker 缓存)
COPY package.json pnpm-lock.yaml* ./
# 安装依赖
RUN pnpm install --frozen-lockfile
# 复制源代码
COPY tsconfig.json ./
COPY src/ ./src/
# 编译 TypeScript
RUN pnpm run build
# ===== 第二阶段:运行 =====
FROM node:20-slim
# 安装 pnpm(运行时也需要)
RUN npm install -g pnpm
WORKDIR /app
# 只复制运行需要的文件
COPY package.json pnpm-lock.yaml* ./
RUN pnpm install --prod --frozen-lockfile
# 从构建阶段复制编译好的代码
COPY --from=builder /app/dist ./dist
# 创建数据和群组目录
RUN mkdir -p /app/data /app/groups/main
# 环境变量
ENV NODE_ENV=production
ENV DB_PATH=/app/data/miniclaw.db
ENV GROUPS_DIR=/app/groups
# 启动命令
CMD ["node", "dist/index.js"]
使用方法:
# 构建镜像
docker build -t miniclaw .
# 运行(把你的 .env 文件传进去)
docker run -d \
--name miniclaw \
--env-file .env \
-v $(pwd)/data:/app/data \
-v $(pwd)/groups:/app/groups \
--restart unless-stopped \
miniclaw
几个关键点:
- 两阶段构建:第一阶段编译 TypeScript,第二阶段只留运行需要的文件,镜像更小
- 数据持久化:通过
-v挂载data/和groups/目录,容器删了数据还在 - 自动重启:
--restart unless-stopped确保进程崩溃后自动重启
19.14 运行和测试
开发模式(CLI)—— 最简单的测试方式
不需要 Telegram,直接在终端里和 Agent 对话:
# 确保 .env 文件里配置了 ANTHROPIC_API_KEY
# TELEGRAM_BOT_TOKEN 可以先不填
# 启动 CLI 模式
pnpm run dev:cli
你会看到:
========================================
MiniClaw CLI 模式
触发词:@mini
输入消息开始对话,输入 quit 退出
========================================
你:
试试输入:
你: @mini 你好,请自我介绍一下
等几秒钟,Agent 会回复类似这样的内容:
mini: 你好!我是 mini,一个运行在群组聊天中的 AI 助手。
我可以帮你做很多事情,比如写代码、分析问题、搜索信息等。
有什么我可以帮你的吗?
再试试让它记住点东西:
你: @mini 请记住我叫小明,我喜欢编程
Agent 会把这个信息写入 groups/main/CLAUDE.md,下次对话还记得。
开发模式(Telegram)
- 在 Telegram 中找
@BotFather,发送/newbot创建一个 Bot - 拿到 Bot Token,填入
.env文件 - 运行:
pnpm run dev
- 在 Telegram 中找到你的 Bot,发送消息就行了
常见问题排查
Q: 报错 "必须提供 ANTHROPIC_API_KEY"
A: 你的 .env 文件里没有配置 ANTHROPIC_API_KEY。去 console.anthropic.com 注册并获取 API Key。
Q: Telegram Bot 启动但收不到消息
A: 检查几件事:
- Bot Token 是否正确
- 你是否在 Telegram 中给 Bot 发了消息(Bot 不能主动给你发消息,需要你先发)
- 如果是群组,Bot 是否已经被拉进群组,且有读取消息的权限
Q: Agent 回复太慢
A: Claude Agent 需要一些时间来思考和执行工具。通常需要 5-30 秒。如果你觉得太慢,可以:
- 在
.env中把MODEL改成haiku(更快但没那么聪明) - 减少
maxTurns(在runner.ts中修改)
Q: 报错 "Cannot find module '@anthropic-ai/claude-agent-sdk'"
A: 确保你已经正确安装了依赖:pnpm install。如果还是不行,删掉 node_modules 重新安装。
Q: 数据库报错
A: 确保 data/ 目录存在且有写入权限。运行 mkdir -p data 创建。
19.15 完整代码统计
我们来数一下,总共写了多少行代码:
| 文件 | 行数 | 职责 |
|---|---|---|
src/config.ts |
~50 | 配置管理 |
src/db.ts |
~110 | 数据库操作 |
src/channels/telegram.ts |
~120 | Telegram 接入 |
src/channels/cli.ts |
~60 | 命令行接入 |
src/router.ts |
~80 | 消息路由 |
src/queue.ts |
~70 | 任务队列 |
src/agent/container.ts |
~100 | 容器管理 |
src/agent/runner.ts |
~130 | Agent 运行器(核心) |
src/agent/tools.ts |
~80 | 自定义工具 |
src/scheduler.ts |
~70 | 定时任务调度 |
src/index.ts |
~80 | 主入口 |
| 合计 | ~950 | 一个完整的 AI 助手 |
不到 1000 行代码,我们就做出了一个功能完整的 AI 助手。来和其他项目对比一下:
| 项目 | 代码量 | 功能范围 |
|---|---|---|
| MiniClaw(我们的) | ~950 行 | Telegram + CLI、Agent 对话、文件操作、定时任务 |
| NanoClaw | ~500 行核心 + Skills | WhatsApp + 多渠道、Agent Swarm、容器隔离 |
| OpenClaw | ~430,000 行 | 15+ 渠道、插件市场、企业级安全、团队协作 |
MiniClaw 在功能上介于 NanoClaw 和 OpenClaw 之间 —— 没有 NanoClaw 那么精简(它只有 ~500 行),也没有 OpenClaw 那么庞大。但对于学习和个人使用来说,刚刚好。
19.16 架构回顾 —— 数据流全景图
在结束之前,让我们站高一点,看看 MiniClaw 的完整数据流:
用户在 Telegram/CLI 发送消息
│
▼
┌──────────────────┐
│ 消息渠道层 │ telegram.ts / cli.ts
│ 收到消息 │ 将消息保存到数据库
│ 调用回调函数 │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 消息处理层 │ index.ts → handleNewMessage()
│ 查询未处理消息 │
│ 触发词检测 │ router.ts → shouldProcess()
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 任务队列层 │ queue.ts → enqueue()
│ 群组内串行 │
│ 全局并发控制 │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Prompt 组装层 │ router.ts → buildPrompt()
│ 加载群组记忆 │ 合并消息历史 + 新消息
│ 格式化为 XML │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Agent 执行层 │ runner.ts → runAgent()
│ 调用 query() │ Claude Agent SDK 启动 Agentic Loop
│ 收集输出 │ 思考 → 工具调用 → 观察 → 再思考...
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 结果处理层 │ index.ts
│ 清理内部标签 │ router.ts → stripInternalTags()
│ 标记消息已处理 │ db.ts → markAsProcessed()
└────────┬─────────┘
│
▼
┌──────────────────┐
│ 消息发送层 │ telegram.ts / cli.ts → sendMessage()
│ 长消息分段 │
│ 发送给用户 │
└──────────────────┘
同时,在后台还有定时任务调度器在默默工作:
定时任务调度器(每 60 秒检查一次)
│
▼
查询到期任务 → 加入任务队列 → 运行 Agent → 发送结果
本章小结
恭喜你!到这里,你已经完成了 MiniClaw 的全部代码。让我们回顾一下学到的要点:
- 项目结构:按职责分文件,每个文件只干一件事。config 管配置、db 管数据、router 管路由、runner 管 Agent
- 配置管理:用 Zod 做校验,确保配置合法。用
.env文件管理敏感信息 - 数据库设计:四张表(messages、groups、sessions、scheduled_tasks)覆盖所有数据需求
- 消息渠道抽象:Telegram 和 CLI 用相同的接口,切换渠道只需改一个配置
- 消息路由:用触发词控制 Agent 什么时候该工作,用 XML 格式组织上下文
- 任务队列:同群组串行 + 全局限并发,防止资源争抢
- Agent 运行器:核心就是
query()函数,配好工具和权限,让 Agent 自己去干活 - 自定义工具:给 Agent 添加定时任务等特殊能力
- 定时任务:简单的轮询机制,每分钟检查一次到期任务
- 容器隔离:生产环境用 Docker 隔离 Agent 的执行环境
整个项目不到 1000 行代码,但五脏俱全。这证明了一件事:有了好的 SDK 和清晰的架构,做一个 AI 助手并不复杂。
下一章预告
代码写完了,但还有很多可以改进的地方。下一章《扩展与优化》,我们会:
- 添加 Agent Swarm 模式(多个 Agent 协作)
- 优化记忆系统(从简单的文件变成更智能的方案)
- 添加成本监控和操作审计
- 和 NanoClaw、OpenClaw 做详细对比,看看工业级项目是怎么做的
我们的 MiniClaw 已经能跑了,接下来要让它跑得更好!