13.2 MCP 服务器连接

服务器配置结构

每个 MCP 服务器的配置定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// src/mcp/types.ts

export type MCPServerConfig = {
/** 服务器名称(唯一标识) */
name: string;
/** 启动命令 */
command: string;
/** 命令参数 */
args?: string[];
/** 环境变量 */
env?: { [key: string]: string };
/** 是否启用 */
enabled?: boolean;
/** 超时时间(毫秒) */
timeout?: number;
};

配置示例

1
2
3
4
5
6
{
"name": "filesystem",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
"enabled": true
}

StdioClientTransport

MCP Client 和 Server 通过 stdio(标准输入/输出) 通信:

工作原理

1
2
3
4
5
6
┌─────────────┐                    ┌─────────────┐
│ MCP Client │ stdout → stdin │ MCP Server │
│ (Agent) │ ←──────┬───────→ │ (独立进程) │
└─────────────┘ │ └─────────────┘

JSON-RPC 消息

传输实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// src/mcp/client.ts

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

export class MCPServerConnection {
private client: Client | null = null;
private config: MCPServerConfig;
private connected: boolean = false;

constructor(config: MCPServerConfig) {
this.config = config;
}

/**
* 连接到 MCP 服务器
*/
async connect(): Promise<void> {
if (this.connected) {
return;
}

try {
// 1. 准备环境变量
const env: Record<string, string> = { ...process.env };
if (this.config.env) {
for (const [key, value] of Object.entries(this.config.env)) {
if (typeof value === "string") {
env[key] = value;
}
}
}

// 2. 创建 stdio 传输
const transport = new StdioClientTransport({
command: this.config.command,
args: this.config.args || [],
env,
});

// 3. 创建客户端
this.client = new Client(
{
name: "agent-client",
version: "1.0.0",
},
{
capabilities: {},
}
);

// 4. 连接
await this.client.connect(transport);
this.connected = true;
} catch (error) {
throw new Error(
`连接到 MCP 服务器 "${this.config.name}" 失败: ` +
`${error instanceof Error ? error.message : String(error)}`
);
}
}
}

连接流程详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
启动流程
├── 1. 创建子进程
│ └── npx -y @modelcontextprotocol/server-filesystem /tmp

├── 2. 建立 stdio 管道
│ ├── Client.stdout → Server.stdin
│ └── Server.stdout → Client.stdin

├── 3. 发送初始化消息 (initialize)
│ └── {"method": "initialize", "params": {...}}

├── 4. 等待服务器响应
│ └── {"result": {"capabilities": {...}}}

└── 5. 连接成功

服务器生命周期

连接状态管理

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 检查是否已连接
*/
isConnected(): boolean {
return this.connected;
}

/**
* 检查是否启用
*/
isEnabled(): boolean {
return this.config.enabled !== false;
}

断开连接

1
2
3
4
5
6
7
8
9
10
/**
* 断开连接
*/
async disconnect(): Promise<void> {
if (this.client && this.connected) {
await this.client.close();
this.connected = false;
this.client = null;
}
}

工具列表获取

连接成功后,可以获取服务器提供的工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 列出可用工具
*/
async listTools(): Promise<MCPTool[]> {
if (!this.client || !this.connected) {
return [];
}

try {
const response = await this.client.listTools() as any;
return (response.tools || []) as MCPTool[];
} catch (error) {
console.error(`列出工具失败:`, error);
return [];
}
}

MCP 工具格式

1
2
3
4
5
6
7
8
9
export type MCPTool = {
name: string;
description?: string;
inputSchema: {
type: "object";
properties?: { [key: string]: any };
required?: string[];
};
};

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"name": "read_file",
"description": "读取文件内容",
"inputSchema": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "文件路径"
}
},
"required": ["path"]
}
}

工具调用

callTool 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 调用工具
*/
async callTool(name: string, args: any): Promise<MCPToolResult> {
if (!this.client || !this.connected) {
throw new Error(`MCP 服务器 "${this.config.name}" 未连接`);
}

try {
const result = await this.client.callTool({
name,
arguments: args,
}) as any;

return result as MCPToolResult;
} catch (error) {
return {
content: [
{
type: "text",
text: `错误: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
}

MCP 工具结果格式

1
2
3
4
5
6
7
8
9
export type MCPToolResult = {
content: Array<{
type: "text" | "image" | "resource";
text?: string;
data?: string;
mimeType?: string;
}>;
isError?: boolean;
};

成功结果示例:

1
2
3
4
5
6
7
8
{
"content": [
{
"type": "text",
"text": "文件内容:Hello World!"
}
]
}

错误结果示例:

1
2
3
4
5
6
7
8
9
{
"content": [
{
"type": "text",
"text": "文件不存在"
}
],
"isError": true
}

错误处理

连接失败处理

1
2
3
4
5
6
7
8
9
10
async connect(): Promise<void> {
try {
// 连接逻辑...
} catch (error) {
throw new Error(
`连接到 MCP 服务器 "${this.config.name}" 失败: ` +
`${error instanceof Error ? error.message : String(error)}`
);
}
}

优雅降级

1
2
3
4
5
6
7
8
9
10
// 连接失败不应阻止 Agent 启动
const connection = new MCPServerConnection(config);

try {
await connection.connect();
console.log(`✓ MCP 服务器 "${config.name}" 连接成功`);
} catch (error) {
console.warn(`✗ MCP 服务器 "${config.name}" 连接失败: ${error.message}`);
console.warn(` 该服务器的工具将不可用`);
}

超时控制

设置超时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class MCPServerConnection {
private config: MCPServerConfig;
private timeout: number;

constructor(config: MCPServerConfig) {
this.config = config;
this.timeout = config.timeout || 30000; // 默认 30 秒
}

async connect(): Promise<void> {
// 使用 Promise.race 实现超时
const connectPromise = this.doConnect();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error("连接超时")), this.timeout)
);

await Promise.race([connectPromise, timeoutPromise]);
}
}

多服务器管理

MCPManager 设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// src/mcp/manager.ts

export class MCPManager {
private connections: Map<string, MCPServerConnection> = new Map();
private tools: Map<string, { server: string; tool: MCPTool }> = new Map();

/**
* 添加 MCP 服务器
*/
addServer(config: MCPServerConfig): void {
const connection = new MCPServerConnection(config);
this.connections.set(config.name, connection);
}

/**
* 移除 MCP 服务器
*/
removeServer(name: string): void {
const connection = this.connections.get(name);
if (connection) {
connection.disconnect().catch(console.error);
this.connections.delete(name);
}

// 移除该服务器的所有工具
for (const [toolName, toolInfo] of this.tools.entries()) {
if (toolInfo.server === name) {
this.tools.delete(toolName);
}
}
}
}

批量连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 连接所有启用的服务器
*/
async connectAll(): Promise<void> {
const connectionPromises: Promise<void>[] = [];

for (const [name, connection] of this.connections.entries()) {
if (connection.isEnabled()) {
connectionPromises.push(
connection.connect().catch((error) => {
console.warn(`连接到 MCP 服务器 "${name}" 失败:`, error.message);
})
);
}
}

await Promise.all(connectionPromises);

// 加载所有工具
await this.loadAllTools();
}

批量断开

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 断开所有服务器连接
*/
async disconnectAll(): Promise<void> {
const disconnectPromises: Promise<void>[] = [];

for (const connection of this.connections.values()) {
disconnectPromises.push(connection.disconnect());
}

await Promise.all(disconnectPromises);
this.tools.clear();
}

统计信息

获取服务器状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 获取服务器统计信息
*/
getStats(): {
totalServers: number;
connectedServers: number;
totalTools: number;
} {
let connectedServers = 0;

for (const connection of this.connections.values()) {
if (connection.isConnected()) {
connectedServers++;
}
}

return {
totalServers: this.connections.size,
connectedServers,
totalTools: this.tools.size,
};
}

获取已连接服务器列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 获取已连接的服务器列表
*/
getConnectedServers(): string[] {
const connected: string[] = [];

for (const [name, connection] of this.connections.entries()) {
if (connection.isConnected()) {
connected.push(name);
}
}

return connected;
}

小结

本节介绍了 MCP 服务器连接的实现:

  • 服务器配置结构
  • StdioClientTransport 原理
  • 连接和断开流程
  • 工具列表获取
  • 工具调用方法
  • 错误处理和超时控制
  • 多服务器管理

导航

上一篇: 13.1 MCP 协议概述

下一篇: 13.3 工具适配器实现