Appearance
MCP (Model Context Protocol) 详解
Model Context Protocol (MCP) 是一个开放标准,旨在为AI应用程序提供安全、标准化的方式来连接外部数据源和工具。MCP使AI助手能够安全地访问本地和远程资源,同时保持用户控制和隐私。
概述
什么是MCP
MCP是由Anthropic开发的一个协议标准,它解决了AI应用程序与外部系统集成时面临的复杂性和安全性问题。通过MCP,开发者可以:
- 统一接口: 使用标准化的协议连接各种数据源和工具
- 安全访问: 确保数据传输和访问的安全性
- 简化集成: 降低AI应用与外部系统的集成复杂度
- 增强能力: 扩展AI模型的功能边界
核心价值
- 标准化: 提供统一的接口规范,避免重复开发
- 安全性: 内置安全机制,保护敏感数据
- 可扩展性: 支持多种类型的资源和工具
- 互操作性: 不同系统间的无缝协作
- 用户控制: 用户对数据访问有完全控制权
架构设计
核心组件
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ AI Client │ │ MCP Server │ │ Resources │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ Model │ │ │ │ Protocol │ │ │ │ File System │ │
│ │ │ │◄──►│ │ Handler │ │◄──►│ │ │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
│ │ │ │ │ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │
│ │ MCP Client │ │ │ │ Resource │ │ │ │ Database │ │
│ │ │ │ │ │ Manager │ │ │ │ │ │
│ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
1. MCP Client (客户端)
- 职责: 发起请求,处理响应
- 功能: 协议通信、请求管理、错误处理
- 实现: 通常集成在AI应用程序中
2. MCP Server (服务端)
- 职责: 处理客户端请求,管理资源访问
- 功能: 协议解析、权限验证、资源调度
- 实现: 独立服务或嵌入式组件
3. Resources (资源)
- 类型: 文件系统、数据库、API、工具等
- 访问: 通过MCP Server统一管理
- 安全: 权限控制和访问审计
通信模型
txt
Client Server Resource
│ │ │
│ ──── Request ────────► │ │
│ │ ──── Access ─────────► │
│ │ ◄─── Data ──────────── │
│ ◄─── Response ──────── │ │
│ │ │
协议规范
消息格式
MCP使用JSON-RPC 2.0作为基础通信协议:
json
{
"jsonrpc": "2.0",
"method": "resources/read",
"params": {
"uri": "file:///path/to/file.txt"
},
"id": 1
}
核心方法
1. 资源管理
列出资源:
json
{
"method": "resources/list",
"params": {
"cursor": null
}
}
读取资源:
json
{
"method": "resources/read",
"params": {
"uri": "file:///example.txt"
}
}
订阅资源变更:
json
{
"method": "resources/subscribe",
"params": {
"uri": "file:///watched/directory"
}
}
2. 工具调用
列出工具:
json
{
"method": "tools/list",
"params": {}
}
调用工具:
json
{
"method": "tools/call",
"params": {
"name": "calculator",
"arguments": {
"expression": "2 + 2"
}
}
}
3. 提示管理
获取提示:
json
{
"method": "prompts/get",
"params": {
"name": "code_review",
"arguments": {
"language": "python"
}
}
}
列出提示:
json
{
"method": "prompts/list",
"params": {}
}
错误处理
json
{
"jsonrpc": "2.0",
"error": {
"code": -32602,
"message": "Invalid params",
"data": {
"details": "URI format is invalid"
}
},
"id": 1
}
实现指南
服务端实现
Python示例
python
import asyncio
import json
from typing import Any, Dict, List
from mcp import McpServer, types
class FileSystemMcpServer(McpServer):
def __init__(self):
super().__init__("filesystem-server")
self.setup_handlers()
def setup_handlers(self):
@self.list_resources()
async def list_resources() -> List[types.Resource]:
"""列出可用资源"""
return [
types.Resource(
uri="file:///documents",
name="Documents",
description="User documents directory",
mimeType="inode/directory"
)
]
@self.read_resource()
async def read_resource(uri: str) -> str:
"""读取资源内容"""
if uri.startswith("file://"):
file_path = uri[7:] # 移除 file:// 前缀
try:
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
except FileNotFoundError:
raise ValueError(f"File not found: {file_path}")
else:
raise ValueError(f"Unsupported URI scheme: {uri}")
@self.list_tools()
async def list_tools() -> List[types.Tool]:
"""列出可用工具"""
return [
types.Tool(
name="file_search",
description="Search for files by name",
inputSchema={
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Search pattern"
}
},
"required": ["pattern"]
}
)
]
@self.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
"""执行工具调用"""
if name == "file_search":
pattern = arguments.get("pattern", "")
# 实现文件搜索逻辑
results = self.search_files(pattern)
return [
types.TextContent(
type="text",
text=f"Found {len(results)} files matching '{pattern}'"
)
]
else:
raise ValueError(f"Unknown tool: {name}")
def search_files(self, pattern: str) -> List[str]:
"""搜索文件的实现"""
import glob
return glob.glob(f"**/*{pattern}*", recursive=True)
# 启动服务器
async def main():
server = FileSystemMcpServer()
await server.run()
if __name__ == "__main__":
asyncio.run(main())
TypeScript示例
typescript
import { McpServer } from '@modelcontextprotocol/sdk/server';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio';
import { ListResourcesRequestSchema, ReadResourceRequestSchema } from '@modelcontextprotocol/sdk/types';
import * as fs from 'fs/promises';
import * as path from 'path';
class FileSystemServer {
private server: McpServer;
constructor() {
this.server = new McpServer(
{
name: 'filesystem-server',
version: '1.0.0'
},
{
capabilities: {
resources: {},
tools: {}
}
}
);
this.setupHandlers();
}
private setupHandlers() {
// 资源列表处理
this.server.setRequestHandler(
ListResourcesRequestSchema,
async () => {
return {
resources: [
{
uri: 'file:///documents',
name: 'Documents',
description: 'User documents directory',
mimeType: 'inode/directory'
}
]
};
}
);
// 资源读取处理
this.server.setRequestHandler(
ReadResourceRequestSchema,
async (request) => {
const { uri } = request.params;
if (uri.startsWith('file://')) {
const filePath = uri.slice(7);
try {
const content = await fs.readFile(filePath, 'utf-8');
return {
contents: [
{
uri,
mimeType: 'text/plain',
text: content
}
]
};
} catch (error) {
throw new Error(`Failed to read file: ${error.message}`);
}
}
throw new Error(`Unsupported URI scheme: ${uri}`);
}
);
}
async run() {
const transport = new StdioServerTransport();
await this.server.connect(transport);
}
}
// 启动服务器
const server = new FileSystemServer();
server.run().catch(console.error);
客户端实现
Python客户端
python
import asyncio
from mcp import ClientSession, StdioServerParameters
class McpClient:
def __init__(self, server_path: str):
self.server_path = server_path
self.session = None
async def connect(self):
"""连接到MCP服务器"""
server_params = StdioServerParameters(
command=self.server_path,
args=[]
)
self.session = await ClientSession.create(server_params)
async def list_resources(self):
"""获取资源列表"""
if not self.session:
raise RuntimeError("Not connected to server")
response = await self.session.list_resources()
return response.resources
async def read_resource(self, uri: str):
"""读取资源内容"""
if not self.session:
raise RuntimeError("Not connected to server")
response = await self.session.read_resource(uri)
return response.contents[0].text if response.contents else None
async def call_tool(self, name: str, arguments: dict):
"""调用工具"""
if not self.session:
raise RuntimeError("Not connected to server")
response = await self.session.call_tool(name, arguments)
return response.content
async def disconnect(self):
"""断开连接"""
if self.session:
await self.session.close()
# 使用示例
async def main():
client = McpClient("python filesystem_server.py")
try:
await client.connect()
# 列出资源
resources = await client.list_resources()
print(f"Available resources: {len(resources)}")
# 读取文件
content = await client.read_resource("file:///example.txt")
print(f"File content: {content}")
# 调用工具
result = await client.call_tool("file_search", {"pattern": "*.py"})
print(f"Search result: {result}")
finally:
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
安全机制
权限控制
python
class SecurityManager:
def __init__(self):
self.permissions = {
"file_read": ["file:///safe/directory/**"],
"file_write": ["file:///temp/**"],
"tool_execute": ["calculator", "text_processor"]
}
def check_resource_permission(self, uri: str, action: str) -> bool:
"""检查资源访问权限"""
allowed_patterns = self.permissions.get(f"file_{action}", [])
for pattern in allowed_patterns:
if self.match_pattern(uri, pattern):
return True
return False
def check_tool_permission(self, tool_name: str) -> bool:
"""检查工具执行权限"""
allowed_tools = self.permissions.get("tool_execute", [])
return tool_name in allowed_tools
def match_pattern(self, uri: str, pattern: str) -> bool:
"""模式匹配实现"""
import fnmatch
return fnmatch.fnmatch(uri, pattern)
数据加密
python
import cryptography.fernet
class EncryptionManager:
def __init__(self, key: bytes):
self.cipher = cryptography.fernet.Fernet(key)
def encrypt_data(self, data: str) -> str:
"""加密数据"""
encrypted = self.cipher.encrypt(data.encode())
return encrypted.decode()
def decrypt_data(self, encrypted_data: str) -> str:
"""解密数据"""
decrypted = self.cipher.decrypt(encrypted_data.encode())
return decrypted.decode()
审计日志
python
import logging
from datetime import datetime
class AuditLogger:
def __init__(self):
self.logger = logging.getLogger('mcp_audit')
self.logger.setLevel(logging.INFO)
handler = logging.FileHandler('mcp_audit.log')
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_resource_access(self, uri: str, action: str, user: str, success: bool):
"""记录资源访问"""
self.logger.info(
f"Resource access: {action} {uri} by {user} - {'SUCCESS' if success else 'FAILED'}"
)
def log_tool_execution(self, tool_name: str, user: str, success: bool):
"""记录工具执行"""
self.logger.info(
f"Tool execution: {tool_name} by {user} - {'SUCCESS' if success else 'FAILED'}"
)
实际应用场景
1. 文件系统集成
场景: AI助手需要访问本地文件进行分析
python
# MCP服务器配置
file_server_config = {
"name": "filesystem",
"command": "python",
"args": ["file_server.py"],
"env": {
"ALLOWED_PATHS": "/home/user/documents,/home/user/projects"
}
}
# 使用示例
async def analyze_project():
client = McpClient("filesystem")
await client.connect()
# 读取项目文件
files = await client.call_tool("list_files", {
"directory": "/home/user/projects/myapp",
"pattern": "*.py"
})
# 分析代码质量
for file_path in files:
content = await client.read_resource(f"file://{file_path}")
analysis = await analyze_code(content)
print(f"Analysis for {file_path}: {analysis}")
2. 数据库集成
场景: AI助手查询数据库获取业务数据
python
# 数据库MCP服务器
class DatabaseMcpServer(McpServer):
def __init__(self, db_config):
super().__init__("database-server")
self.db = self.connect_database(db_config)
@self.call_tool()
async def execute_query(self, name: str, arguments: dict):
if name == "sql_query":
query = arguments.get("query")
# 安全检查:只允许SELECT语句
if not query.strip().upper().startswith("SELECT"):
raise ValueError("Only SELECT queries are allowed")
results = await self.db.execute(query)
return [types.TextContent(
type="text",
text=json.dumps(results, indent=2)
)]
# 使用示例
async def get_sales_data():
client = McpClient("database")
await client.connect()
result = await client.call_tool("sql_query", {
"query": "SELECT * FROM sales WHERE date >= '2024-01-01'"
})
return json.loads(result[0].text)
3. API集成
场景: AI助手调用外部API获取实时数据
python
# API MCP服务器
class ApiMcpServer(McpServer):
def __init__(self):
super().__init__("api-server")
self.http_client = httpx.AsyncClient()
@self.call_tool()
async def call_api(self, name: str, arguments: dict):
if name == "weather_api":
city = arguments.get("city")
api_key = os.getenv("WEATHER_API_KEY")
url = f"https://api.weather.com/v1/current?city={city}&key={api_key}"
response = await self.http_client.get(url)
return [types.TextContent(
type="text",
text=response.text
)]
# 使用示例
async def get_weather_info():
client = McpClient("api")
await client.connect()
weather = await client.call_tool("weather_api", {
"city": "Beijing"
})
return json.loads(weather[0].text)
4. 开发工具集成
场景: AI助手协助代码开发和调试
python
# 开发工具MCP服务器
class DevToolsMcpServer(McpServer):
def __init__(self):
super().__init__("devtools-server")
@self.call_tool()
async def dev_tools(self, name: str, arguments: dict):
if name == "run_tests":
test_path = arguments.get("path", ".")
result = subprocess.run(
["python", "-m", "pytest", test_path],
capture_output=True,
text=True
)
return [types.TextContent(
type="text",
text=f"Exit code: {result.returncode}\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
)]
elif name == "lint_code":
file_path = arguments.get("file")
result = subprocess.run(
["flake8", file_path],
capture_output=True,
text=True
)
return [types.TextContent(
type="text",
text=result.stdout or "No linting issues found"
)]
# 使用示例
async def code_review_assistant():
client = McpClient("devtools")
await client.connect()
# 运行测试
test_result = await client.call_tool("run_tests", {
"path": "tests/"
})
# 代码检查
lint_result = await client.call_tool("lint_code", {
"file": "src/main.py"
})
return {
"tests": test_result[0].text,
"linting": lint_result[0].text
}
最佳实践
1. 服务器设计
模块化设计:
python
class ModularMcpServer(McpServer):
def __init__(self):
super().__init__("modular-server")
self.modules = {
"filesystem": FileSystemModule(),
"database": DatabaseModule(),
"api": ApiModule()
}
self.setup_handlers()
def setup_handlers(self):
for module_name, module in self.modules.items():
module.register_handlers(self)
错误处理:
python
class RobustMcpServer(McpServer):
async def safe_call_tool(self, name: str, arguments: dict):
try:
return await self.call_tool(name, arguments)
except Exception as e:
self.logger.error(f"Tool call failed: {name} - {str(e)}")
return [types.TextContent(
type="text",
text=f"Error: {str(e)}"
)]
性能优化:
python
class OptimizedMcpServer(McpServer):
def __init__(self):
super().__init__("optimized-server")
self.cache = {}
self.connection_pool = ConnectionPool()
async def cached_resource_read(self, uri: str):
if uri in self.cache:
return self.cache[uri]
content = await self.read_resource(uri)
self.cache[uri] = content
return content
2. 客户端设计
连接管理:
python
class ManagedMcpClient:
def __init__(self, server_configs):
self.servers = {}
self.server_configs = server_configs
async def get_server(self, server_name: str):
if server_name not in self.servers:
config = self.server_configs[server_name]
client = McpClient(config)
await client.connect()
self.servers[server_name] = client
return self.servers[server_name]
async def cleanup(self):
for client in self.servers.values():
await client.disconnect()
重试机制:
python
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
class ReliableMcpClient(McpClient):
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def reliable_call_tool(self, name: str, arguments: dict):
return await self.call_tool(name, arguments)
3. 安全实践
输入验证:
python
from jsonschema import validate, ValidationError
class SecureMcpServer(McpServer):
def __init__(self):
super().__init__("secure-server")
self.schemas = {
"file_read": {
"type": "object",
"properties": {
"path": {"type": "string", "pattern": "^/safe/.*"}
},
"required": ["path"]
}
}
def validate_input(self, tool_name: str, arguments: dict):
if tool_name in self.schemas:
try:
validate(arguments, self.schemas[tool_name])
except ValidationError as e:
raise ValueError(f"Invalid input: {e.message}")
资源限制:
python
import asyncio
from asyncio import Semaphore
class RateLimitedMcpServer(McpServer):
def __init__(self):
super().__init__("rate-limited-server")
self.semaphore = Semaphore(10) # 最多10个并发请求
self.request_counts = {}
async def handle_request(self, request):
async with self.semaphore:
client_id = self.get_client_id(request)
# 检查请求频率
if self.is_rate_limited(client_id):
raise ValueError("Rate limit exceeded")
return await super().handle_request(request)
故障排除
常见问题
1. 连接问题
问题: 客户端无法连接到服务器
解决方案:
python
# 检查服务器状态
async def diagnose_connection():
try:
client = McpClient("server_path")
await asyncio.wait_for(client.connect(), timeout=10)
print("Connection successful")
except asyncio.TimeoutError:
print("Connection timeout - check server startup")
except Exception as e:
print(f"Connection failed: {e}")
2. 权限错误
问题: 资源访问被拒绝
解决方案:
python
# 权限诊断
def diagnose_permissions(uri: str):
security_manager = SecurityManager()
for action in ["read", "write"]:
has_permission = security_manager.check_resource_permission(uri, action)
print(f"{action.upper()} permission for {uri}: {has_permission}")
3. 性能问题
问题: 响应时间过长
解决方案:
python
import time
class PerformanceMonitor:
def __init__(self):
self.metrics = {}
async def monitor_call(self, func, *args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
self.metrics[func.__name__] = duration
if duration > 5.0: # 超过5秒警告
print(f"Slow operation detected: {func.__name__} took {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start_time
print(f"Operation failed after {duration:.2f}s: {e}")
raise
调试工具
python
class McpDebugger:
def __init__(self):
self.message_log = []
self.performance_log = []
def log_message(self, direction: str, message: dict):
"""记录消息"""
timestamp = time.time()
self.message_log.append({
"timestamp": timestamp,
"direction": direction, # "sent" or "received"
"message": message
})
def analyze_performance(self):
"""分析性能"""
if len(self.performance_log) < 2:
return
durations = [entry["duration"] for entry in self.performance_log]
avg_duration = sum(durations) / len(durations)
max_duration = max(durations)
print(f"Average response time: {avg_duration:.2f}s")
print(f"Maximum response time: {max_duration:.2f}s")
def export_logs(self, filename: str):
"""导出日志"""
with open(filename, 'w') as f:
json.dump({
"messages": self.message_log,
"performance": self.performance_log
}, f, indent=2)
生态系统
官方工具
- MCP SDK: 官方开发工具包
- MCP Inspector: 协议调试工具
- MCP Registry: 服务器注册中心
社区项目
- 文件系统服务器: 本地文件访问
- 数据库连接器: 多种数据库支持
- API网关: RESTful API集成
- 开发工具集: 代码分析和构建
集成平台
- Claude Desktop: 原生MCP支持
- VS Code扩展: 开发环境集成
- Jupyter插件: 数据科学工作流
- 企业平台: 定制化解决方案
未来发展
技术路线图
- 协议增强: 支持更多数据类型和操作
- 性能优化: 流式传输和批量操作
- 安全加强: 端到端加密和零信任架构
- 标准化: 与其他AI协议的互操作性
应用前景
- 企业集成: 大规模企业系统连接
- 边缘计算: 本地AI助手部署
- 物联网: 设备数据实时访问
- 多模态: 支持更多数据模态
MCP作为AI应用程序的标准化连接协议,正在成为构建智能系统的重要基础设施。通过提供安全、标准化的接口,MCP使AI助手能够安全地访问各种外部资源,同时保持用户的控制权和隐私保护。随着生态系统的不断发展,MCP将在AI应用的普及和标准化方面发挥越来越重要的作用。