Skip to content

MCP (Model Context Protocol) 详解

Model Context Protocol (MCP) 是一个开放标准,旨在为AI应用程序提供安全、标准化的方式来连接外部数据源和工具。MCP使AI助手能够安全地访问本地和远程资源,同时保持用户控制和隐私。

概述

什么是MCP

MCP是由Anthropic开发的一个协议标准,它解决了AI应用程序与外部系统集成时面临的复杂性和安全性问题。通过MCP,开发者可以:

  • 统一接口: 使用标准化的协议连接各种数据源和工具
  • 安全访问: 确保数据传输和访问的安全性
  • 简化集成: 降低AI应用与外部系统的集成复杂度
  • 增强能力: 扩展AI模型的功能边界

核心价值

  1. 标准化: 提供统一的接口规范,避免重复开发
  2. 安全性: 内置安全机制,保护敏感数据
  3. 可扩展性: 支持多种类型的资源和工具
  4. 互操作性: 不同系统间的无缝协作
  5. 用户控制: 用户对数据访问有完全控制权

架构设计

核心组件

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   AI Client     │    │   MCP Server    │    │   Resources     │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │   Model     │ │    │ │  Protocol   │ │    │ │ File System │ │
│ │             │ │◄──►│ │  Handler    │ │◄──►│ │             │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │ MCP Client  │ │    │ │ Resource    │ │    │ │  Database   │ │
│ │             │ │    │ │ Manager     │ │    │ │             │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘

1. MCP Client (客户端)

  • 职责: 发起请求,处理响应
  • 功能: 协议通信、请求管理、错误处理
  • 实现: 通常集成在AI应用程序中

2. MCP Server (服务端)

  • 职责: 处理客户端请求,管理资源访问
  • 功能: 协议解析、权限验证、资源调度
  • 实现: 独立服务或嵌入式组件

3. Resources (资源)

  • 类型: 文件系统、数据库、API、工具等
  • 访问: 通过MCP Server统一管理
  • 安全: 权限控制和访问审计

通信模型

txt
Client                    Server                    Resource
  │                         │                         │
  │ ──── Request ────────► │                         │
  │                         │ ──── Access ─────────► │
  │                         │ ◄─── Data ──────────── │
  │ ◄─── Response ──────── │                         │
  │                         │                         │

协议规范

消息格式

MCP使用JSON-RPC 2.0作为基础通信协议:

json
{
  "jsonrpc": "2.0",
  "method": "resources/read",
  "params": {
    "uri": "file:///path/to/file.txt"
  },
  "id": 1
}

核心方法

1. 资源管理

列出资源:

json
{
  "method": "resources/list",
  "params": {
    "cursor": null
  }
}

读取资源:

json
{
  "method": "resources/read",
  "params": {
    "uri": "file:///example.txt"
  }
}

订阅资源变更:

json
{
  "method": "resources/subscribe",
  "params": {
    "uri": "file:///watched/directory"
  }
}

2. 工具调用

列出工具:

json
{
  "method": "tools/list",
  "params": {}
}

调用工具:

json
{
  "method": "tools/call",
  "params": {
    "name": "calculator",
    "arguments": {
      "expression": "2 + 2"
    }
  }
}

3. 提示管理

获取提示:

json
{
  "method": "prompts/get",
  "params": {
    "name": "code_review",
    "arguments": {
      "language": "python"
    }
  }
}

列出提示:

json
{
  "method": "prompts/list",
  "params": {}
}

错误处理

json
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32602,
    "message": "Invalid params",
    "data": {
      "details": "URI format is invalid"
    }
  },
  "id": 1
}

实现指南

服务端实现

Python示例

python
import asyncio
import json
from typing import Any, Dict, List
from mcp import McpServer, types

class FileSystemMcpServer(McpServer):
    def __init__(self):
        super().__init__("filesystem-server")
        self.setup_handlers()
    
    def setup_handlers(self):
        @self.list_resources()
        async def list_resources() -> List[types.Resource]:
            """列出可用资源"""
            return [
                types.Resource(
                    uri="file:///documents",
                    name="Documents",
                    description="User documents directory",
                    mimeType="inode/directory"
                )
            ]
        
        @self.read_resource()
        async def read_resource(uri: str) -> str:
            """读取资源内容"""
            if uri.startswith("file://"):
                file_path = uri[7:]  # 移除 file:// 前缀
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        return f.read()
                except FileNotFoundError:
                    raise ValueError(f"File not found: {file_path}")
            else:
                raise ValueError(f"Unsupported URI scheme: {uri}")
        
        @self.list_tools()
        async def list_tools() -> List[types.Tool]:
            """列出可用工具"""
            return [
                types.Tool(
                    name="file_search",
                    description="Search for files by name",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "pattern": {
                                "type": "string",
                                "description": "Search pattern"
                            }
                        },
                        "required": ["pattern"]
                    }
                )
            ]
        
        @self.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """执行工具调用"""
            if name == "file_search":
                pattern = arguments.get("pattern", "")
                # 实现文件搜索逻辑
                results = self.search_files(pattern)
                return [
                    types.TextContent(
                        type="text",
                        text=f"Found {len(results)} files matching '{pattern}'"
                    )
                ]
            else:
                raise ValueError(f"Unknown tool: {name}")
    
    def search_files(self, pattern: str) -> List[str]:
        """搜索文件的实现"""
        import glob
        return glob.glob(f"**/*{pattern}*", recursive=True)

# 启动服务器
async def main():
    server = FileSystemMcpServer()
    await server.run()

if __name__ == "__main__":
    asyncio.run(main())

TypeScript示例

typescript
import { McpServer } from '@modelcontextprotocol/sdk/server';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio';
import { ListResourcesRequestSchema, ReadResourceRequestSchema } from '@modelcontextprotocol/sdk/types';
import * as fs from 'fs/promises';
import * as path from 'path';

class FileSystemServer {
  private server: McpServer;

  constructor() {
    this.server = new McpServer(
      {
        name: 'filesystem-server',
        version: '1.0.0'
      },
      {
        capabilities: {
          resources: {},
          tools: {}
        }
      }
    );

    this.setupHandlers();
  }

  private setupHandlers() {
    // 资源列表处理
    this.server.setRequestHandler(
      ListResourcesRequestSchema,
      async () => {
        return {
          resources: [
            {
              uri: 'file:///documents',
              name: 'Documents',
              description: 'User documents directory',
              mimeType: 'inode/directory'
            }
          ]
        };
      }
    );

    // 资源读取处理
    this.server.setRequestHandler(
      ReadResourceRequestSchema,
      async (request) => {
        const { uri } = request.params;
        
        if (uri.startsWith('file://')) {
          const filePath = uri.slice(7);
          try {
            const content = await fs.readFile(filePath, 'utf-8');
            return {
              contents: [
                {
                  uri,
                  mimeType: 'text/plain',
                  text: content
                }
              ]
            };
          } catch (error) {
            throw new Error(`Failed to read file: ${error.message}`);
          }
        }
        
        throw new Error(`Unsupported URI scheme: ${uri}`);
      }
    );
  }

  async run() {
    const transport = new StdioServerTransport();
    await this.server.connect(transport);
  }
}

// 启动服务器
const server = new FileSystemServer();
server.run().catch(console.error);

客户端实现

Python客户端

python
import asyncio
from mcp import ClientSession, StdioServerParameters

class McpClient:
    def __init__(self, server_path: str):
        self.server_path = server_path
        self.session = None
    
    async def connect(self):
        """连接到MCP服务器"""
        server_params = StdioServerParameters(
            command=self.server_path,
            args=[]
        )
        
        self.session = await ClientSession.create(server_params)
    
    async def list_resources(self):
        """获取资源列表"""
        if not self.session:
            raise RuntimeError("Not connected to server")
        
        response = await self.session.list_resources()
        return response.resources
    
    async def read_resource(self, uri: str):
        """读取资源内容"""
        if not self.session:
            raise RuntimeError("Not connected to server")
        
        response = await self.session.read_resource(uri)
        return response.contents[0].text if response.contents else None
    
    async def call_tool(self, name: str, arguments: dict):
        """调用工具"""
        if not self.session:
            raise RuntimeError("Not connected to server")
        
        response = await self.session.call_tool(name, arguments)
        return response.content
    
    async def disconnect(self):
        """断开连接"""
        if self.session:
            await self.session.close()

# 使用示例
async def main():
    client = McpClient("python filesystem_server.py")
    
    try:
        await client.connect()
        
        # 列出资源
        resources = await client.list_resources()
        print(f"Available resources: {len(resources)}")
        
        # 读取文件
        content = await client.read_resource("file:///example.txt")
        print(f"File content: {content}")
        
        # 调用工具
        result = await client.call_tool("file_search", {"pattern": "*.py"})
        print(f"Search result: {result}")
        
    finally:
        await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

安全机制

权限控制

python
class SecurityManager:
    def __init__(self):
        self.permissions = {
            "file_read": ["file:///safe/directory/**"],
            "file_write": ["file:///temp/**"],
            "tool_execute": ["calculator", "text_processor"]
        }
    
    def check_resource_permission(self, uri: str, action: str) -> bool:
        """检查资源访问权限"""
        allowed_patterns = self.permissions.get(f"file_{action}", [])
        
        for pattern in allowed_patterns:
            if self.match_pattern(uri, pattern):
                return True
        
        return False
    
    def check_tool_permission(self, tool_name: str) -> bool:
        """检查工具执行权限"""
        allowed_tools = self.permissions.get("tool_execute", [])
        return tool_name in allowed_tools
    
    def match_pattern(self, uri: str, pattern: str) -> bool:
        """模式匹配实现"""
        import fnmatch
        return fnmatch.fnmatch(uri, pattern)

数据加密

python
import cryptography.fernet

class EncryptionManager:
    def __init__(self, key: bytes):
        self.cipher = cryptography.fernet.Fernet(key)
    
    def encrypt_data(self, data: str) -> str:
        """加密数据"""
        encrypted = self.cipher.encrypt(data.encode())
        return encrypted.decode()
    
    def decrypt_data(self, encrypted_data: str) -> str:
        """解密数据"""
        decrypted = self.cipher.decrypt(encrypted_data.encode())
        return decrypted.decode()

审计日志

python
import logging
from datetime import datetime

class AuditLogger:
    def __init__(self):
        self.logger = logging.getLogger('mcp_audit')
        self.logger.setLevel(logging.INFO)
        
        handler = logging.FileHandler('mcp_audit.log')
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
    
    def log_resource_access(self, uri: str, action: str, user: str, success: bool):
        """记录资源访问"""
        self.logger.info(
            f"Resource access: {action} {uri} by {user} - {'SUCCESS' if success else 'FAILED'}"
        )
    
    def log_tool_execution(self, tool_name: str, user: str, success: bool):
        """记录工具执行"""
        self.logger.info(
            f"Tool execution: {tool_name} by {user} - {'SUCCESS' if success else 'FAILED'}"
        )

实际应用场景

1. 文件系统集成

场景: AI助手需要访问本地文件进行分析

python
# MCP服务器配置
file_server_config = {
    "name": "filesystem",
    "command": "python",
    "args": ["file_server.py"],
    "env": {
        "ALLOWED_PATHS": "/home/user/documents,/home/user/projects"
    }
}

# 使用示例
async def analyze_project():
    client = McpClient("filesystem")
    await client.connect()
    
    # 读取项目文件
    files = await client.call_tool("list_files", {
        "directory": "/home/user/projects/myapp",
        "pattern": "*.py"
    })
    
    # 分析代码质量
    for file_path in files:
        content = await client.read_resource(f"file://{file_path}")
        analysis = await analyze_code(content)
        print(f"Analysis for {file_path}: {analysis}")

2. 数据库集成

场景: AI助手查询数据库获取业务数据

python
# 数据库MCP服务器
class DatabaseMcpServer(McpServer):
    def __init__(self, db_config):
        super().__init__("database-server")
        self.db = self.connect_database(db_config)
    
    @self.call_tool()
    async def execute_query(self, name: str, arguments: dict):
        if name == "sql_query":
            query = arguments.get("query")
            # 安全检查:只允许SELECT语句
            if not query.strip().upper().startswith("SELECT"):
                raise ValueError("Only SELECT queries are allowed")
            
            results = await self.db.execute(query)
            return [types.TextContent(
                type="text",
                text=json.dumps(results, indent=2)
            )]

# 使用示例
async def get_sales_data():
    client = McpClient("database")
    await client.connect()
    
    result = await client.call_tool("sql_query", {
        "query": "SELECT * FROM sales WHERE date >= '2024-01-01'"
    })
    
    return json.loads(result[0].text)

3. API集成

场景: AI助手调用外部API获取实时数据

python
# API MCP服务器
class ApiMcpServer(McpServer):
    def __init__(self):
        super().__init__("api-server")
        self.http_client = httpx.AsyncClient()
    
    @self.call_tool()
    async def call_api(self, name: str, arguments: dict):
        if name == "weather_api":
            city = arguments.get("city")
            api_key = os.getenv("WEATHER_API_KEY")
            
            url = f"https://api.weather.com/v1/current?city={city}&key={api_key}"
            response = await self.http_client.get(url)
            
            return [types.TextContent(
                type="text",
                text=response.text
            )]

# 使用示例
async def get_weather_info():
    client = McpClient("api")
    await client.connect()
    
    weather = await client.call_tool("weather_api", {
        "city": "Beijing"
    })
    
    return json.loads(weather[0].text)

4. 开发工具集成

场景: AI助手协助代码开发和调试

python
# 开发工具MCP服务器
class DevToolsMcpServer(McpServer):
    def __init__(self):
        super().__init__("devtools-server")
    
    @self.call_tool()
    async def dev_tools(self, name: str, arguments: dict):
        if name == "run_tests":
            test_path = arguments.get("path", ".")
            result = subprocess.run(
                ["python", "-m", "pytest", test_path],
                capture_output=True,
                text=True
            )
            
            return [types.TextContent(
                type="text",
                text=f"Exit code: {result.returncode}\n\nSTDOUT:\n{result.stdout}\n\nSTDERR:\n{result.stderr}"
            )]
        
        elif name == "lint_code":
            file_path = arguments.get("file")
            result = subprocess.run(
                ["flake8", file_path],
                capture_output=True,
                text=True
            )
            
            return [types.TextContent(
                type="text",
                text=result.stdout or "No linting issues found"
            )]

# 使用示例
async def code_review_assistant():
    client = McpClient("devtools")
    await client.connect()
    
    # 运行测试
    test_result = await client.call_tool("run_tests", {
        "path": "tests/"
    })
    
    # 代码检查
    lint_result = await client.call_tool("lint_code", {
        "file": "src/main.py"
    })
    
    return {
        "tests": test_result[0].text,
        "linting": lint_result[0].text
    }

最佳实践

1. 服务器设计

模块化设计:

python
class ModularMcpServer(McpServer):
    def __init__(self):
        super().__init__("modular-server")
        self.modules = {
            "filesystem": FileSystemModule(),
            "database": DatabaseModule(),
            "api": ApiModule()
        }
        self.setup_handlers()
    
    def setup_handlers(self):
        for module_name, module in self.modules.items():
            module.register_handlers(self)

错误处理:

python
class RobustMcpServer(McpServer):
    async def safe_call_tool(self, name: str, arguments: dict):
        try:
            return await self.call_tool(name, arguments)
        except Exception as e:
            self.logger.error(f"Tool call failed: {name} - {str(e)}")
            return [types.TextContent(
                type="text",
                text=f"Error: {str(e)}"
            )]

性能优化:

python
class OptimizedMcpServer(McpServer):
    def __init__(self):
        super().__init__("optimized-server")
        self.cache = {}
        self.connection_pool = ConnectionPool()
    
    async def cached_resource_read(self, uri: str):
        if uri in self.cache:
            return self.cache[uri]
        
        content = await self.read_resource(uri)
        self.cache[uri] = content
        return content

2. 客户端设计

连接管理:

python
class ManagedMcpClient:
    def __init__(self, server_configs):
        self.servers = {}
        self.server_configs = server_configs
    
    async def get_server(self, server_name: str):
        if server_name not in self.servers:
            config = self.server_configs[server_name]
            client = McpClient(config)
            await client.connect()
            self.servers[server_name] = client
        
        return self.servers[server_name]
    
    async def cleanup(self):
        for client in self.servers.values():
            await client.disconnect()

重试机制:

python
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

class ReliableMcpClient(McpClient):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def reliable_call_tool(self, name: str, arguments: dict):
        return await self.call_tool(name, arguments)

3. 安全实践

输入验证:

python
from jsonschema import validate, ValidationError

class SecureMcpServer(McpServer):
    def __init__(self):
        super().__init__("secure-server")
        self.schemas = {
            "file_read": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "pattern": "^/safe/.*"}
                },
                "required": ["path"]
            }
        }
    
    def validate_input(self, tool_name: str, arguments: dict):
        if tool_name in self.schemas:
            try:
                validate(arguments, self.schemas[tool_name])
            except ValidationError as e:
                raise ValueError(f"Invalid input: {e.message}")

资源限制:

python
import asyncio
from asyncio import Semaphore

class RateLimitedMcpServer(McpServer):
    def __init__(self):
        super().__init__("rate-limited-server")
        self.semaphore = Semaphore(10)  # 最多10个并发请求
        self.request_counts = {}
    
    async def handle_request(self, request):
        async with self.semaphore:
            client_id = self.get_client_id(request)
            
            # 检查请求频率
            if self.is_rate_limited(client_id):
                raise ValueError("Rate limit exceeded")
            
            return await super().handle_request(request)

故障排除

常见问题

1. 连接问题

问题: 客户端无法连接到服务器

解决方案:

python
# 检查服务器状态
async def diagnose_connection():
    try:
        client = McpClient("server_path")
        await asyncio.wait_for(client.connect(), timeout=10)
        print("Connection successful")
    except asyncio.TimeoutError:
        print("Connection timeout - check server startup")
    except Exception as e:
        print(f"Connection failed: {e}")

2. 权限错误

问题: 资源访问被拒绝

解决方案:

python
# 权限诊断
def diagnose_permissions(uri: str):
    security_manager = SecurityManager()
    
    for action in ["read", "write"]:
        has_permission = security_manager.check_resource_permission(uri, action)
        print(f"{action.upper()} permission for {uri}: {has_permission}")

3. 性能问题

问题: 响应时间过长

解决方案:

python
import time

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {}
    
    async def monitor_call(self, func, *args, **kwargs):
        start_time = time.time()
        try:
            result = await func(*args, **kwargs)
            duration = time.time() - start_time
            self.metrics[func.__name__] = duration
            
            if duration > 5.0:  # 超过5秒警告
                print(f"Slow operation detected: {func.__name__} took {duration:.2f}s")
            
            return result
        except Exception as e:
            duration = time.time() - start_time
            print(f"Operation failed after {duration:.2f}s: {e}")
            raise

调试工具

python
class McpDebugger:
    def __init__(self):
        self.message_log = []
        self.performance_log = []
    
    def log_message(self, direction: str, message: dict):
        """记录消息"""
        timestamp = time.time()
        self.message_log.append({
            "timestamp": timestamp,
            "direction": direction,  # "sent" or "received"
            "message": message
        })
    
    def analyze_performance(self):
        """分析性能"""
        if len(self.performance_log) < 2:
            return
        
        durations = [entry["duration"] for entry in self.performance_log]
        avg_duration = sum(durations) / len(durations)
        max_duration = max(durations)
        
        print(f"Average response time: {avg_duration:.2f}s")
        print(f"Maximum response time: {max_duration:.2f}s")
    
    def export_logs(self, filename: str):
        """导出日志"""
        with open(filename, 'w') as f:
            json.dump({
                "messages": self.message_log,
                "performance": self.performance_log
            }, f, indent=2)

生态系统

官方工具

  1. MCP SDK: 官方开发工具包
  2. MCP Inspector: 协议调试工具
  3. MCP Registry: 服务器注册中心

社区项目

  1. 文件系统服务器: 本地文件访问
  2. 数据库连接器: 多种数据库支持
  3. API网关: RESTful API集成
  4. 开发工具集: 代码分析和构建

集成平台

  1. Claude Desktop: 原生MCP支持
  2. VS Code扩展: 开发环境集成
  3. Jupyter插件: 数据科学工作流
  4. 企业平台: 定制化解决方案

未来发展

技术路线图

  1. 协议增强: 支持更多数据类型和操作
  2. 性能优化: 流式传输和批量操作
  3. 安全加强: 端到端加密和零信任架构
  4. 标准化: 与其他AI协议的互操作性

应用前景

  1. 企业集成: 大规模企业系统连接
  2. 边缘计算: 本地AI助手部署
  3. 物联网: 设备数据实时访问
  4. 多模态: 支持更多数据模态

MCP作为AI应用程序的标准化连接协议,正在成为构建智能系统的重要基础设施。通过提供安全、标准化的接口,MCP使AI助手能够安全地访问各种外部资源,同时保持用户的控制权和隐私保护。随着生态系统的不断发展,MCP将在AI应用的普及和标准化方面发挥越来越重要的作用。