AI Agent 教程

第19章:动手实现 —— 一行一行把 MiniClaw 写出来

一句话:从零编写 MiniClaw 的完整代码,做出一个真正能用的 AI 助手。

本章目标

前置知识


好了,前面学了那么多章,终于到了最激动人心的时刻 —— 写代码!

这一章是整本教程最长、最重要的一章。我们要从一个空文件夹开始,一行一行地把 MiniClaw 的每个文件写出来。写完之后,你就拥有一个真正能工作的 AI 助手了。

不用紧张,我会把每一行代码都讲清楚。跟着敲就行。


19.1 项目初始化 —— 从零开始

第一步:创建项目目录,初始化 Node.js 项目

打开终端,运行下面的命令:

# 创建项目目录并进入
mkdir miniclaw && cd miniclaw

# 初始化 Node.js 项目(-y 表示全部使用默认设置)
pnpm init

第二步:安装依赖

MiniClaw 用到的库不多,总共就这几个:

# 安装运行时依赖
pnpm add @anthropic-ai/claude-agent-sdk better-sqlite3 grammy dotenv pino zod cron-parser

# 安装开发时依赖
pnpm add -D typescript @types/node @types/better-sqlite3 tsx

每个库是干什么的,解释一下:

干什么的 为什么选它
@anthropic-ai/claude-agent-sdk 调用 Claude Agent 本教程的主角
better-sqlite3 SQLite 数据库 轻量级,不需要额外安装数据库服务
grammy Telegram Bot 框架 API 友好,文档全,TypeScript 支持好
dotenv 读取 .env 文件 管理环境变量的标准方案
pino 日志库 性能好,JSON 格式,方便后续分析
zod 数据校验 TypeScript 类型安全的校验库
cron-parser 解析 cron 表达式 定时任务要用到

第三步:创建目录结构

# 创建源代码目录
mkdir -p src/channels src/agent

# 创建数据目录
mkdir -p groups/main data

# 创建主群组的 CLAUDE.md(Agent 的"记忆文件")
echo "# 主群组记忆\n\n这里记录重要信息。" > groups/main/CLAUDE.md

创建完后,你的项目结构应该长这样:

miniclaw/
├── src/
│   ├── channels/       # 消息渠道(Telegram、命令行)
│   │   ├── telegram.ts
│   │   └── cli.ts
│   ├── agent/          # Agent 相关
│   │   ├── runner.ts   # Agent 运行器(核心!)
│   │   ├── container.ts# 容器管理
│   │   └── tools.ts    # 自定义工具
│   ├── config.ts       # 配置管理
│   ├── db.ts           # 数据库操作
│   ├── router.ts       # 消息路由
│   ├── queue.ts        # 任务队列
│   ├── scheduler.ts    # 定时任务
│   └── index.ts        # 主入口
├── groups/             # 每个群的记忆文件
│   └── main/
│       └── CLAUDE.md
├── data/               # 数据存储目录
├── package.json
├── tsconfig.json
├── .env.example
└── Dockerfile

第四步:TypeScript 配置

创建 tsconfig.json

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "ESNext",
    "moduleResolution": "bundler",
    "esModuleInterop": true,
    "strict": true,
    "outDir": "dist",
    "rootDir": "src",
    "declaration": true,
    "sourceMap": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "dist"]
}

这里面几个关键配置解释一下:

第五步:配置 package.json 的 scripts

打开 package.json,在 scripts 里加入以下内容:

{
  "name": "miniclaw",
  "version": "1.0.0",
  "type": "module",
  "scripts": {
    "dev": "tsx src/index.ts",
    "dev:cli": "CHANNEL=cli tsx src/index.ts",
    "build": "tsc",
    "start": "node dist/index.js"
  },
  "dependencies": {
    "@anthropic-ai/claude-agent-sdk": "^0.1.0",
    "better-sqlite3": "^11.0.0",
    "cron-parser": "^5.0.0",
    "dotenv": "^16.0.0",
    "grammy": "^1.30.0",
    "pino": "^9.0.0",
    "zod": "^3.23.0"
  },
  "devDependencies": {
    "@types/better-sqlite3": "^7.0.0",
    "@types/node": "^22.0.0",
    "tsx": "^4.0.0",
    "typescript": "^5.6.0"
  }
}

注意 "type": "module" 这一行 —— 这告诉 Node.js 使用 ES 模块语法。

第六步:环境变量模板

创建 .env.example 文件,告诉使用者需要配置哪些环境变量:

# ===== 必填 =====

# Anthropic API Key —— 在 console.anthropic.com 获取
ANTHROPIC_API_KEY=sk-ant-xxx

# Telegram Bot Token —— 通过 @BotFather 创建 Bot 获取
TELEGRAM_BOT_TOKEN=123456:ABC-DEF

# ===== 选填(有默认值) =====

# 助手名字,也是触发词(默认:mini)
# 在群里发 "@mini 帮我查一下天气" 就会触发
ASSISTANT_NAME=mini

# 消息渠道:telegram 或 cli(默认:telegram)
CHANNEL=telegram

# 最大并发容器数(默认:3)
MAX_CONCURRENT_CONTAINERS=3

# 空闲超时时间,毫秒(默认:300000,即5分钟)
IDLE_TIMEOUT=300000

# 容器运行超时时间,毫秒(默认:600000,即10分钟)
CONTAINER_TIMEOUT=600000

# 数据库文件路径(默认:./data/miniclaw.db)
DB_PATH=./data/miniclaw.db

# 群组目录路径(默认:./groups)
GROUPS_DIR=./groups

# 使用的模型(默认:sonnet)
MODEL=sonnet

然后把它复制一份作为你的实际配置:

cp .env.example .env
# 编辑 .env,填入你的 API Key 和 Telegram Bot Token

好了,项目骨架搭好了。接下来我们一个文件一个文件地写代码。


19.2 配置管理(config.ts)—— 约 50 行

这是最简单的一个文件。它的职责就是:从环境变量读取配置,做好校验,然后导出给其他模块用。

创建 src/config.ts

/**
 * config.ts —— MiniClaw 配置管理
 *
 * 职责:
 * 1. 从 .env 文件和环境变量中读取配置
 * 2. 用 Zod 做校验,确保配置都是合法的
 * 3. 导出配置对象给其他模块使用
 */

import "dotenv/config"; // 这一行会自动读取项目根目录的 .env 文件
import { z } from "zod";
import path from "path";

// ========== 用 Zod 定义配置的"模板" ==========
// Zod 会帮我们检查每个配置项是不是合法的
// 比如 MAX_CONCURRENT_CONTAINERS 必须是正整数,不能是 "abc"

const ConfigSchema = z.object({
  // Telegram Bot Token —— 必填(除非使用 CLI 模式)
  TELEGRAM_BOT_TOKEN: z.string().default(""),

  // Anthropic API Key —— 必填
  ANTHROPIC_API_KEY: z.string().min(1, "必须提供 ANTHROPIC_API_KEY"),

  // 助手名字,同时也是触发词
  // 在群里发 "@mini 你好" 就会触发 Agent
  ASSISTANT_NAME: z.string().default("mini"),

  // 消息渠道:telegram 或 cli
  CHANNEL: z.enum(["telegram", "cli"]).default("telegram"),

  // 最大并发任务数 —— 同时最多跑几个 Agent
  // 设太大可能把你的 API 额度吃光,设太小用户要排队
  MAX_CONCURRENT_CONTAINERS: z.coerce.number().int().positive().default(3),

  // 空闲超时 —— Agent 多久没收到新消息就关闭(毫秒)
  IDLE_TIMEOUT: z.coerce.number().int().positive().default(300000), // 5分钟

  // 容器运行超时 —— Agent 最多跑多长时间就强制停止(毫秒)
  CONTAINER_TIMEOUT: z.coerce.number().int().positive().default(600000), // 10分钟

  // 数据库文件路径
  DB_PATH: z.string().default("./data/miniclaw.db"),

  // 群组记忆文件目录
  GROUPS_DIR: z.string().default("./groups"),

  // 使用的模型
  MODEL: z.enum(["sonnet", "opus", "haiku"]).default("sonnet"),
});

// ========== 解析并校验配置 ==========
// 这里会从 process.env(环境变量)中读取所有配置
// 如果有配置不合法(比如必填项没填),程序会直接报错退出

const parseResult = ConfigSchema.safeParse(process.env);

if (!parseResult.success) {
  console.error("配置错误!请检查你的 .env 文件:");
  for (const issue of parseResult.error.issues) {
    console.error(`  - ${issue.path.join(".")}: ${issue.message}`);
  }
  process.exit(1);
}

// ========== 导出配置 ==========
// 其他文件只需要 import { config } from "./config.js" 就能用

export const config = parseResult.data;

// 把相对路径转成绝对路径,避免后续因为工作目录变化出问题
export const DB_PATH = path.resolve(config.DB_PATH);
export const GROUPS_DIR = path.resolve(config.GROUPS_DIR);

// 触发词的正则表达式
// 比如 ASSISTANT_NAME 是 "mini",那正则就是 /^@mini\b/i
// \b 是单词边界,确保 "@miniature" 不会误触发
// i 是大小写不敏感
function escapeRegex(str: string): string {
  return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}

export const TRIGGER_PATTERN = new RegExp(
  `^@${escapeRegex(config.ASSISTANT_NAME)}\\b`,
  "i"
);

// 轮询间隔 —— 多久检查一次新消息(毫秒)
export const POLL_INTERVAL = 2000;

// 定时任务检查间隔 —— 多久检查一次是否有定时任务该执行了
export const SCHEDULER_POLL_INTERVAL = 60000; // 1分钟

代码讲解:

这个文件做了三件事:

  1. 读取配置dotenv/config 会自动读取 .env 文件里的配置,放进 process.env
  2. 校验配置:用 Zod 定义每个配置项的类型和约束,如果不对就直接报错
  3. 导出配置:把处理好的配置导出给其他模块用

为什么用 Zod 而不是直接读 process.env?因为 process.env 里的值全是字符串,你需要自己转换类型(比如把 "3" 变成数字 3)。Zod 的 coerce 功能可以自动帮你做这个转换,还能校验值是否合法。


19.3 数据库层(db.ts)—— 约 110 行

这个文件负责所有数据库操作。MiniClaw 用 SQLite 作为数据库 —— 轻量、不需要额外安装服务、一个文件搞定。

创建 src/db.ts

/**
 * db.ts —— MiniClaw 数据库层
 *
 * 职责:
 * 1. 初始化数据库和表结构
 * 2. 提供消息的增删改查操作
 * 3. 提供群组信息的存取
 * 4. 提供会话(session)的存取
 * 5. 提供定时任务的存取
 *
 * 用 better-sqlite3 操作 SQLite,它是同步的,比异步简单,
 * 而且 SQLite 本来就是单线程的,同步操作反而更合适。
 */

import Database from "better-sqlite3";
import fs from "fs";
import path from "path";
import { DB_PATH } from "./config.js";

// 数据库实例 —— 在 initDatabase() 中初始化
let db: Database.Database;

// ========== 定义数据类型 ==========

/** 一条消息 */
export interface Message {
  id: string;          // 消息唯一ID
  chat_id: string;     // 聊天(群组)ID
  sender: string;      // 发送者ID
  sender_name: string; // 发送者昵称
  content: string;     // 消息内容
  timestamp: string;   // 时间戳(ISO格式)
  processed: number;   // 是否已处理:0=未处理,1=已处理
}

/** 一个群组 */
export interface Group {
  chat_id: string;     // 群组ID
  name: string;        // 群组名称
  folder: string;      // 对应的文件夹名(在 groups/ 下)
  trigger: string;     // 触发词
  created_at: string;  // 创建时间
}

/** 一条定时任务 */
export interface ScheduledTask {
  id: string;            // 任务ID
  group_folder: string;  // 所属群组文件夹
  chat_id: string;       // 要发送结果的聊天ID
  prompt: string;        // 要执行的提示词
  schedule_type: string; // 调度类型:cron / interval / once
  schedule_value: string;// 调度表达式(如 "0 9 * * *" 或 "3600000")
  next_run: string | null;  // 下次执行时间
  last_run: string | null;  // 上次执行时间
  last_result: string | null;// 上次执行结果
  status: string;        // 状态:active / paused / completed
  created_at: string;    // 创建时间
}

// ========== 建表语句 ==========

function createSchema(database: Database.Database): void {
  database.exec(`
    -- 消息表:存储所有聊天消息
    CREATE TABLE IF NOT EXISTS messages (
      id TEXT,
      chat_id TEXT,
      sender TEXT,
      sender_name TEXT,
      content TEXT,
      timestamp TEXT,
      processed INTEGER DEFAULT 0,
      PRIMARY KEY (id, chat_id)
    );
    -- 按时间戳建索引,方便按时间查询
    CREATE INDEX IF NOT EXISTS idx_msg_timestamp ON messages(timestamp);
    -- 按处理状态建索引,方便查找未处理的消息
    CREATE INDEX IF NOT EXISTS idx_msg_processed ON messages(processed);

    -- 群组表:记录注册的群组信息
    CREATE TABLE IF NOT EXISTS groups (
      chat_id TEXT PRIMARY KEY,
      name TEXT NOT NULL,
      folder TEXT NOT NULL UNIQUE,
      trigger TEXT NOT NULL,
      created_at TEXT NOT NULL
    );

    -- 会话表:记录每个群组的 Agent 会话ID
    -- 有了 session_id,下次可以接着上次的对话继续
    CREATE TABLE IF NOT EXISTS sessions (
      group_folder TEXT PRIMARY KEY,
      session_id TEXT NOT NULL
    );

    -- 定时任务表
    CREATE TABLE IF NOT EXISTS scheduled_tasks (
      id TEXT PRIMARY KEY,
      group_folder TEXT NOT NULL,
      chat_id TEXT NOT NULL,
      prompt TEXT NOT NULL,
      schedule_type TEXT NOT NULL,
      schedule_value TEXT NOT NULL,
      next_run TEXT,
      last_run TEXT,
      last_result TEXT,
      status TEXT DEFAULT 'active',
      created_at TEXT NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_task_next_run ON scheduled_tasks(next_run);
    CREATE INDEX IF NOT EXISTS idx_task_status ON scheduled_tasks(status);
  `);
}

// ========== 初始化 ==========

/** 初始化数据库 —— 必须在程序启动时调用一次 */
export function initDatabase(): void {
  // 确保数据目录存在
  fs.mkdirSync(path.dirname(DB_PATH), { recursive: true });

  // 打开(或创建)数据库文件
  db = new Database(DB_PATH);

  // 开启 WAL 模式 —— 提高并发读写性能
  db.pragma("journal_mode = WAL");

  // 创建表结构
  createSchema(db);
}

// ========== 消息相关操作 ==========

/** 保存一条消息 */
export function saveMessage(msg: Omit<Message, "processed">): void {
  db.prepare(`
    INSERT OR REPLACE INTO messages (id, chat_id, sender, sender_name, content, timestamp, processed)
    VALUES (?, ?, ?, ?, ?, ?, 0)
  `).run(msg.id, msg.chat_id, msg.sender, msg.sender_name, msg.content, msg.timestamp);
}

/** 获取某个群组的未处理消息(按时间排序) */
export function getUnprocessedMessages(chatId: string): Message[] {
  return db.prepare(`
    SELECT * FROM messages
    WHERE chat_id = ? AND processed = 0
    ORDER BY timestamp ASC
  `).all(chatId) as Message[];
}

/** 把消息标记为已处理 */
export function markAsProcessed(chatId: string): void {
  db.prepare(`
    UPDATE messages SET processed = 1
    WHERE chat_id = ? AND processed = 0
  `).run(chatId);
}

/** 获取某个群组最近 N 条消息(用于给 Agent 提供上下文) */
export function getRecentMessages(chatId: string, limit: number = 50): Message[] {
  return db.prepare(`
    SELECT * FROM messages
    WHERE chat_id = ?
    ORDER BY timestamp DESC
    LIMIT ?
  `).all(chatId, limit) as Message[];
}

// ========== 群组相关操作 ==========

/** 注册一个群组(或更新已有群组) */
export function saveGroup(group: Group): void {
  db.prepare(`
    INSERT OR REPLACE INTO groups (chat_id, name, folder, trigger, created_at)
    VALUES (?, ?, ?, ?, ?)
  `).run(group.chat_id, group.name, group.folder, group.trigger, group.created_at);
}

/** 根据 chat_id 查找群组 */
export function getGroup(chatId: string): Group | undefined {
  return db.prepare(`SELECT * FROM groups WHERE chat_id = ?`).get(chatId) as Group | undefined;
}

/** 获取所有已注册的群组 */
export function getAllGroups(): Group[] {
  return db.prepare(`SELECT * FROM groups`).all() as Group[];
}

// ========== 会话相关操作 ==========

/** 保存或更新某个群组的会话ID */
export function saveSession(groupFolder: string, sessionId: string): void {
  db.prepare(`
    INSERT OR REPLACE INTO sessions (group_folder, session_id)
    VALUES (?, ?)
  `).run(groupFolder, sessionId);
}

/** 获取某个群组的会话ID */
export function getSession(groupFolder: string): string | undefined {
  const row = db.prepare(`
    SELECT session_id FROM sessions WHERE group_folder = ?
  `).get(groupFolder) as { session_id: string } | undefined;
  return row?.session_id;
}

// ========== 定时任务相关操作 ==========

/** 创建一条定时任务 */
export function createTask(task: ScheduledTask): void {
  db.prepare(`
    INSERT INTO scheduled_tasks
    (id, group_folder, chat_id, prompt, schedule_type, schedule_value, next_run, status, created_at)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
  `).run(
    task.id, task.group_folder, task.chat_id, task.prompt,
    task.schedule_type, task.schedule_value, task.next_run,
    task.status, task.created_at
  );
}

/** 获取所有到期该执行的任务 */
export function getDueTasks(): ScheduledTask[] {
  const now = new Date().toISOString();
  return db.prepare(`
    SELECT * FROM scheduled_tasks
    WHERE status = 'active' AND next_run IS NOT NULL AND next_run <= ?
    ORDER BY next_run ASC
  `).all(now) as ScheduledTask[];
}

/** 获取某个群组的所有定时任务 */
export function getTasksForGroup(groupFolder: string): ScheduledTask[] {
  return db.prepare(`
    SELECT * FROM scheduled_tasks
    WHERE group_folder = ?
    ORDER BY created_at DESC
  `).all(groupFolder) as ScheduledTask[];
}

/** 更新任务的执行状态 */
export function updateTaskAfterRun(
  taskId: string,
  nextRun: string | null,
  lastResult: string
): void {
  const now = new Date().toISOString();
  db.prepare(`
    UPDATE scheduled_tasks
    SET next_run = ?, last_run = ?, last_result = ?,
        status = CASE WHEN ? IS NULL THEN 'completed' ELSE status END
    WHERE id = ?
  `).run(nextRun, now, lastResult, nextRun, taskId);
}

/** 删除一条定时任务 */
export function deleteTask(taskId: string): void {
  db.prepare(`DELETE FROM scheduled_tasks WHERE id = ?`).run(taskId);
}

代码讲解:

数据库层是整个项目的"数据仓库"。它的设计遵循一个原则:对外暴露简单的函数,对内封装 SQL 细节

几个要点:

  1. 表设计:四张表对应四种数据 —— 消息、群组、会话、定时任务
  2. WAL 模式:SQLite 默认用 journal 模式,一次只能一个写操作。开启 WAL(Write-Ahead Logging)后,读和写可以同时进行
  3. 同步操作:better-sqlite3 是同步的,在 Node.js 里这不是问题,因为 SQLite 的操作本身就很快(通常毫秒级)

19.4 Telegram 接入(channels/telegram.ts)—— 约 120 行

这是 MiniClaw 的"耳朵和嘴巴" —— 通过 Telegram 接收用户消息,把 Agent 的回复发送回去。

创建 src/channels/telegram.ts

/**
 * channels/telegram.ts —— Telegram 消息渠道
 *
 * 职责:
 * 1. 连接 Telegram Bot API
 * 2. 监听群组和私聊消息
 * 3. 把消息转换成统一格式保存到数据库
 * 4. 把 Agent 的回复发送给用户
 *
 * 使用 grammy 库 —— 一个很好用的 Telegram Bot 框架
 */

import { Bot, Context } from "grammy";
import { config } from "../config.js";
import { saveMessage } from "../db.js";
import pino from "pino";

const logger = pino({ name: "telegram" });

// ========== 类型定义 ==========

/** 消息处理回调 —— 当收到新消息时调用 */
export type OnMessageCallback = (chatId: string, message: {
  id: string;
  chat_id: string;
  sender: string;
  sender_name: string;
  content: string;
  timestamp: string;
}) => void;

// ========== Telegram 渠道类 ==========

export class TelegramChannel {
  private bot: Bot;
  private onMessage: OnMessageCallback;

  constructor(onMessage: OnMessageCallback) {
    // 检查 Token 是否配置了
    if (!config.TELEGRAM_BOT_TOKEN) {
      throw new Error(
        "TELEGRAM_BOT_TOKEN 未配置!请在 .env 文件中设置。\n" +
        "获取方法:在 Telegram 中找 @BotFather,发送 /newbot 创建一个新 Bot"
      );
    }

    this.bot = new Bot(config.TELEGRAM_BOT_TOKEN);
    this.onMessage = onMessage;

    // 注册消息处理器
    this.setupHandlers();
  }

  /** 设置消息处理器 */
  private setupHandlers(): void {
    // 监听所有文本消息
    this.bot.on("message:text", async (ctx: Context) => {
      // 拿到消息的基本信息
      const msg = ctx.message!;
      const chatId = String(msg.chat.id);
      const senderId = String(msg.from?.id || "unknown");
      const senderName = this.getSenderName(msg.from);
      const content = msg.text || "";
      const timestamp = new Date(msg.date * 1000).toISOString();
      const messageId = String(msg.message_id);

      // 忽略空消息
      if (!content.trim()) return;

      logger.info({
        chatId,
        sender: senderName,
        contentLength: content.length,
      }, "收到 Telegram 消息");

      // 保存到数据库
      const message = {
        id: messageId,
        chat_id: chatId,
        sender: senderId,
        sender_name: senderName,
        content,
        timestamp,
      };

      saveMessage(message);

      // 通知主程序有新消息
      this.onMessage(chatId, message);
    });

    // 监听错误
    this.bot.catch((err) => {
      logger.error({ err }, "Telegram Bot 错误");
    });
  }

  /** 获取发送者的显示名称 */
  private getSenderName(from: Context["message"] extends { from?: infer F } ? F : never): string {
    if (!from) return "未知用户";
    if (from.first_name && from.last_name) {
      return `${from.first_name} ${from.last_name}`;
    }
    return from.first_name || from.username || "未知用户";
  }

  /** 启动 Bot */
  async start(): Promise<void> {
    logger.info("Telegram Bot 启动中...");

    // 获取 Bot 信息(顺便验证 Token 是否有效)
    const me = await this.bot.api.getMe();
    logger.info({ botName: me.first_name, botUsername: me.username }, "Telegram Bot 已连接");

    // 开始接收消息
    // 注意:bot.start() 会阻塞,所以不要 await 它
    this.bot.start({
      // 允许的更新类型
      allowed_updates: ["message"],
      // 遇到错误时的重试策略
      onStart: (info) => {
        logger.info({ username: info.username }, "Bot 开始接收消息");
      },
    });
  }

  /** 发送消息到指定聊天 */
  async sendMessage(chatId: string, text: string): Promise<void> {
    // Telegram 单条消息最大 4096 字符
    // 如果 Agent 回复太长,需要分段发送
    const MAX_LENGTH = 4000; // 留点余量

    if (text.length <= MAX_LENGTH) {
      // 短消息直接发
      await this.bot.api.sendMessage(chatId, text);
    } else {
      // 长消息分段发
      const chunks = this.splitMessage(text, MAX_LENGTH);
      for (const chunk of chunks) {
        await this.bot.api.sendMessage(chatId, chunk);
        // 每段之间稍微等一下,避免发送太快被限流
        await new Promise(resolve => setTimeout(resolve, 200));
      }
    }
  }

  /** 把长文本按段落边界切分 */
  private splitMessage(text: string, maxLength: number): string[] {
    const chunks: string[] = [];
    let remaining = text;

    while (remaining.length > 0) {
      if (remaining.length <= maxLength) {
        chunks.push(remaining);
        break;
      }

      // 尽量在换行符处切分,避免把一段话切成两半
      let cutPoint = remaining.lastIndexOf("\n", maxLength);
      if (cutPoint === -1 || cutPoint < maxLength * 0.5) {
        // 如果没有合适的换行符,在空格处切分
        cutPoint = remaining.lastIndexOf(" ", maxLength);
      }
      if (cutPoint === -1 || cutPoint < maxLength * 0.5) {
        // 实在没办法,硬切
        cutPoint = maxLength;
      }

      chunks.push(remaining.slice(0, cutPoint));
      remaining = remaining.slice(cutPoint).trimStart();
    }

    return chunks;
  }

  /** 设置"正在输入"状态 */
  async setTyping(chatId: string): Promise<void> {
    try {
      await this.bot.api.sendChatAction(chatId, "typing");
    } catch {
      // 设置输入状态失败不影响功能,忽略
    }
  }

  /** 停止 Bot */
  async stop(): Promise<void> {
    logger.info("Telegram Bot 停止中...");
    this.bot.stop();
  }
}

代码讲解:

Telegram 渠道做的事情很简单:

  1. 收消息bot.on("message:text", ...) 监听所有文本消息,把它转换成统一格式保存到数据库
  2. 发消息sendMessage() 把 Agent 的回复发给用户。需要注意 Telegram 单条消息有 4096 字符限制,超长要分段发送
  3. 分段逻辑:尽量在换行符或空格处切分,而不是把一个词切成两半

有一个需要注意的地方:bot.start() 是一个长时间运行的操作(它会不停地轮询 Telegram 服务器获取新消息),所以我们不 await 它,而是让它在后台跑。


19.5 CLI 接入(channels/cli.ts)—— 约 60 行

开发和测试时不想每次都用 Telegram 怎么办?搞一个命令行接口就行了。直接在终端里和 Agent 对话。

创建 src/channels/cli.ts

/**
 * channels/cli.ts —— 命令行消息渠道
 *
 * 职责:
 * 1. 从终端读取用户输入
 * 2. 把输入转换成统一格式的消息
 * 3. 在终端显示 Agent 的回复
 *
 * 主要用于开发和测试 —— 不需要配置 Telegram,
 * 直接在终端里就能和 Agent 对话。
 */

import readline from "readline";
import { config } from "../config.js";
import { saveMessage } from "../db.js";
import pino from "pino";

const logger = pino({ name: "cli" });

// CLI 模式下只有一个"聊天",用固定的 chat_id
const CLI_CHAT_ID = "cli:local";
const CLI_SENDER = "local-user";
const CLI_SENDER_NAME = "你";

/** 消息处理回调 */
export type OnMessageCallback = (chatId: string, message: {
  id: string;
  chat_id: string;
  sender: string;
  sender_name: string;
  content: string;
  timestamp: string;
}) => void;

export class CliChannel {
  private rl: readline.Interface;
  private onMessage: OnMessageCallback;
  private messageCounter = 0;

  constructor(onMessage: OnMessageCallback) {
    this.onMessage = onMessage;

    // 创建 readline 接口,从标准输入读取
    this.rl = readline.createInterface({
      input: process.stdin,
      output: process.stdout,
    });
  }

  /** 启动 CLI 交互 */
  async start(): Promise<void> {
    logger.info("CLI 模式启动");
    console.log("========================================");
    console.log(`  MiniClaw CLI 模式`);
    console.log(`  触发词:@${config.ASSISTANT_NAME}`);
    console.log(`  输入消息开始对话,输入 quit 退出`);
    console.log("========================================\n");

    // 显示提示符,等待用户输入
    this.promptUser();
  }

  /** 显示提示符,等待用户输入 */
  private promptUser(): void {
    this.rl.question("你: ", (input) => {
      const trimmed = input.trim();

      // 输入 quit 或 exit 退出
      if (trimmed === "quit" || trimmed === "exit") {
        console.log("再见!");
        process.exit(0);
      }

      // 忽略空输入
      if (!trimmed) {
        this.promptUser();
        return;
      }

      // 构造消息对象
      this.messageCounter++;
      const message = {
        id: `cli-${this.messageCounter}`,
        chat_id: CLI_CHAT_ID,
        sender: CLI_SENDER,
        sender_name: CLI_SENDER_NAME,
        content: trimmed,
        timestamp: new Date().toISOString(),
      };

      // 保存到数据库
      saveMessage(message);

      // 通知主程序
      this.onMessage(CLI_CHAT_ID, message);

      // 继续等待下一条输入
      // 注意:Agent 处理消息是异步的,不会阻塞输入
      this.promptUser();
    });
  }

  /** 发送消息(在 CLI 模式下就是打印到终端) */
  async sendMessage(_chatId: string, text: string): Promise<void> {
    console.log(`\n${config.ASSISTANT_NAME}: ${text}\n`);
  }

  /** 设置输入状态(CLI 模式下显示一个提示) */
  async setTyping(_chatId: string): Promise<void> {
    process.stdout.write(`[${config.ASSISTANT_NAME} 正在思考...]\r`);
  }

  /** 停止 CLI */
  async stop(): Promise<void> {
    this.rl.close();
  }
}

代码讲解:

CLI 渠道是 Telegram 渠道的"简化版"。它用 Node.js 内置的 readline 模块,在终端里实现一问一答的交互。

开发时可以用 pnpm run dev:cli 启动 CLI 模式,不需要配置 Telegram Bot,方便调试。


19.6 消息路由(router.ts)—— 约 80 行

路由器是 MiniClaw 的"分拣中心"。收到一条消息后,它决定:这条消息要不要交给 Agent 处理?如果要,怎么把消息和上下文一起打包发给 Agent?

创建 src/router.ts

/**
 * router.ts —— 消息路由器
 *
 * 职责:
 * 1. 判断一条消息是否需要 Agent 处理(触发词检测)
 * 2. 把消息和上下文打包成 Agent 能理解的格式
 * 3. 加载群组的 CLAUDE.md 记忆文件
 * 4. 组装最终的 prompt
 *
 * 消息格式采用 XML 标签 —— 这是 NanoClaw 的做法,
 * 因为 Claude 对 XML 格式的理解特别好。
 */

import fs from "fs";
import path from "path";
import { config, TRIGGER_PATTERN, GROUPS_DIR } from "./config.js";
import { Message, getRecentMessages, getGroup } from "./db.js";
import pino from "pino";

const logger = pino({ name: "router" });

// ========== XML 转义 ==========

/** 转义 XML 特殊字符,防止消息内容破坏 XML 结构 */
function escapeXml(s: string): string {
  if (!s) return "";
  return s
    .replace(/&/g, "&amp;")
    .replace(/</g, "&lt;")
    .replace(/>/g, "&gt;")
    .replace(/"/g, "&quot;");
}

// ========== 触发词检测 ==========

/**
 * 检查消息是否需要 Agent 处理
 *
 * 触发条件:
 * 1. 消息以触发词开头(如 "@mini")
 * 2. 这是一条私聊消息(不需要触发词)
 *
 * 返回 true 表示"这条消息需要处理"
 */
export function shouldProcess(chatId: string, messages: Message[]): boolean {
  // 私聊消息(CLI 模式)总是处理
  if (chatId.startsWith("cli:")) return true;

  // 群组消息:检查是否有触发词
  return messages.some((msg) => TRIGGER_PATTERN.test(msg.content.trim()));
}

// ========== 消息格式化 ==========

/**
 * 把消息列表格式化成 XML 格式
 *
 * 输出示例:
 * <messages>
 *   <message sender="小明" time="2024-01-01T12:00:00Z">@mini 今天天气怎么样?</message>
 *   <message sender="小红" time="2024-01-01T12:01:00Z">我也想知道</message>
 * </messages>
 */
export function formatMessages(messages: Message[]): string {
  const lines = messages.map((m) =>
    `<message sender="${escapeXml(m.sender_name)}" time="${m.timestamp}">${escapeXml(m.content)}</message>`
  );
  return `<messages>\n${lines.join("\n")}\n</messages>`;
}

// ========== 加载群组记忆 ==========

/**
 * 读取群组的 CLAUDE.md 文件
 *
 * 每个群组在 groups/ 目录下有自己的文件夹,
 * 里面的 CLAUDE.md 就是 Agent 对这个群的"记忆"。
 * Agent 可以在对话过程中修改这个文件,实现持久记忆。
 */
export function loadGroupMemory(groupFolder: string): string {
  const claudeMdPath = path.join(GROUPS_DIR, groupFolder, "CLAUDE.md");
  try {
    if (fs.existsSync(claudeMdPath)) {
      return fs.readFileSync(claudeMdPath, "utf-8");
    }
  } catch (err) {
    logger.warn({ groupFolder, err }, "读取群组记忆失败");
  }
  return "";
}

// ========== 组装完整 Prompt ==========

/**
 * 组装发送给 Agent 的完整 prompt
 *
 * 包含三部分信息:
 * 1. 群组记忆(CLAUDE.md 的内容)
 * 2. 最近的聊天历史(给 Agent 提供上下文)
 * 3. 当前待处理的新消息
 *
 * 这个结构参考了 NanoClaw 的设计 —— 用 XML 标签把不同部分包起来,
 * Claude 能很好地理解这种结构。
 */
export function buildPrompt(
  chatId: string,
  newMessages: Message[],
  groupFolder: string
): string {
  const parts: string[] = [];

  // 第一部分:群组记忆
  const memory = loadGroupMemory(groupFolder);
  if (memory) {
    parts.push(`<group_memory>\n${memory}\n</group_memory>`);
  }

  // 第二部分:最近的聊天历史
  // 取最近 30 条消息作为上下文,帮助 Agent 理解对话背景
  const history = getRecentMessages(chatId, 30);
  if (history.length > 0) {
    // getRecentMessages 返回的是倒序的,翻转成正序
    const chronological = [...history].reverse();
    parts.push(`<recent_history>\n${formatMessages(chronological)}\n</recent_history>`);
  }

  // 第三部分:当前待处理的新消息
  parts.push(`<new_messages>\n${formatMessages(newMessages)}\n</new_messages>`);

  return parts.join("\n\n");
}

/**
 * 去掉 Agent 回复中的内部标签
 *
 * Agent 有时会用 <internal>...</internal> 来做内部推理,
 * 这些内容不应该发给用户看。
 */
export function stripInternalTags(text: string): string {
  return text.replace(/<internal>[\s\S]*?<\/internal>/g, "").trim();
}

代码讲解:

路由器的核心是 buildPrompt() 函数。它把三种信息组装在一起:

  1. 群组记忆:Agent 对这个群的长期记忆(CLAUDE.md),比如群规、重要事项
  2. 聊天历史:最近的对话,帮 Agent 理解当前语境
  3. 新消息:需要 Agent 处理的消息

为什么用 XML 标签?因为 Claude 模型对 XML 的理解特别好。用 <group_memory><new_messages> 这样的标签把不同内容分开,Agent 就能清楚地区分"这是记忆"和"这是用户的新消息"。


19.7 任务队列(queue.ts)—— 约 70 行

任务队列解决两个问题:

  1. 同一个群组的消息要串行处理 —— 如果 Agent 还在处理上一条消息,新消息要排队等着
  2. 全局并发要控制 —— 不能同时跑太多 Agent,否则 API 额度很快就用完了

创建 src/queue.ts

/**
 * queue.ts —— 任务队列
 *
 * 职责:
 * 1. 每个群组一个队列,同一群组的任务串行执行
 * 2. 全局并发限制,防止同时运行太多 Agent
 * 3. 支持任务入队、出队、执行
 *
 * 设计思路:
 * - 用 Map 维护每个群组的队列状态
 * - 用计数器控制全局并发数
 * - 任务完成后自动执行队列中的下一个任务
 */

import { config } from "./config.js";
import pino from "pino";

const logger = pino({ name: "queue" });

/** 任务函数类型 —— 返回 Promise,完成时 resolve */
type TaskFn = () => Promise<void>;

/** 每个群组的队列状态 */
interface GroupQueueState {
  isRunning: boolean;       // 当前是否有任务在执行
  pendingTasks: TaskFn[];   // 等待执行的任务列表
}

export class TaskQueue {
  // 每个群组(chatId)一个队列
  private groups = new Map<string, GroupQueueState>();

  // 当前全局正在运行的任务数
  private activeCount = 0;

  // 因为全局并发限制而等待的任务
  private waitingForSlot: Array<{ chatId: string; task: TaskFn }> = [];

  /** 获取或创建某个群组的队列状态 */
  private getGroupState(chatId: string): GroupQueueState {
    let state = this.groups.get(chatId);
    if (!state) {
      state = { isRunning: false, pendingTasks: [] };
      this.groups.set(chatId, state);
    }
    return state;
  }

  /**
   * 往队列里加一个任务
   *
   * 逻辑:
   * 1. 如果这个群组当前没有任务在跑 → 直接执行
   * 2. 如果这个群组有任务在跑 → 排队等着
   * 3. 如果全局并发数已满 → 等有空位再执行
   */
  enqueue(chatId: string, task: TaskFn): void {
    const state = this.getGroupState(chatId);

    if (state.isRunning) {
      // 该群组有任务在跑,排队
      state.pendingTasks.push(task);
      logger.debug({ chatId, queueLength: state.pendingTasks.length }, "任务已排队(群组内)");
      return;
    }

    if (this.activeCount >= config.MAX_CONCURRENT_CONTAINERS) {
      // 全局并发数已满,等有空位
      this.waitingForSlot.push({ chatId, task });
      logger.debug({
        chatId,
        activeCount: this.activeCount,
        waitingCount: this.waitingForSlot.length,
      }, "任务已排队(等待空位)");
      return;
    }

    // 可以直接执行
    this.executeTask(chatId, task);
  }

  /** 执行一个任务 */
  private async executeTask(chatId: string, task: TaskFn): Promise<void> {
    const state = this.getGroupState(chatId);
    state.isRunning = true;
    this.activeCount++;

    logger.debug({
      chatId,
      activeCount: this.activeCount,
    }, "开始执行任务");

    try {
      await task();
    } catch (err) {
      logger.error({ chatId, err }, "任务执行出错");
    } finally {
      state.isRunning = false;
      this.activeCount--;

      logger.debug({
        chatId,
        activeCount: this.activeCount,
      }, "任务执行完毕");

      // 任务完成了,看看有没有后续任务
      this.drainNext(chatId);
    }
  }

  /** 任务完成后,看看有没有下一个要执行的 */
  private drainNext(chatId: string): void {
    const state = this.getGroupState(chatId);

    // 优先执行同群组的下一个任务
    if (state.pendingTasks.length > 0) {
      const nextTask = state.pendingTasks.shift()!;
      this.executeTask(chatId, nextTask);
      return;
    }

    // 该群组没有排队的了,看看有没有其他群组在等空位
    if (this.waitingForSlot.length > 0) {
      const next = this.waitingForSlot.shift()!;
      this.executeTask(next.chatId, next.task);
    }
  }

  /** 获取当前队列状态(用于调试和监控) */
  getStatus(): { active: number; waiting: number; groups: number } {
    return {
      active: this.activeCount,
      waiting: this.waitingForSlot.length,
      groups: this.groups.size,
    };
  }
}

代码讲解:

任务队列的核心逻辑可以用一句话概括:同一群组串行,全局限并发

想象一个场景:你有三个群组同时在用 MiniClaw,MAX_CONCURRENT_CONTAINERS 设为 3。

这样就不会因为消息洪水把你的 API 额度吃光。


19.8 容器管理(agent/container.ts)—— 约 100 行

在生产环境中,我们希望 Agent 在 Docker 容器里运行 —— 这样即使 Agent 执行了危险命令(比如 rm -rf /),也只会影响容器内部,不会伤害你的宿主机。

但开发时每次都启动 Docker 太麻烦了。所以我们提供两种模式:

  1. 容器模式(生产用):Agent 在 Docker 容器中运行
  2. 直连模式(开发用):Agent 直接在当前进程中运行

创建 src/agent/container.ts

/**
 * agent/container.ts —— 容器管理
 *
 * 职责:
 * 1. 创建 Docker 容器,设置资源限制和文件挂载
 * 2. 在容器内启动 Agent
 * 3. 管理容器的生命周期(创建、启动、停止、销毁)
 * 4. 提供"直连模式"作为开发替代方案
 *
 * 容器的作用就像一个"沙箱" ——
 * Agent 在里面随便折腾,都不会影响外面的世界。
 */

import { execSync, spawn, ChildProcess } from "child_process";
import path from "path";
import { config, GROUPS_DIR } from "../config.js";
import pino from "pino";

const logger = pino({ name: "container" });

// ========== Docker 工具函数 ==========

/** 检查 Docker 是否可用 */
export function isDockerAvailable(): boolean {
  try {
    execSync("docker info", { stdio: "ignore" });
    return true;
  } catch {
    return false;
  }
}

// ========== 容器选项 ==========

export interface ContainerOptions {
  /** 群组文件夹名 —— 用于文件挂载 */
  groupFolder: string;
  /** 容器名称前缀 */
  namePrefix?: string;
  /** CPU 限制(核心数,比如 1.0 表示最多用 1 个核心) */
  cpuLimit?: number;
  /** 内存限制(比如 "512m") */
  memoryLimit?: string;
}

// ========== Docker 容器模式 ==========

/**
 * 在 Docker 容器中运行命令
 *
 * 容器设置:
 * - 挂载群组目录(读写),让 Agent 能修改 CLAUDE.md
 * - 挂载项目目录(只读),让 Agent 能看到项目代码
 * - 设置资源限制,防止 Agent 吃光 CPU 和内存
 * - 设置网络隔离,只允许访问必要的地址
 * - 容器运行结束后自动删除(--rm)
 */
export function createContainer(
  command: string[],
  options: ContainerOptions,
  env: Record<string, string> = {}
): ChildProcess {
  const containerName = `${options.namePrefix || "miniclaw"}-${options.groupFolder}-${Date.now()}`;
  const groupDir = path.resolve(GROUPS_DIR, options.groupFolder);

  // 组装 docker run 命令的参数
  const args: string[] = [
    "run",
    "--rm",                                   // 容器退出后自动删除
    "--name", containerName,                  // 容器名称
    "--cpus", String(options.cpuLimit || 1),   // CPU 限制
    "--memory", options.memoryLimit || "512m", // 内存限制
    "-v", `${groupDir}:/workspace/group:rw`,  // 挂载群组目录(可读写)
    "-w", "/workspace/group",                 // 工作目录设为群组目录
  ];

  // 注入环境变量
  for (const [key, value] of Object.entries(env)) {
    args.push("-e", `${key}=${value}`);
  }

  // 使用 Node.js 镜像
  args.push("node:20-slim");

  // 运行的命令
  args.push(...command);

  logger.info({ containerName, groupFolder: options.groupFolder }, "创建 Docker 容器");

  const proc = spawn("docker", args, {
    stdio: ["pipe", "pipe", "pipe"],
  });

  // 设置超时 —— 防止容器运行太久
  const timeout = setTimeout(() => {
    logger.warn({ containerName }, "容器运行超时,强制停止");
    try {
      execSync(`docker stop ${containerName}`, { timeout: 10000 });
    } catch {
      execSync(`docker kill ${containerName}`, { timeout: 5000 }).toString();
    }
  }, config.CONTAINER_TIMEOUT);

  // 容器退出时清除超时定时器
  proc.on("exit", () => {
    clearTimeout(timeout);
    logger.info({ containerName }, "Docker 容器已退出");
  });

  return proc;
}

// ========== 直连模式(开发用) ==========

/**
 * 不用 Docker,直接在当前 Node.js 进程中运行
 *
 * 优点:启动快,调试方便
 * 缺点:没有隔离,Agent 执行的命令会直接影响宿主机
 *
 * 仅用于开发和测试!生产环境请使用 Docker 容器模式。
 */
export function runDirect(
  command: string[],
  cwd: string,
  env: Record<string, string> = {}
): ChildProcess {
  logger.info({ cwd }, "直连模式运行(无容器隔离)");

  const proc = spawn(command[0], command.slice(1), {
    cwd,
    env: { ...process.env, ...env },
    stdio: ["pipe", "pipe", "pipe"],
  });

  return proc;
}

/**
 * 获取群组的工作目录
 *
 * 在容器模式下是 /workspace/group
 * 在直连模式下是 groups/{groupFolder} 的绝对路径
 */
export function getWorkDir(groupFolder: string, useContainer: boolean): string {
  if (useContainer) {
    return "/workspace/group";
  }
  return path.resolve(GROUPS_DIR, groupFolder);
}

代码讲解:

容器管理提供了两种运行模式:

  1. Docker 容器模式:安全但稍慢,Agent 在隔离环境中运行。适合生产环境
  2. 直连模式:快速但不安全,Agent 直接在当前进程中运行。适合开发调试

容器模式的关键是文件挂载


19.9 Agent 运行器(agent/runner.ts)—— 核心!约 130 行

这是整个 MiniClaw 最核心的文件。 它调用 Claude Agent SDK 的 query() 函数,让 Agent 真正"活"起来。

创建 src/agent/runner.ts

/**
 * agent/runner.ts —— Agent 运行器
 *
 * 这是 MiniClaw 的心脏!
 *
 * 职责:
 * 1. 调用 Claude Agent SDK 的 query() 函数启动 Agent
 * 2. 配置 Agent 的能力(工具、权限、模型等)
 * 3. 收集 Agent 的输出
 * 4. 管理 Agent 会话(支持恢复上次对话)
 * 5. 错误处理和超时控制
 *
 * 整个 MiniClaw 最重要的代码就在这里了。
 * 前面所有的准备工作 —— 配置、数据库、消息渠道、路由、队列 ——
 * 都是为了把数据送到这里,然后把 Agent 的回复送出去。
 */

import { query } from "@anthropic-ai/claude-agent-sdk";
import fs from "fs";
import path from "path";
import { config, GROUPS_DIR } from "../config.js";
import { saveSession, getSession } from "../db.js";
import pino from "pino";

const logger = pino({ name: "agent-runner" });

// ========== 默认的 System Prompt ==========

/**
 * Agent 的"人设" —— 告诉 Agent 它是谁、该怎么做事
 *
 * 这是整个 MiniClaw 最重要的一段文本。
 * 它决定了 Agent 的行为方式、说话风格、安全边界。
 *
 * 你可以根据自己的需求修改这段 prompt。
 * 比如如果你想让 Agent 说英文,就把这里改成英文。
 */
const DEFAULT_SYSTEM_PROMPT = `你是 ${config.ASSISTANT_NAME},一个运行在群组聊天中的 AI 助手。

## 你的身份
- 你是一个乐于助人的 AI 助手,通过聊天群和用户交流
- 你的名字是 ${config.ASSISTANT_NAME}
- 用户通过在消息开头加 @${config.ASSISTANT_NAME} 来呼唤你

## 你的能力
- 你可以读写文件、执行命令、搜索网页
- 你可以帮用户写代码、分析问题、查找信息
- 你可以修改当前目录下的 CLAUDE.md 来记住重要信息
- 你可以创建定时任务(通过 schedule_task 工具)

## 行为准则
1. 回复简洁明了,不要过于冗长
2. 使用和用户相同的语言回复(如果用户说中文,你就用中文)
3. 如果不确定,就问用户,不要瞎猜
4. 执行危险操作前要确认(比如删除文件)
5. 始终保持友好和专业

## 记忆系统
- 当前目录有一个 CLAUDE.md 文件,这是你的"记忆"
- 需要记住的重要信息,请写入 CLAUDE.md
- 每次对话开始时,CLAUDE.md 的内容会自动加载

## 消息格式
- 用户的消息用 XML 标签包裹,格式为 <message sender="用户名" time="时间">内容</message>
- 如果有多条消息,它们按时间顺序排列
- <recent_history> 是最近的聊天历史,用于了解上下文
- <new_messages> 是新收到的、需要你处理的消息
- <group_memory> 是群组记忆(CLAUDE.md 的内容)

## 安全规则
- 不要执行可能损坏系统的命令(如 rm -rf /)
- 不要泄露 API Key 或其他敏感信息
- 不要访问不该访问的文件和目录
`;

// ========== Agent 运行器 ==========

/** Agent 运行选项 */
export interface RunAgentOptions {
  /** 要发送给 Agent 的 prompt */
  prompt: string;
  /** 群组文件夹名 */
  groupFolder: string;
  /** Agent 的工作目录 */
  cwd: string;
  /** 自定义 system prompt(可选,默认用上面的) */
  systemPrompt?: string;
  /** 是否为定时任务触发(可选) */
  isScheduledTask?: boolean;
}

/** Agent 运行结果 */
export interface RunAgentResult {
  /** Agent 的回复文本 */
  response: string;
  /** 会话ID(用于下次恢复对话) */
  sessionId?: string;
  /** 是否成功 */
  success: boolean;
  /** 错误信息(如果失败) */
  error?: string;
}

/**
 * 运行 Agent —— 这是最核心的函数!
 *
 * 做三件事:
 * 1. 调用 query() 启动 Agent 循环
 * 2. 收集 Agent 的输出
 * 3. 保存会话ID(下次可以接着聊)
 */
export async function runAgent(options: RunAgentOptions): Promise<RunAgentResult> {
  const { prompt, groupFolder, cwd, systemPrompt, isScheduledTask } = options;

  // 尝试获取之前的会话ID,实现"接着上次聊"
  const existingSessionId = getSession(groupFolder);
  logger.info({
    groupFolder,
    hasSession: !!existingSessionId,
    promptLength: prompt.length,
    isScheduledTask,
  }, "启动 Agent");

  // 确保群组工作目录存在
  fs.mkdirSync(cwd, { recursive: true });

  // 收集 Agent 的所有文本输出
  const results: string[] = [];

  // 新的会话ID(如果创建了新会话的话)
  let newSessionId: string | undefined;

  // 计时开始
  const startTime = Date.now();

  try {
    // ========== 核心:调用 query() ==========
    // 这里就是整个 MiniClaw 的"灵魂"所在
    // query() 返回一个异步迭代器,每次 yield 一条消息

    for await (const message of query({
      prompt: isScheduledTask
        ? `[定时任务] ${prompt}`
        : prompt,
      options: {
        // System Prompt —— Agent 的"人设"
        systemPrompt: systemPrompt || DEFAULT_SYSTEM_PROMPT,

        // 允许 Agent 使用的工具
        // 这里给了 Agent 相当多的能力,你可以根据需要限制
        allowedTools: [
          "Read",       // 读文件
          "Write",      // 写文件
          "Edit",       // 编辑文件
          "Bash",       // 执行命令
          "Glob",       // 查找文件
          "Grep",       // 搜索文件内容
          "WebSearch",  // 搜索网页
          "WebFetch",   // 获取网页内容
          "Task",       // 创建子任务
        ],

        // 权限模式 —— 绕过所有权限检查
        // 因为 Agent 已经在容器里了,有容器隔离保底
        // 如果你没有用容器,建议改成 "default"
        permissionMode: "bypassPermissions",

        // Agent 的工作目录
        cwd,

        // 最多执行多少轮
        // 每一轮是 Agent 的一次"思考 → 行动"
        // 20 轮对大部分任务够用了
        maxTurns: 20,

        // 使用的模型
        model: config.MODEL,

        // 恢复上一次的会话(如果有的话)
        resume: existingSessionId,

        // 传递环境变量(不包含敏感信息)
        env: {
          // 只传必要的环境变量,不要传 API Key
          HOME: process.env.HOME || "/root",
          PATH: process.env.PATH || "/usr/local/bin:/usr/bin:/bin",
        },
      },
    })) {
      // ========== 处理 Agent 返回的消息 ==========

      // 系统消息 —— 包含会话初始化信息
      if (message.type === "system" && message.subtype === "init") {
        newSessionId = message.session_id;
        logger.debug({ sessionId: newSessionId }, "Agent 会话已创建");
      }

      // Assistant 消息 —— Agent 的回复
      if (message.type === "assistant") {
        for (const block of message.message.content) {
          if (block.type === "text" && block.text) {
            results.push(block.text);
          }
        }
      }

      // 结果消息 —— Agent 完成了一个阶段的工作
      if (message.type === "result") {
        if (message.subtype === "error_max_turns") {
          logger.warn({ groupFolder }, "Agent 达到最大轮次限制");
        }
      }
    }

    // ========== 收尾工作 ==========

    const elapsed = Date.now() - startTime;
    logger.info({ groupFolder, elapsed, resultCount: results.length }, "Agent 运行完毕");

    // 保存会话ID,下次可以接着聊
    if (newSessionId) {
      saveSession(groupFolder, newSessionId);
    }

    // 合并所有文本输出
    const response = results.join("\n").trim();

    return {
      response: response || "(Agent 没有回复内容)",
      sessionId: newSessionId,
      success: true,
    };

  } catch (err) {
    const elapsed = Date.now() - startTime;
    const errorMessage = err instanceof Error ? err.message : String(err);
    logger.error({ groupFolder, elapsed, error: errorMessage }, "Agent 运行出错");

    return {
      response: `抱歉,处理你的消息时出了点问题:${errorMessage}`,
      success: false,
      error: errorMessage,
    };
  }
}

代码讲解:

这个文件是整个 MiniClaw 的灵魂。我们来仔细看看 query() 函数的调用:

for await (const message of query({
  prompt: "用户的消息",
  options: {
    systemPrompt: "Agent 的人设",
    allowedTools: ["Read", "Write", ...],
    permissionMode: "bypassPermissions",
    cwd: "/workspace/group",
    maxTurns: 20,
    model: "sonnet",
    resume: existingSessionId,
  }
})) {
  // 处理每一条消息
}

query() 返回一个异步迭代器。每次循环会拿到 Agent 的一条消息,可能是:

我们收集所有 assistant 消息中的文本,最后合并成完整的回复发给用户。

关于 System Prompt:

DEFAULT_SYSTEM_PROMPT 这段文本非常关键。它定义了 Agent 的行为方式。你可以把它理解为新员工的"入职手册" —— Agent 看了这个手册,就知道自己该怎么做事。

关于会话恢复:

通过 resume: existingSessionId,Agent 可以"接着上次聊"。这意味着它能记住之前的对话内容,不用每次都从头开始。


19.10 自定义工具(agent/tools.ts)—— 约 80 行

除了 SDK 自带的工具(Read、Write、Bash 等),我们还可以给 Agent 添加自定义工具。MiniClaw 需要三个自定义工具:

  1. schedule_task —— 创建定时任务
  2. list_tasks —— 查看定时任务列表
  3. send_message —— 给指定群组发消息

创建 src/agent/tools.ts

/**
 * agent/tools.ts —— 自定义工具
 *
 * 职责:
 * 1. 定义 MiniClaw 特有的工具
 * 2. 创建 MCP Server 供 Agent 调用
 *
 * 为什么需要自定义工具?
 * - SDK 自带的工具只能操作文件和执行命令
 * - 但 Agent 还需要"安排定时任务"、"给其他群发消息"这样的能力
 * - 自定义工具就是给 Agent 装上这些新技能
 */

import { z } from "zod";
import crypto from "crypto";
import { CronExpressionParser } from "cron-parser";
import { createTask, getTasksForGroup, deleteTask, ScheduledTask } from "../db.js";
import pino from "pino";

const logger = pino({ name: "tools" });

// ========== 工具定义 ==========

/**
 * 工具1:创建定时任务
 *
 * Agent 可以用这个工具安排定时执行的任务,比如:
 * - "每天早上 9 点给我发天气预报"
 * - "每小时检查一次服务器状态"
 * - "5 分钟后提醒我开会"
 */
export interface ScheduleTaskInput {
  /** 任务描述 / 要执行的 prompt */
  prompt: string;
  /** 调度类型:cron(周期性)/ interval(固定间隔)/ once(执行一次) */
  schedule_type: "cron" | "interval" | "once";
  /** 调度表达式:cron 表达式 / 毫秒数 / ISO 时间字符串 */
  schedule_value: string;
}

/**
 * 创建一条定时任务
 *
 * @param input 任务参数
 * @param groupFolder 所属群组
 * @param chatId 要发送结果的聊天ID
 * @returns 创建结果
 */
export function handleScheduleTask(
  input: ScheduleTaskInput,
  groupFolder: string,
  chatId: string
): string {
  // 生成唯一的任务ID
  const taskId = `task-${crypto.randomUUID().slice(0, 8)}`;

  // 计算下次执行时间
  let nextRun: string;
  try {
    if (input.schedule_type === "cron") {
      // 解析 cron 表达式,计算下次执行时间
      const interval = CronExpressionParser.parse(input.schedule_value);
      nextRun = interval.next().toISOString();
    } else if (input.schedule_type === "interval") {
      // 固定间隔:当前时间 + 间隔毫秒数
      const ms = parseInt(input.schedule_value, 10);
      if (isNaN(ms) || ms <= 0) {
        return "错误:interval 的值必须是正整数(毫秒数)";
      }
      nextRun = new Date(Date.now() + ms).toISOString();
    } else {
      // 执行一次:直接用给定的时间
      nextRun = new Date(input.schedule_value).toISOString();
    }
  } catch (err) {
    return `错误:无法解析调度表达式 "${input.schedule_value}" —— ${err instanceof Error ? err.message : String(err)}`;
  }

  // 保存到数据库
  const task: ScheduledTask = {
    id: taskId,
    group_folder: groupFolder,
    chat_id: chatId,
    prompt: input.prompt,
    schedule_type: input.schedule_type,
    schedule_value: input.schedule_value,
    next_run: nextRun,
    last_run: null,
    last_result: null,
    status: "active",
    created_at: new Date().toISOString(),
  };

  createTask(task);

  logger.info({ taskId, groupFolder, scheduleType: input.schedule_type }, "定时任务已创建");

  return `定时任务已创建!
- 任务ID: ${taskId}
- 类型: ${input.schedule_type}
- 表达式: ${input.schedule_value}
- 下次执行: ${nextRun}
- 任务内容: ${input.prompt.slice(0, 100)}${input.prompt.length > 100 ? "..." : ""}`;
}

/**
 * 工具2:查看定时任务列表
 */
export function handleListTasks(groupFolder: string): string {
  const tasks = getTasksForGroup(groupFolder);

  if (tasks.length === 0) {
    return "当前没有定时任务。";
  }

  const lines = tasks.map((t) => {
    return `- [${t.status}] ${t.id}: "${t.prompt.slice(0, 60)}${t.prompt.length > 60 ? "..." : ""}"
  类型: ${t.schedule_type} (${t.schedule_value})
  下次执行: ${t.next_run || "无"}
  上次执行: ${t.last_run || "从未执行"}`;
  });

  return `定时任务列表(共 ${tasks.length} 个):\n${lines.join("\n\n")}`;
}

/**
 * 工具3:删除定时任务
 */
export function handleDeleteTask(taskId: string): string {
  try {
    deleteTask(taskId);
    logger.info({ taskId }, "定时任务已删除");
    return `已删除任务 ${taskId}`;
  } catch (err) {
    return `删除失败:${err instanceof Error ? err.message : String(err)}`;
  }
}

// ========== 工具的 JSON Schema 定义 ==========
// 这些 Schema 告诉 Agent 每个工具需要什么参数

export const toolSchemas = {
  schedule_task: {
    name: "schedule_task",
    description: "创建一个定时任务。支持 cron 表达式(周期性执行)、interval(固定间隔)和 once(执行一次)。",
    input_schema: {
      type: "object" as const,
      properties: {
        prompt: {
          type: "string",
          description: "任务要执行的内容描述。Agent 会在指定时间收到这个 prompt 并执行。",
        },
        schedule_type: {
          type: "string",
          enum: ["cron", "interval", "once"],
          description: "调度类型。cron: 使用 cron 表达式(如 '0 9 * * *' 表示每天9点);interval: 固定间隔(毫秒);once: 执行一次",
        },
        schedule_value: {
          type: "string",
          description: "调度表达式。cron 类型填 cron 表达式;interval 类型填毫秒数;once 类型填 ISO 时间字符串",
        },
      },
      required: ["prompt", "schedule_type", "schedule_value"],
    },
  },
  list_tasks: {
    name: "list_tasks",
    description: "查看当前群组的所有定时任务列表",
    input_schema: {
      type: "object" as const,
      properties: {},
    },
  },
  delete_task: {
    name: "delete_task",
    description: "删除一个定时任务",
    input_schema: {
      type: "object" as const,
      properties: {
        task_id: {
          type: "string",
          description: "要删除的任务ID",
        },
      },
      required: ["task_id"],
    },
  },
};

代码讲解:

自定义工具让 Agent 能做超出 SDK 内置能力的事情。每个工具需要三样东西:

  1. 名字和描述 —— Agent 根据描述来决定什么时候用这个工具
  2. 参数 Schema —— 告诉 Agent 需要传什么参数
  3. 执行函数 —— 实际干活的代码

比如 schedule_task 工具,Agent 可以这样调用它:

Agent 思考:用户说"每天早上 9 点提醒我锻炼身体",我应该创建一个定时任务
Agent 调用:schedule_task({
  prompt: "提醒用户:该锻炼身体了!保持健康很重要。",
  schedule_type: "cron",
  schedule_value: "0 9 * * *"
})

19.11 定时任务调度器(scheduler.ts)—— 约 70 行

调度器负责定时检查有没有任务该执行了。如果有,就把任务交给 Agent 去做。

创建 src/scheduler.ts

/**
 * scheduler.ts —— 定时任务调度器
 *
 * 职责:
 * 1. 定时检查数据库里有没有到期的任务
 * 2. 如果有,把任务交给 Agent 执行
 * 3. 执行完毕后更新任务状态和下次执行时间
 * 4. 把执行结果发送回对应的群组
 *
 * 工作方式很简单:每隔一段时间(默认 1 分钟)查一次数据库,
 * 看看有没有 next_run 已经过了当前时间的任务。
 */

import { CronExpressionParser } from "cron-parser";
import { getDueTasks, updateTaskAfterRun } from "./db.js";
import { runAgent } from "./agent/runner.js";
import { TaskQueue } from "./queue.js";
import { GROUPS_DIR, SCHEDULER_POLL_INTERVAL } from "./config.js";
import path from "path";
import pino from "pino";

const logger = pino({ name: "scheduler" });

// ========== 类型定义 ==========

/** 发送消息的回调函数 */
type SendMessageFn = (chatId: string, text: string) => Promise<void>;

// ========== 调度器 ==========

/**
 * 启动定时任务调度循环
 *
 * 每隔 SCHEDULER_POLL_INTERVAL(默认 60 秒)检查一次,
 * 有到期任务就执行。
 *
 * @param queue 任务队列(用于控制并发)
 * @param sendMessage 发送消息的函数
 */
export function startScheduler(
  queue: TaskQueue,
  sendMessage: SendMessageFn
): void {
  logger.info(
    { pollInterval: SCHEDULER_POLL_INTERVAL },
    "定时任务调度器启动"
  );

  // 调度循环
  const loop = async () => {
    try {
      // 从数据库查询到期的任务
      const dueTasks = getDueTasks();

      if (dueTasks.length > 0) {
        logger.info({ count: dueTasks.length }, "发现到期任务");
      }

      for (const task of dueTasks) {
        // 把每个到期任务加入队列
        // 队列会控制并发,不会同时跑太多 Agent
        queue.enqueue(task.chat_id, async () => {
          logger.info({ taskId: task.id, prompt: task.prompt.slice(0, 80) }, "执行定时任务");

          const startTime = Date.now();

          try {
            // 运行 Agent 执行任务
            const result = await runAgent({
              prompt: task.prompt,
              groupFolder: task.group_folder,
              cwd: path.resolve(GROUPS_DIR, task.group_folder),
              isScheduledTask: true,
            });

            const elapsed = Date.now() - startTime;

            // 把结果发送给用户
            if (result.response) {
              await sendMessage(task.chat_id, `[定时任务结果]\n${result.response}`);
            }

            // 计算下次执行时间
            let nextRun: string | null = null;

            if (task.schedule_type === "cron") {
              // cron 类型:用 cron 表达式计算下一次
              const interval = CronExpressionParser.parse(task.schedule_value);
              nextRun = interval.next().toISOString();
            } else if (task.schedule_type === "interval") {
              // interval 类型:当前时间 + 间隔
              const ms = parseInt(task.schedule_value, 10);
              nextRun = new Date(Date.now() + ms).toISOString();
            }
            // once 类型:nextRun 为 null,任务会被标记为 completed

            // 更新数据库
            updateTaskAfterRun(
              task.id,
              nextRun,
              result.response.slice(0, 200) // 只保存前 200 字作为摘要
            );

            logger.info({ taskId: task.id, elapsed, nextRun }, "定时任务执行完毕");

          } catch (err) {
            const elapsed = Date.now() - startTime;
            const errorMsg = err instanceof Error ? err.message : String(err);
            logger.error({ taskId: task.id, elapsed, error: errorMsg }, "定时任务执行失败");

            // 即使失败也要更新状态,否则同一任务会被反复执行
            let nextRun: string | null = null;
            if (task.schedule_type === "cron") {
              try {
                const interval = CronExpressionParser.parse(task.schedule_value);
                nextRun = interval.next().toISOString();
              } catch { /* ignore parse error */ }
            } else if (task.schedule_type === "interval") {
              const ms = parseInt(task.schedule_value, 10);
              if (!isNaN(ms)) {
                nextRun = new Date(Date.now() + ms).toISOString();
              }
            }

            updateTaskAfterRun(task.id, nextRun, `错误: ${errorMsg}`);
          }
        });
      }
    } catch (err) {
      logger.error({ err }, "调度器循环出错");
    }

    // 等待下一次检查
    setTimeout(loop, SCHEDULER_POLL_INTERVAL);
  };

  // 启动循环(不阻塞主线程)
  loop();
}

代码讲解:

调度器的工作原理非常简单:

每隔 1 分钟:
  1. 查数据库:有没有 next_run <= 当前时间 的任务?
  2. 有 → 把任务加入队列
  3. 队列执行任务 → 调用 runAgent() → 把结果发给用户
  4. 计算下次执行时间 → 更新数据库

三种调度类型的区别:


19.12 主入口(index.ts)—— 约 80 行

最后一个文件 —— 把所有组件串起来!

创建 src/index.ts

/**
 * index.ts —— MiniClaw 主入口
 *
 * 职责:
 * 1. 初始化所有组件(数据库、消息渠道、任务队列)
 * 2. 连接各组件之间的"管道"
 * 3. 启动消息轮询循环
 * 4. 启动定时任务调度器
 * 5. 处理优雅退出
 *
 * 这个文件就像一个"总指挥" —— 它不干具体的活,
 * 但它负责把所有干活的人组织在一起。
 */

import { config, GROUPS_DIR, TRIGGER_PATTERN, POLL_INTERVAL } from "./config.js";
import { initDatabase, saveGroup, getGroup, getAllGroups, getUnprocessedMessages, markAsProcessed } from "./db.js";
import { TelegramChannel } from "./channels/telegram.js";
import { CliChannel } from "./channels/cli.js";
import { shouldProcess, buildPrompt, stripInternalTags } from "./router.js";
import { TaskQueue } from "./queue.js";
import { runAgent } from "./agent/runner.js";
import { startScheduler } from "./scheduler.js";
import fs from "fs";
import path from "path";
import pino from "pino";

const logger = pino({ name: "main" });

// ========== 全局状态 ==========

const queue = new TaskQueue();

// 消息渠道(Telegram 或 CLI)的引用
let channel: {
  sendMessage: (chatId: string, text: string) => Promise<void>;
  setTyping: (chatId: string) => Promise<void>;
  stop: () => Promise<void>;
};

// ========== 消息处理 ==========

/**
 * 处理新消息
 *
 * 当消息渠道收到新消息时,这个函数会被调用。
 * 它决定是否需要让 Agent 处理,如果需要就加入队列。
 */
function handleNewMessage(chatId: string, _message: unknown): void {
  // 检查这个聊天是否已注册为群组
  let group = getGroup(chatId);

  if (!group) {
    // 如果是 CLI 模式,自动注册为 main 群组
    if (chatId.startsWith("cli:")) {
      group = {
        chat_id: chatId,
        name: "CLI",
        folder: "main",
        trigger: config.ASSISTANT_NAME,
        created_at: new Date().toISOString(),
      };
      saveGroup(group);
      logger.info("CLI 群组已自动注册");
    } else {
      // 未注册的群组,先自动注册
      // 用 chatId 的后 6 位作为文件夹名
      const folder = `group-${chatId.slice(-6)}`;
      group = {
        chat_id: chatId,
        name: `Group ${chatId}`,
        folder,
        trigger: config.ASSISTANT_NAME,
        created_at: new Date().toISOString(),
      };
      saveGroup(group);

      // 创建群组目录和默认的 CLAUDE.md
      const groupDir = path.resolve(GROUPS_DIR, folder);
      fs.mkdirSync(groupDir, { recursive: true });
      const claudeMdPath = path.join(groupDir, "CLAUDE.md");
      if (!fs.existsSync(claudeMdPath)) {
        fs.writeFileSync(claudeMdPath, `# ${group.name} 记忆\n\n这里记录重要信息。\n`);
      }

      logger.info({ chatId, folder }, "新群组已自动注册");
    }
  }

  // 获取该群组的未处理消息
  const unprocessed = getUnprocessedMessages(chatId);
  if (unprocessed.length === 0) return;

  // 检查是否需要 Agent 处理(触发词检测)
  if (!shouldProcess(chatId, unprocessed)) return;

  // 加入任务队列
  queue.enqueue(chatId, async () => {
    logger.info({ chatId, messageCount: unprocessed.length }, "开始处理消息");

    // 设置"正在输入"状态
    await channel.setTyping(chatId);

    // 组装 prompt
    const prompt = buildPrompt(chatId, unprocessed, group!.folder);
    const cwd = path.resolve(GROUPS_DIR, group!.folder);

    // 运行 Agent
    const result = await runAgent({
      prompt,
      groupFolder: group!.folder,
      cwd,
    });

    // 标记消息为已处理
    markAsProcessed(chatId);

    // 发送回复
    if (result.response) {
      const cleanResponse = stripInternalTags(result.response);
      if (cleanResponse) {
        await channel.sendMessage(chatId, cleanResponse);
      }
    }
  });
}

// ========== 启动 ==========

async function main(): Promise<void> {
  logger.info("MiniClaw 启动中...");

  // 第1步:初始化数据库
  initDatabase();
  logger.info("数据库初始化完成");

  // 第2步:确保必要目录存在
  fs.mkdirSync(GROUPS_DIR, { recursive: true });
  fs.mkdirSync(path.resolve(GROUPS_DIR, "main"), { recursive: true });

  // 第3步:创建消息渠道
  if (config.CHANNEL === "cli") {
    // CLI 模式
    const cli = new CliChannel(handleNewMessage);
    channel = cli;
    await cli.start();
  } else {
    // Telegram 模式
    const telegram = new TelegramChannel(handleNewMessage);
    channel = telegram;
    await telegram.start();
  }

  logger.info({ channel: config.CHANNEL }, "消息渠道已启动");

  // 第4步:启动定时任务调度器
  startScheduler(queue, async (chatId: string, text: string) => {
    await channel.sendMessage(chatId, text);
  });
  logger.info("定时任务调度器已启动");

  // 第5步:注册优雅退出处理
  const shutdown = async (signal: string) => {
    logger.info({ signal }, "收到退出信号,正在关闭...");

    try {
      await channel.stop();
      logger.info("消息渠道已关闭");
    } catch (err) {
      logger.error({ err }, "关闭消息渠道时出错");
    }

    logger.info("MiniClaw 已关闭,再见!");
    process.exit(0);
  };

  process.on("SIGTERM", () => shutdown("SIGTERM"));
  process.on("SIGINT", () => shutdown("SIGINT"));

  // 启动完毕!
  logger.info({
    assistant: config.ASSISTANT_NAME,
    channel: config.CHANNEL,
    model: config.MODEL,
    maxConcurrent: config.MAX_CONCURRENT_CONTAINERS,
  }, "MiniClaw 启动完成!准备接收消息...");
}

// ========== 启动入口 ==========

main().catch((err) => {
  logger.error({ err }, "MiniClaw 启动失败");
  process.exit(1);
});

代码讲解:

index.ts 是整个项目的"总指挥"。它按顺序做这几件事:

1. 初始化数据库 → 建表、准备好存储
2. 创建消息渠道 → 连接 Telegram 或启动 CLI
3. 启动调度器   → 定时检查有没有到期任务
4. 注册退出处理 → Ctrl+C 时优雅关闭

消息的完整流转路径是这样的:

用户发消息 → Telegram/CLI 收到
  → handleNewMessage() 被调用
  → shouldProcess() 检查触发词
  → queue.enqueue() 加入队列
  → buildPrompt() 组装 prompt
  → runAgent() 调用 Claude
  → channel.sendMessage() 发送回复

19.13 Dockerfile —— 容器化部署

如果你想把 MiniClaw 部署到服务器上,用 Docker 是最方便的。

创建 Dockerfile

# ===== 第一阶段:构建 =====
FROM node:20-slim AS builder

# 安装 pnpm
RUN npm install -g pnpm

# 设置工作目录
WORKDIR /app

# 先复制依赖文件(利用 Docker 缓存)
COPY package.json pnpm-lock.yaml* ./

# 安装依赖
RUN pnpm install --frozen-lockfile

# 复制源代码
COPY tsconfig.json ./
COPY src/ ./src/

# 编译 TypeScript
RUN pnpm run build

# ===== 第二阶段:运行 =====
FROM node:20-slim

# 安装 pnpm(运行时也需要)
RUN npm install -g pnpm

WORKDIR /app

# 只复制运行需要的文件
COPY package.json pnpm-lock.yaml* ./
RUN pnpm install --prod --frozen-lockfile

# 从构建阶段复制编译好的代码
COPY --from=builder /app/dist ./dist

# 创建数据和群组目录
RUN mkdir -p /app/data /app/groups/main

# 环境变量
ENV NODE_ENV=production
ENV DB_PATH=/app/data/miniclaw.db
ENV GROUPS_DIR=/app/groups

# 启动命令
CMD ["node", "dist/index.js"]

使用方法:

# 构建镜像
docker build -t miniclaw .

# 运行(把你的 .env 文件传进去)
docker run -d \
  --name miniclaw \
  --env-file .env \
  -v $(pwd)/data:/app/data \
  -v $(pwd)/groups:/app/groups \
  --restart unless-stopped \
  miniclaw

几个关键点:

  1. 两阶段构建:第一阶段编译 TypeScript,第二阶段只留运行需要的文件,镜像更小
  2. 数据持久化:通过 -v 挂载 data/groups/ 目录,容器删了数据还在
  3. 自动重启--restart unless-stopped 确保进程崩溃后自动重启

19.14 运行和测试

开发模式(CLI)—— 最简单的测试方式

不需要 Telegram,直接在终端里和 Agent 对话:

# 确保 .env 文件里配置了 ANTHROPIC_API_KEY
# TELEGRAM_BOT_TOKEN 可以先不填

# 启动 CLI 模式
pnpm run dev:cli

你会看到:

========================================
  MiniClaw CLI 模式
  触发词:@mini
  输入消息开始对话,输入 quit 退出
========================================

你:

试试输入:

你: @mini 你好,请自我介绍一下

等几秒钟,Agent 会回复类似这样的内容:

mini: 你好!我是 mini,一个运行在群组聊天中的 AI 助手。
我可以帮你做很多事情,比如写代码、分析问题、搜索信息等。
有什么我可以帮你的吗?

再试试让它记住点东西:

你: @mini 请记住我叫小明,我喜欢编程

Agent 会把这个信息写入 groups/main/CLAUDE.md,下次对话还记得。

开发模式(Telegram)

  1. 在 Telegram 中找 @BotFather,发送 /newbot 创建一个 Bot
  2. 拿到 Bot Token,填入 .env 文件
  3. 运行:
pnpm run dev
  1. 在 Telegram 中找到你的 Bot,发送消息就行了

常见问题排查

Q: 报错 "必须提供 ANTHROPIC_API_KEY"

A: 你的 .env 文件里没有配置 ANTHROPIC_API_KEY。去 console.anthropic.com 注册并获取 API Key。

Q: Telegram Bot 启动但收不到消息

A: 检查几件事:

  1. Bot Token 是否正确
  2. 你是否在 Telegram 中给 Bot 发了消息(Bot 不能主动给你发消息,需要你先发)
  3. 如果是群组,Bot 是否已经被拉进群组,且有读取消息的权限

Q: Agent 回复太慢

A: Claude Agent 需要一些时间来思考和执行工具。通常需要 5-30 秒。如果你觉得太慢,可以:

  1. .env 中把 MODEL 改成 haiku(更快但没那么聪明)
  2. 减少 maxTurns(在 runner.ts 中修改)

Q: 报错 "Cannot find module '@anthropic-ai/claude-agent-sdk'"

A: 确保你已经正确安装了依赖:pnpm install。如果还是不行,删掉 node_modules 重新安装。

Q: 数据库报错

A: 确保 data/ 目录存在且有写入权限。运行 mkdir -p data 创建。


19.15 完整代码统计

我们来数一下,总共写了多少行代码:

文件 行数 职责
src/config.ts ~50 配置管理
src/db.ts ~110 数据库操作
src/channels/telegram.ts ~120 Telegram 接入
src/channels/cli.ts ~60 命令行接入
src/router.ts ~80 消息路由
src/queue.ts ~70 任务队列
src/agent/container.ts ~100 容器管理
src/agent/runner.ts ~130 Agent 运行器(核心)
src/agent/tools.ts ~80 自定义工具
src/scheduler.ts ~70 定时任务调度
src/index.ts ~80 主入口
合计 ~950 一个完整的 AI 助手

不到 1000 行代码,我们就做出了一个功能完整的 AI 助手。来和其他项目对比一下:

项目 代码量 功能范围
MiniClaw(我们的) ~950 行 Telegram + CLI、Agent 对话、文件操作、定时任务
NanoClaw ~500 行核心 + Skills WhatsApp + 多渠道、Agent Swarm、容器隔离
OpenClaw ~430,000 行 15+ 渠道、插件市场、企业级安全、团队协作

MiniClaw 在功能上介于 NanoClaw 和 OpenClaw 之间 —— 没有 NanoClaw 那么精简(它只有 ~500 行),也没有 OpenClaw 那么庞大。但对于学习和个人使用来说,刚刚好。


19.16 架构回顾 —— 数据流全景图

在结束之前,让我们站高一点,看看 MiniClaw 的完整数据流:

用户在 Telegram/CLI 发送消息
        │
        ▼
┌──────────────────┐
│  消息渠道层       │  telegram.ts / cli.ts
│  收到消息         │  将消息保存到数据库
│  调用回调函数      │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  消息处理层       │  index.ts → handleNewMessage()
│  查询未处理消息    │
│  触发词检测       │  router.ts → shouldProcess()
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  任务队列层       │  queue.ts → enqueue()
│  群组内串行       │
│  全局并发控制      │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Prompt 组装层    │  router.ts → buildPrompt()
│  加载群组记忆      │  合并消息历史 + 新消息
│  格式化为 XML     │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Agent 执行层     │  runner.ts → runAgent()
│  调用 query()     │  Claude Agent SDK 启动 Agentic Loop
│  收集输出         │  思考 → 工具调用 → 观察 → 再思考...
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  结果处理层       │  index.ts
│  清理内部标签      │  router.ts → stripInternalTags()
│  标记消息已处理    │  db.ts → markAsProcessed()
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  消息发送层       │  telegram.ts / cli.ts → sendMessage()
│  长消息分段       │
│  发送给用户       │
└──────────────────┘

同时,在后台还有定时任务调度器在默默工作:

定时任务调度器(每 60 秒检查一次)
        │
        ▼
查询到期任务 → 加入任务队列 → 运行 Agent → 发送结果

本章小结

恭喜你!到这里,你已经完成了 MiniClaw 的全部代码。让我们回顾一下学到的要点:

  1. 项目结构:按职责分文件,每个文件只干一件事。config 管配置、db 管数据、router 管路由、runner 管 Agent
  2. 配置管理:用 Zod 做校验,确保配置合法。用 .env 文件管理敏感信息
  3. 数据库设计:四张表(messages、groups、sessions、scheduled_tasks)覆盖所有数据需求
  4. 消息渠道抽象:Telegram 和 CLI 用相同的接口,切换渠道只需改一个配置
  5. 消息路由:用触发词控制 Agent 什么时候该工作,用 XML 格式组织上下文
  6. 任务队列:同群组串行 + 全局限并发,防止资源争抢
  7. Agent 运行器:核心就是 query() 函数,配好工具和权限,让 Agent 自己去干活
  8. 自定义工具:给 Agent 添加定时任务等特殊能力
  9. 定时任务:简单的轮询机制,每分钟检查一次到期任务
  10. 容器隔离:生产环境用 Docker 隔离 Agent 的执行环境

整个项目不到 1000 行代码,但五脏俱全。这证明了一件事:有了好的 SDK 和清晰的架构,做一个 AI 助手并不复杂

下一章预告

代码写完了,但还有很多可以改进的地方。下一章《扩展与优化》,我们会:

我们的 MiniClaw 已经能跑了,接下来要让它跑得更好!

← 上一章18. 项目设计