构建和集成自定义工具以扩展 Claude Code SDK 功能
createSdkMcpServer
和 tool
辅助函数来定义类型安全的自定义工具:
import { query, tool, createSdkMcpServer } from "@anthropic-ai/claude-code";
import { z } from "zod";
// 创建带有自定义工具的 SDK MCP 服务器
const customServer = createSdkMcpServer({
name: "my-custom-tools",
version: "1.0.0",
tools: [
tool(
"get_weather",
"获取某个位置的当前天气",
{
location: z.string().describe("城市名称或坐标"),
units: z.enum(["celsius", "fahrenheit"]).default("celsius").describe("温度单位")
},
async (args) => {
// 调用天气 API
const response = await fetch(
`https://api.weather.com/v1/current?q=${args.location}&units=${args.units}`
);
const data = await response.json();
return {
content: [{
type: "text",
text: `温度:${data.temp}°\n天气状况:${data.conditions}\n湿度:${data.humidity}%`
}]
};
}
)
]
});
mcpServers
选项将自定义服务器作为字典/对象传递给 query
函数。
prompt
参数使用异步生成器/可迭代对象 - 简单的字符串无法与 MCP 服务器一起工作。mcp__{server_name}__{tool_name}
my-custom-tools
中名为 get_weather
的工具变成 mcp__my-custom-tools__get_weather
allowedTools
选项控制 Claude 可以使用哪些工具:
import { query } from "@anthropic-ai/claude-code";
// 在查询中使用自定义工具和流式输入
async function* generateMessages() {
yield {
type: "user" as const,
message: {
role: "user" as const,
content: "旧金山的天气怎么样?"
}
};
}
for await (const message of query({
prompt: generateMessages(), // 使用异步生成器进行流式输入
options: {
mcpServers: {
"my-custom-tools": customServer // 作为对象/字典传递,而不是数组
},
// 可选择指定 Claude 可以使用哪些工具
allowedTools: [
"mcp__my-custom-tools__get_weather", // 允许天气工具
// 根据需要添加其他工具
],
maxTurns: 3
}
})) {
if (message.type === "result" && message.subtype === "success") {
console.log(message.result);
}
}
const multiToolServer = createSdkMcpServer({
name: "utilities",
version: "1.0.0",
tools: [
tool("calculate", "执行计算", { /* ... */ }, async (args) => { /* ... */ }),
tool("translate", "翻译文本", { /* ... */ }, async (args) => { /* ... */ }),
tool("search_web", "搜索网络", { /* ... */ }, async (args) => { /* ... */ })
]
});
// 仅允许特定工具和流式输入
async function* generateMessages() {
yield {
type: "user" as const,
message: {
role: "user" as const,
content: "计算 5 + 3 并将 'hello' 翻译成西班牙语"
}
};
}
for await (const message of query({
prompt: generateMessages(), // 使用异步生成器进行流式输入
options: {
mcpServers: {
utilities: multiToolServer
},
allowedTools: [
"mcp__utilities__calculate", // 允许计算器
"mcp__utilities__translate", // 允许翻译器
// "mcp__utilities__search_web" 不被允许
]
}
})) {
// 处理消息
}
@tool
装饰器支持各种模式定义方法以实现类型安全:
import { z } from "zod";
tool(
"process_data",
"使用类型安全处理结构化数据",
{
// Zod 模式定义运行时验证和 TypeScript 类型
data: z.object({
name: z.string(),
age: z.number().min(0).max(150),
email: z.string().email(),
preferences: z.array(z.string()).optional()
}),
format: z.enum(["json", "csv", "xml"]).default("json")
},
async (args) => {
// args 基于模式完全类型化
// TypeScript 知道:args.data.name 是字符串,args.data.age 是数字等
console.log(`正在将 ${args.data.name} 的数据处理为 ${args.format} 格式`);
// 您的处理逻辑在这里
return {
content: [{
type: "text",
text: `已处理 ${args.data.name} 的数据`
}]
};
}
)
tool(
"fetch_data",
"从 API 获取数据",
{
endpoint: z.string().url().describe("API 端点 URL")
},
async (args) => {
try {
const response = await fetch(args.endpoint);
if (!response.ok) {
return {
content: [{
type: "text",
text: `API 错误:${response.status} ${response.statusText}`
}]
};
}
const data = await response.json();
return {
content: [{
type: "text",
text: JSON.stringify(data, null, 2)
}]
};
} catch (error) {
return {
content: [{
type: "text",
text: `获取数据失败:${error.message}`
}]
};
}
}
)
const databaseServer = createSdkMcpServer({
name: "database-tools",
version: "1.0.0",
tools: [
tool(
"query_database",
"执行数据库查询",
{
query: z.string().describe("要执行的 SQL 查询"),
params: z.array(z.any()).optional().describe("查询参数")
},
async (args) => {
const results = await db.query(args.query, args.params || []);
return {
content: [{
type: "text",
text: `找到 ${results.length} 行:\n${JSON.stringify(results, null, 2)}`
}]
};
}
)
]
});
const apiGatewayServer = createSdkMcpServer({
name: "api-gateway",
version: "1.0.0",
tools: [
tool(
"api_request",
"向外部服务发出经过身份验证的 API 请求",
{
service: z.enum(["stripe", "github", "openai", "slack"]).describe("要调用的服务"),
endpoint: z.string().describe("API 端点路径"),
method: z.enum(["GET", "POST", "PUT", "DELETE"]).describe("HTTP 方法"),
body: z.record(z.any()).optional().describe("请求体"),
query: z.record(z.string()).optional().describe("查询参数")
},
async (args) => {
const config = {
stripe: { baseUrl: "https://api.stripe.com/v1", key: process.env.STRIPE_KEY },
github: { baseUrl: "https://api.github.com", key: process.env.GITHUB_TOKEN },
openai: { baseUrl: "https://api.openai.com/v1", key: process.env.OPENAI_KEY },
slack: { baseUrl: "https://slack.com/api", key: process.env.SLACK_TOKEN }
};
const { baseUrl, key } = config[args.service];
const url = new URL(`${baseUrl}${args.endpoint}`);
if (args.query) {
Object.entries(args.query).forEach(([k, v]) => url.searchParams.set(k, v));
}
const response = await fetch(url, {
method: args.method,
headers: { Authorization: `Bearer ${key}`, "Content-Type": "application/json" },
body: args.body ? JSON.stringify(args.body) : undefined
});
const data = await response.json();
return {
content: [{
type: "text",
text: JSON.stringify(data, null, 2)
}]
};
}
)
]
});
const calculatorServer = createSdkMcpServer({
name: "calculator",
version: "1.0.0",
tools: [
tool(
"calculate",
"执行数学计算",
{
expression: z.string().describe("要计算的数学表达式"),
precision: z.number().optional().default(2).describe("小数精度")
},
async (args) => {
try {
// 在生产环境中使用安全的数学计算库
const result = eval(args.expression); // 仅作示例!
const formatted = Number(result).toFixed(args.precision);
return {
content: [{
type: "text",
text: `${args.expression} = ${formatted}`
}]
};
} catch (error) {
return {
content: [{
type: "text",
text: `错误:无效表达式 - ${error.message}`
}]
};
}
}
),
tool(
"compound_interest",
"计算投资的复利",
{
principal: z.number().positive().describe("初始投资金额"),
rate: z.number().describe("年利率(作为小数,例如 0.05 表示 5%)"),
time: z.number().positive().describe("投资期限(年)"),
n: z.number().positive().default(12).describe("每年复利次数")
},
async (args) => {
const amount = args.principal * Math.pow(1 + args.rate / args.n, args.n * args.time);
const interest = amount - args.principal;
return {
content: [{
type: "text",
text: `投资分析:\n` +
`本金:$${args.principal.toFixed(2)}\n` +
`利率:${(args.rate * 100).toFixed(2)}%\n` +
`时间:${args.time} 年\n` +
`复利频率:每年 ${args.n} 次\n\n` +
`最终金额:$${amount.toFixed(2)}\n` +
`赚取利息:$${interest.toFixed(2)}\n` +
`回报率:${((interest / args.principal) * 100).toFixed(2)}%`
}]
};
}
)
]
});