第 8 课:完整项目实战
本课目标
把前面学的全部组装成一个完整的 MiniClaw 私人助理。
项目结构
miniclaw/
├── main.py ← 主程序
├── config.py ← 配置
├── storage/
│ ├── __init__.py
│ └── db.py ← SQLite 数据库
├── tools/
│ ├── __init__.py
│ ├── todo.py ← 待办管理
│ ├── notes.py ← 笔记
│ └── files.py ← 文件操作
├── hooks/
│ ├── __init__.py
│ └── safety.py ← 安全 Hook
├── agents/
│ ├── __init__.py
│ └── team.py ← Agent 团队
└── data/
    └── miniclaw.db ← 数据文件(自动创建)
storage/db.py —— 数据库持久化
"""SQLite 持久化存储"""
import sqlite3
import json
import datetime
from pathlib import Path
DB_PATH = Path(__file__).parent.parent / "data" / "miniclaw.db"
def init_db():
    """Create the data directory and the SQLite tables if they are missing.

    Creates three tables with ``CREATE TABLE IF NOT EXISTS`` so the call is
    safe to repeat: ``todos``, ``notes`` and ``audit_log``.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS todos (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT NOT NULL,
                priority TEXT DEFAULT 'medium',
                deadline TEXT,
                done INTEGER DEFAULT 0,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS notes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT NOT NULL,
                tags TEXT DEFAULT '[]',
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS audit_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tool TEXT NOT NULL,
                input_summary TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            );
        """)
        conn.commit()
    finally:
        # Release the handle even when schema creation fails.
        conn.close()
def get_conn():
    """Open a new connection to the MiniClaw database file.

    Rows come back as ``sqlite3.Row`` so callers can read columns by name
    and convert a row with ``dict(row)``.
    """
    connection = sqlite3.connect(str(DB_PATH))
    connection.row_factory = sqlite3.Row
    return connection
# === 待办操作 ===
def db_add_todo(content: str, priority: str = "medium", deadline: str = "") -> dict:
    """Insert a to-do row and return a summary of what was stored.

    Args:
        content: Task text.
        priority: Priority label, stored as given.
        deadline: Deadline text; an empty string is stored as NULL.

    Returns:
        A dict with the new row ``id`` and the ``content``, ``priority`` and
        ``deadline`` arguments as they were passed in.
    """
    conn = get_conn()
    try:
        cursor = conn.execute(
            "INSERT INTO todos (content, priority, deadline) VALUES (?, ?, ?)",
            (content, priority, deadline or None),
        )
        conn.commit()
        todo_id = cursor.lastrowid
    finally:
        # Close even if the INSERT or commit raises, so the handle is not leaked.
        conn.close()
    return {"id": todo_id, "content": content, "priority": priority, "deadline": deadline}
def db_list_todos(status: str = "pending") -> list[dict]:
    """Return to-do rows as dicts, filtered by completion state.

    Args:
        status: ``"pending"`` for open tasks, ``"done"`` for completed tasks
            (newest first); any other value returns every task with open
            ones first.

    Returns:
        A list of row dicts. Open tasks are ordered high, medium, low (any
        other priority label last), then by creation time.
    """
    # Sorting the raw text would give high < low < medium alphabetically,
    # so map each label to an explicit rank instead.
    priority_rank = (
        "CASE priority WHEN 'high' THEN 0 WHEN 'medium' THEN 1 "
        "WHEN 'low' THEN 2 ELSE 3 END"
    )
    if status == "pending":
        query = f"SELECT * FROM todos WHERE done = 0 ORDER BY {priority_rank}, created_at"
    elif status == "done":
        query = "SELECT * FROM todos WHERE done = 1 ORDER BY created_at DESC"
    else:
        query = f"SELECT * FROM todos ORDER BY done, {priority_rank}, created_at"

    conn = get_conn()
    try:
        rows = conn.execute(query).fetchall()
    finally:
        # Close even if the query raises, so the handle is not leaked.
        conn.close()
    return [dict(r) for r in rows]
def db_complete_todo(todo_id: int) -> bool:
    """Mark an open to-do as done.

    Args:
        todo_id: Row id of the task.

    Returns:
        True if a row was updated; False if no open task has that id.
    """
    conn = get_conn()
    try:
        cursor = conn.execute(
            "UPDATE todos SET done = 1 WHERE id = ? AND done = 0", (todo_id,)
        )
        conn.commit()
        changed = cursor.rowcount > 0
    finally:
        # Close even if the UPDATE or commit raises.
        conn.close()
    return changed
def db_delete_todo(todo_id: int) -> bool:
    """Delete a to-do row.

    Args:
        todo_id: Row id of the task.

    Returns:
        True if a row was deleted; False if no task has that id.
    """
    conn = get_conn()
    try:
        cursor = conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,))
        conn.commit()
        changed = cursor.rowcount > 0
    finally:
        # Close even if the DELETE or commit raises.
        conn.close()
    return changed
# === 笔记操作 ===
def db_add_note(content: str, tags: list[str] = None) -> dict:
    """Insert a note and return a summary of what was stored.

    Args:
        content: Note text.
        tags: Tag strings; ``None`` or an empty list stores ``[]``.

    Returns:
        A dict with the new row ``id``, the ``content`` and the ``tags`` list.
    """
    tag_list = tags or []
    conn = get_conn()
    try:
        cursor = conn.execute(
            "INSERT INTO notes (content, tags) VALUES (?, ?)",
            # Tags are stored as a JSON array; ensure_ascii=False keeps
            # non-ASCII tags readable in the column.
            (content, json.dumps(tag_list, ensure_ascii=False)),
        )
        conn.commit()
        note_id = cursor.lastrowid
    finally:
        # Close even if the INSERT or commit raises.
        conn.close()
    return {"id": note_id, "content": content, "tags": tag_list}
def db_search_notes(keyword: str = "", tag: str = "") -> list[dict]:
    """Search notes by content keyword and/or tag, newest first (max 20).

    Args:
        keyword: Text that must appear in the note content; empty means no
            content filter.
        tag: Tag the note must carry; empty means no tag filter.

    Returns:
        A list of row dicts whose ``tags`` value is decoded from JSON to a list.
    """

    def _like_literal(text: str) -> str:
        # Escape LIKE metacharacters so user text is matched literally.
        return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")

    query = "SELECT * FROM notes WHERE 1=1"
    params = []
    if keyword:
        query += " AND content LIKE ? ESCAPE '\\'"
        params.append(f"%{_like_literal(keyword)}%")
    if tag:
        # Tags are stored as a JSON array, so match the JSON-quoted tag.
        # A bare substring match would let "ai" hit a note tagged "email".
        quoted_tag = json.dumps(tag, ensure_ascii=False)
        query += " AND tags LIKE ? ESCAPE '\\'"
        params.append(f"%{_like_literal(quoted_tag)}%")
    query += " ORDER BY created_at DESC LIMIT 20"

    conn = get_conn()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        # Close even if the query raises.
        conn.close()

    results = []
    for row in rows:
        note = dict(row)
        note["tags"] = json.loads(note["tags"])
        results.append(note)
    return results
def db_recent_notes(count: int = 5) -> list[dict]:
    """Return the most recently created notes.

    Args:
        count: Maximum number of notes, passed to the SQL ``LIMIT``.

    Returns:
        A list of row dicts, newest first, whose ``tags`` value is decoded
        from JSON to a list.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM notes ORDER BY created_at DESC LIMIT ?", (count,)
        ).fetchall()
    finally:
        # Close even if the query raises.
        conn.close()

    results = []
    for row in rows:
        note = dict(row)
        note["tags"] = json.loads(note["tags"])
        results.append(note)
    return results
# === 审计日志 ===
def db_log_audit(tool: str, input_summary: str):
    """Append one row to the audit log.

    Args:
        tool: Name of the tool that ran.
        input_summary: Text describing the tool input; only the first 200
            characters are stored.
    """
    conn = get_conn()
    try:
        conn.execute(
            "INSERT INTO audit_log (tool, input_summary) VALUES (?, ?)",
            (tool, input_summary[:200]),
        )
        conn.commit()
    finally:
        # Close even if the INSERT or commit raises.
        conn.close()
def db_get_audit_summary() -> str:
    """Format the 15 newest audit-log rows as a multi-line string.

    Returns:
        A fixed "no records" message when the log is empty; otherwise a
        header line followed by one line per row with its timestamp
        (first 19 characters) and tool name.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM audit_log ORDER BY created_at DESC LIMIT 15"
        ).fetchall()
    finally:
        # Close even if the query raises.
        conn.close()

    if not rows:
        return "暂无操作记录"
    lines = [f"共 {len(rows)} 条记录(最近):"]
    lines.extend(f" [{r['created_at'][:19]}] {r['tool']}" for r in rows)
    return "\n".join(lines)
tools/todo.py —— 持久化版待办
"""待办管理工具(SQLite 版)"""
from claude_agent_sdk import tool
from storage.db import db_add_todo, db_list_todos, db_complete_todo, db_delete_todo
@tool(
    "add_todo",
    "添加待办任务",
    {
        "type": "object",
        "properties": {
            "content": {"type": "string", "description": "任务内容"},
            "priority": {"type": "string", "description": "优先级: high/medium/low"},
            "deadline": {"type": "string", "description": "截止日期(可选)"},
        },
        "required": ["content"],
    },
)
async def add_todo(args: dict) -> dict:
    """Tool handler: store a new to-do and report it.

    The SDK calls a tool handler with one dict of arguments and expects an
    MCP result dict with a ``content`` list in return.

    Args:
        args: Tool arguments; ``content`` is required, ``priority`` defaults
            to ``"medium"`` and ``deadline`` to an empty string.

    Returns:
        An MCP text result confirming the new task.
    """
    content = args["content"]
    priority = args.get("priority") or "medium"
    deadline = args.get("deadline") or ""

    result = db_add_todo(content, priority, deadline)
    emoji = {"high": "🔴", "medium": "🟡", "low": "🟢"}
    msg = f"✅ 已添加 #{result['id']}: {content} [{emoji.get(priority, '⚪')}{priority}]"
    if deadline:
        msg += f" 截止: {deadline}"
    return {"content": [{"type": "text", "text": msg}]}
@tool(
    "list_todos",
    "列出待办任务",
    {
        "type": "object",
        "properties": {
            "status": {"type": "string", "description": "筛选: all/pending/done"},
        },
    },
)
async def list_todos(args: dict) -> dict:
    """Tool handler: list to-dos as a formatted text block.

    The SDK calls a tool handler with one dict of arguments and expects an
    MCP result dict with a ``content`` list in return.

    Args:
        args: Tool arguments; optional ``status`` defaults to ``"pending"``.

    Returns:
        An MCP text result with one line per task, or a fixed message when
        there are no matching tasks.
    """
    status = (args or {}).get("status") or "pending"
    items = db_list_todos(status)
    if not items:
        return {"content": [{"type": "text", "text": "📋 没有待办任务"}]}

    emoji = {"high": "🔴", "medium": "🟡", "low": "🟢"}
    lines = [f"📋 待办({len(items)} 项):"]
    for t in items:
        check = "✅" if t["done"] else "☐"
        e = emoji.get(t["priority"], "⚪")
        line = f" {check} #{t['id']} {e} {t['content']}"
        if t.get("deadline"):
            line += f" (截止: {t['deadline']})"
        lines.append(line)
    return {"content": [{"type": "text", "text": "\n".join(lines)}]}
@tool(
    "complete_todo",
    "完成任务",
    {
        "type": "object",
        "properties": {
            "todo_id": {"type": "integer", "description": "任务ID"},
        },
        "required": ["todo_id"],
    },
)
async def complete_todo(args: dict) -> dict:
    """Tool handler: mark a to-do as done.

    The SDK calls a tool handler with one dict of arguments and expects an
    MCP result dict with a ``content`` list in return.

    Args:
        args: Tool arguments; ``todo_id`` is required.

    Returns:
        An MCP text result saying whether an open task with that id was found.
    """
    # JSON numbers may arrive as floats (1.0); normalise to an int row id.
    todo_id = int(args["todo_id"])
    if db_complete_todo(todo_id):
        msg = f"✅ 任务 #{todo_id} 已完成"
    else:
        msg = f"未找到未完成的任务 #{todo_id}"
    return {"content": [{"type": "text", "text": msg}]}
@tool(
    "delete_todo",
    "删除任务",
    {
        "type": "object",
        "properties": {
            "todo_id": {"type": "integer", "description": "任务ID"},
        },
        "required": ["todo_id"],
    },
)
async def delete_todo(args: dict) -> dict:
    """Tool handler: delete a to-do.

    The SDK calls a tool handler with one dict of arguments and expects an
    MCP result dict with a ``content`` list in return.

    Args:
        args: Tool arguments; ``todo_id`` is required.

    Returns:
        An MCP text result saying whether a task with that id was deleted.
    """
    # JSON numbers may arrive as floats (1.0); normalise to an int row id.
    todo_id = int(args["todo_id"])
    if db_delete_todo(todo_id):
        msg = f"🗑️ 已删除任务 #{todo_id}"
    else:
        msg = f"未找到任务 #{todo_id}"
    return {"content": [{"type": "text", "text": msg}]}
tools/notes.py —— 持久化版笔记
"""笔记工具(SQLite 版)"""
from claude_agent_sdk import tool
from storage.db import db_add_note, db_search_notes, db_recent_notes
@tool(
    "add_note",
    "保存笔记",
    {
        "type": "object",
        "properties": {
            "content": {"type": "string", "description": "笔记内容"},
            "tags": {"type": "string", "description": "标签,逗号分隔"},
        },
        "required": ["content"],
    },
)
async def add_note(args: dict) -> dict:
    """Tool handler: save a note with optional comma-separated tags.

    The SDK calls a tool handler with one dict of arguments and expects an
    MCP result dict with a ``content`` list in return.

    Args:
        args: Tool arguments; ``content`` is required, ``tags`` is an
            optional comma-separated string.

    Returns:
        An MCP text result confirming the note id and any tags.
    """
    content = args["content"]
    tags = args.get("tags") or ""
    # Split on commas and drop blank entries such as a trailing comma.
    tag_list = [t.strip() for t in tags.split(",") if t.strip()]

    result = db_add_note(content, tag_list)
    msg = f"📝 已保存笔记 #{result['id']}"
    if tag_list:
        msg += f" 标签: {', '.join('#' + t for t in tag_list)}"
    return {"content": [{"type": "text", "text": msg}]}
@tool(
    "search_notes",
    "搜索笔记",
    {
        "type": "object",
        "properties": {
            "keyword": {"type": "string", "description": "关键词"},
            "tag": {"type": "string", "description": "标签筛选(可选)"},
        },
    },
)
async def search_notes(args: dict) -> dict:
    """Tool handler: search notes by keyword and/or tag.

    The SDK calls a tool handler with one dict of arguments and expects an
    MCP result dict with a ``content`` list in return.

    Args:
        args: Tool arguments; ``keyword`` and ``tag`` are both optional.

    Returns:
        An MCP text result with one line per matching note (content cut to
        80 characters), or a fixed message when nothing matches.
    """
    args = args or {}
    results = db_search_notes(args.get("keyword") or "", args.get("tag") or "")
    if not results:
        return {"content": [{"type": "text", "text": "没有找到匹配的笔记"}]}

    lines = [f"📓 找到 {len(results)} 条笔记:"]
    for n in results:
        tags_str = " ".join(f"#{t}" for t in n["tags"]) if n["tags"] else ""
        lines.append(f" [{n['created_at'][:16]}] #{n['id']} {n['content'][:80]} {tags_str}")
    return {"content": [{"type": "text", "text": "\n".join(lines)}]}
@tool(
    "list_recent_notes",
    "最近的笔记",
    {
        "type": "object",
        "properties": {
            "count": {"type": "integer", "description": "几条,默认5"},
        },
    },
)
async def list_recent_notes(args: dict) -> dict:
    """Tool handler: show the most recent notes.

    The SDK calls a tool handler with one dict of arguments and expects an
    MCP result dict with a ``content`` list in return.

    Args:
        args: Tool arguments; optional ``count`` defaults to 5.

    Returns:
        An MCP text result with one line per note (content cut to 80
        characters), or a fixed message when there are no notes.
    """
    raw_count = (args or {}).get("count")
    # JSON numbers may arrive as floats (5.0); normalise to an int.
    count = int(raw_count) if raw_count is not None else 5

    results = db_recent_notes(count)
    if not results:
        return {"content": [{"type": "text", "text": "📓 笔记本是空的"}]}

    lines = [f"📓 最近 {len(results)} 条:"]
    for n in results:
        tags_str = " ".join(f"#{t}" for t in n["tags"]) if n["tags"] else ""
        lines.append(f" [{n['created_at'][:16]}] {n['content'][:80]} {tags_str}")
    return {"content": [{"type": "text", "text": "\n".join(lines)}]}
hooks/safety.py —— 安全层
"""安全 Hook"""
import re
from claude_agent_sdk.types import HookInput, HookContext, HookJSONOutput
from claude_agent_sdk import PermissionResultAllow, PermissionResultDeny, ToolPermissionContext
from storage.db import db_log_audit
# Substrings that make file_guard deny a Bash command containing them.
DANGEROUS_COMMANDS = ["rm -rf", "rm -r /", "dd if=", "mkfs", "chmod -R 777"]
# Path fragments that make file_guard deny a Write/Edit whose file_path contains them.
PROTECTED_PATHS = [".ssh", ".config", ".env", ".git"]
async def file_guard(input_data: HookInput, tool_use_id, context) -> HookJSONOutput:
    """PreToolUse hook that blocks dangerous shell commands and protected paths.

    A Bash call is denied when its command contains any DANGEROUS_COMMANDS
    entry. A Write or Edit call is denied when its file_path contains any
    PROTECTED_PATHS entry. Everything else returns an empty dict, which
    leaves the decision to later hooks and the permission callback.
    """

    def _deny(reason: str) -> dict:
        # Build the PreToolUse "deny" payload with the given reason.
        return {"hookSpecificOutput": {"hookEventName": "PreToolUse",
                                       "permissionDecision": "deny",
                                       "permissionDecisionReason": reason}}

    name = input_data["tool_name"]
    payload = input_data.get("tool_input", {})

    if name == "Bash":
        command = payload.get("command", "")
        hit = next((d for d in DANGEROUS_COMMANDS if d in command), None)
        if hit is not None:
            return _deny(f"🛡️ 拦截: {hit}")

    if name in ("Write", "Edit"):
        target = payload.get("file_path", "")
        hit = next((p for p in PROTECTED_PATHS if p in target), None)
        if hit is not None:
            return _deny(f"🛡️ 受保护: {hit}")

    return {}
async def readonly_auto_approve(input_data: HookInput, tool_use_id, context) -> HookJSONOutput:
    """PreToolUse hook that auto-approves a fixed list of read-only tools.

    Returns an "allow" decision for the listed tool names and an empty dict
    for any other tool.
    """
    read_only_tools = (
        "Read", "Grep", "Glob",
        "mcp__mc__list_todos", "mcp__mc__search_notes",
        "mcp__mc__list_recent_notes", "mcp__mc__list_files",
    )
    if input_data["tool_name"] not in read_only_tools:
        return {}
    decision = {
        "hookEventName": "PreToolUse",
        "permissionDecision": "allow",
        "permissionDecisionReason": "只读",
    }
    return {"hookSpecificOutput": decision}
async def audit(input_data: HookInput, tool_use_id, context) -> HookJSONOutput:
    """Hook that records the tool name and a short input summary in the audit log.

    The tool input is stringified and cut to 200 characters before it is
    handed to db_log_audit. Always returns an empty dict.
    """
    name = input_data.get("tool_name", "?")
    summary = str(input_data.get("tool_input", ""))[:200]
    db_log_audit(name, summary)
    return {}
async def permission_cb(tool_name: str, input_data: dict,
                        context: ToolPermissionContext):
    """Permission callback that currently allows every tool call.

    The branches group tools by name fragment (read-like, write-like,
    delete) so a stricter policy can be attached to each group later.
    All of them return PermissionResultAllow today.
    """
    read_markers = ("list", "search", "get")
    write_markers = ("add", "complete")

    if any(marker in tool_name for marker in read_markers):
        return PermissionResultAllow()
    if any(marker in tool_name for marker in write_markers):
        return PermissionResultAllow()
    if "delete" in tool_name:
        # A production setup could ask the user for confirmation here.
        return PermissionResultAllow()
    return PermissionResultAllow()
agents/team.py —— Agent 团队
"""Agent 团队"""
from claude_agent_sdk import AgentDefinition
# Planning sub-agent: restricted to the Read and Grep tools, runs on "sonnet".
planner = AgentDefinition(
    model="sonnet",
    tools=["Read", "Grep"],
    description="分析任务、制定计划、安排优先级",
    prompt="你是效率规划师。分析目标,拆分步骤,安排优先级。用中文。",
)

# Summarising sub-agent: restricted to the Read tool, runs on "haiku".
summarizer = AgentDefinition(
    model="haiku",
    tools=["Read"],
    description="总结信息、生成报告",
    prompt="你是总结专家。把复杂信息浓缩成要点。用中文。",
)

# Name -> definition mapping handed to ClaudeAgentOptions(agents=...).
ALL_AGENTS = dict(planner=planner, summarizer=summarizer)
main.py —— 主程序
"""MiniClaw —— 你的 AI 私人助理"""
import asyncio
from claude_agent_sdk import (
ClaudeSDKClient, ClaudeAgentOptions,
AssistantMessage, ResultMessage, TextBlock, ToolUseBlock,
create_sdk_mcp_server,
)
from claude_agent_sdk.types import HookMatcher
from storage.db import init_db, db_get_audit_summary
from tools.todo import add_todo, list_todos, complete_todo, delete_todo
from tools.notes import add_note, search_notes, list_recent_notes
from tools.files import list_files, find_large_files
from hooks.safety import file_guard, readonly_auto_approve, audit, permission_cb
from agents.team import ALL_AGENTS
# Create the database file and tables at import time, before the tools are registered.
init_db()
# In-process MCP server named "mc"; its tools are addressed as mcp__mc__<tool name>.
mc_server = create_sdk_mcp_server(
name="mc",
version="1.0.0",
tools=[add_todo, list_todos, complete_todo, delete_todo,
add_note, search_notes, list_recent_notes,
list_files, find_large_files],
)
# System prompt passed to ClaudeAgentOptions: lists the tools and sub-agents,
# and maps common user phrasings to the tool that should handle them.
SYSTEM_PROMPT = """你是 MiniClaw,用户的 AI 私人助理。
你的工具:
- 待办: add_todo, list_todos, complete_todo, delete_todo
- 笔记: add_note, search_notes, list_recent_notes
- 文件: list_files, find_large_files
你的 Agent 团队:
- planner: 规划师(分析和规划)
- summarizer: 总结专家
规则:
- 用中文,友好简洁
- "加个任务" → add_todo
- "记一下" → add_note
- "看看待办" → list_todos
- 主动给笔记加标签
- 不确定就追问"""
# Fully qualified names (mcp__<server>__<tool>) of every tool on the "mc" server.
TOOL_LIST = [
    f"mcp__mc__{short_name}"
    for short_name in (
        "add_todo", "list_todos", "complete_todo", "delete_todo",
        "add_note", "search_notes", "list_recent_notes",
        "list_files", "find_large_files",
    )
]
async def main():
    """Run the interactive MiniClaw chat loop.

    Builds the agent options (system prompt, MCP server, sub-agents, hooks
    and permission callback), prints a banner, then reads user input until
    the user quits. ``log`` prints the audit summary and slash shortcuts are
    expanded to full prompts; everything else is sent to the agent, whose
    text blocks and tool-use names are echoed.
    """
    options = ClaudeAgentOptions(
        system_prompt=SYSTEM_PROMPT,
        mcp_servers={"mc": mc_server},
        allowed_tools=TOOL_LIST,
        agents=ALL_AGENTS,
        hooks={
            "PreToolUse": [
                HookMatcher(matcher="Bash", hooks=[file_guard]),
                HookMatcher(matcher="Write", hooks=[file_guard]),
                # file_guard also checks Edit calls, so it must be
                # registered for Edit or protected paths stay editable.
                HookMatcher(matcher="Edit", hooks=[file_guard]),
                HookMatcher(matcher=None, hooks=[readonly_auto_approve]),
            ],
            "PostToolUse": [
                HookMatcher(matcher=None, hooks=[audit]),
            ],
        },
        can_use_tool=permission_cb,
    )

    print("=" * 40)
    print("🐾 MiniClaw 私人助理 v1.0")
    print("=" * 40)
    print("quit 退出 | log 查看日志 | /todo 看待办 | /notes 看笔记")
    print()

    # Slash commands are rewritten into natural-language prompts.
    shortcuts = {
        "/todo": "列出所有待办任务",
        "/done": "列出已完成的任务",
        "/notes": "列出最近的笔记",
        "/today": "总结今天的任务和笔记情况",
    }

    async with ClaudeSDKClient(options=options) as client:
        while True:
            try:
                user_input = input("你: ").strip()
            except EOFError:
                # stdin was closed (Ctrl-D / piped input): exit like "quit".
                user_input = "quit"

            if user_input.lower() in ['quit', 'exit', 'q']:
                print(f"\n📋 {db_get_audit_summary()}")
                print("🐾 拜拜!明天见~")
                break
            if user_input.lower() == 'log':
                print(f"\n📋 {db_get_audit_summary()}\n")
                continue
            if not user_input:
                continue

            # Shortcut commands: only the first word is checked.
            cmd = user_input.split()[0].lower()
            if cmd in shortcuts:
                user_input = shortcuts[cmd]

            await client.query(user_input)
            async for msg in client.receive_response():
                if isinstance(msg, AssistantMessage):
                    for block in msg.content:
                        if isinstance(block, TextBlock):
                            print(f"\n🐾: {block.text}")
                        elif isinstance(block, ToolUseBlock):
                            print(f" 🔧 {block.name}")
                elif isinstance(msg, ResultMessage):
                    if msg.total_cost_usd and msg.total_cost_usd > 0:
                        print(f" 💰 ${msg.total_cost_usd:.4f}")
            print()
if __name__ == "__main__":
    # asyncio.run needs a coroutine object, so main must be called here;
    # passing the bare function raises ValueError.
    asyncio.run(main())
运行和测试
pip install claude-agent-sdk
export ANTHROPIC_API_KEY="sk-ant-你的密钥"
python main.py
试试:
你: 加个任务:下周提交项目报告,高优先级
你: 记一下:Python 3.12 支持 type 语句
你: /todo
你: 完成任务 1
你: /notes
你: 帮我用 planner 规划这周的学习计划
你: log
你: quit
数据都保存在 data/miniclaw.db 里,下次打开还在。
架构总览
用户输入
│
├── 快捷命令翻译 (/todo → "列出待办")
│
▼
ClaudeSDKClient
│
├── Agent 层 → planner / summarizer
│
├── 工具层 (MCP) → 待办 / 笔记 / 文件
│
├── 安全层 (Hook)
│ ├── PreToolUse: 文件保护 + 只读自动批准
│ └── PostToolUse: 审计日志
│
└── 存储层 (SQLite)
└── todos / notes / audit_log
本课小结
- MiniClaw = 工具 + Hook + Agent + Client + SQLite
- SQLite 实现数据持久化,关掉重开数据还在
- 快捷命令(/todo、/notes)提高交互效率
- 所有组件通过 ClaudeAgentOptions 组装
课后练习
- 给笔记加"收藏"功能
- 实现 /export:导出所有待办和笔记为 Markdown 文件
- 加一个"每日统计"功能:今天加了几个任务、记了几条笔记