【MCP Node.js SDK 全栈进阶指南】高级篇(1):MCP多服务器协作架构
随着业务规模的不断扩大和系统复杂度的提升,单一服务器架构往往无法满足高并发、高可用性和弹性扩展的需求。在MCP生态系统中,多服务器协作架构成为构建大规模应用的必然选择。本文将深入探讨MCP TypeScript-SDK在多服务器环境下的部署、协作和管理,以及如何构建高可用、高性能、易扩展的分布式MCP应用。
目录
-
多服务器架构基础
1.1 MCP多服务器架构概述
1.2 分布式系统的挑战与解决方案
1.3 MCP服务器分类与职责划分
1.4 多服务器架构的常见模式 -
服务发现与路由
2.1 服务注册与发现机制
2.2 基于DNS的服务发现
2.3 动态路由与负载均衡
2.4 服务健康检查与自动恢复 -
状态同步与一致性
3.1 分布式状态管理策略
3.2 事件驱动的状态同步
3.3 实现最终一致性
3.4 数据分区与冲突解决 -
负载均衡与容错设计
4.1 负载均衡策略与实现
4.2 故障检测与自动恢复
4.3 优雅降级与熔断机制
4.4 构建高可用MCP集群
1. 多服务器架构基础
在深入探讨MCP多服务器协作之前,我们需要先了解多服务器架构的核心概念、挑战以及MCP SDK提供的解决方案。
1.1 MCP多服务器架构概述
MCP(Model Context Protocol)作为一种为大型语言模型(LLM)交互设计的协议,其服务架构需要适应复杂多变的业务场景和负载模式。多服务器MCP架构是指将MCP服务分布在多个物理或虚拟服务器上,通过协作方式提供统一的服务能力。
// MCP服务器实例类型
interface McpServerInstance {
id: string; // 服务器唯一标识
host: string; // 主机地址
port: number; // 端口号
role: 'master' | 'worker' | 'specialized'; // 服务器角色
status: 'online' | 'offline' | 'degraded'; // 服务器状态
capacity: {
// 服务器容量信息
maxConcurrentRequests: number;
currentLoad: number;
};
features: string[]; // 支持的特性
startTime: Date; // 启动时间
}
// 多服务器集群配置
interface McpClusterConfig {
serviceName: string; // 服务名称
serviceVersion: string;// 服务版本
discoveryMethod: 'static' | 'dns' | 'registry'; // 服务发现方式
heartbeatInterval: number; // 心跳间隔
syncStrategy: 'event-based' | 'polling' | 'hybrid'; // 同步策略
loadBalanceStrategy: 'round-robin' | 'least-connections' | 'consistent-hash'; // 负载均衡策略
failoverPolicy: {
// 故障转移策略
retryAttempts: number;
retryDelay: number;
circuitBreakerThreshold: number;
};
}
多服务器MCP架构具有以下几个核心特点:
- 水平扩展性:通过增加服务器数量来线性提升系统整体处理能力
- 高可用性:即使部分服务器故障,系统整体仍可持续提供服务
- 负载分散:将请求负载分散到多个服务器,避免单点压力过大
- 资源隔离:可按业务功能或客户需求隔离资源,提高安全性和稳定性
- 灵活部署:支持混合云、多区域、边缘计算等多样化部署模式
1.2 分布式系统的挑战与解决方案
构建分布式MCP服务面临一系列挑战,MCP TypeScript-SDK针对这些挑战提供了相应的解决方案:
挑战 | 表现 | MCP SDK解决方案 |
---|---|---|
网络不可靠 | 网络延迟、分区、丢包 | 重试机制、异步通信、断线重连 |
一致性问题 | 数据不一致、冲突 | 事件驱动同步、版本控制、冲突解决策略 |
服务协调 | 服务发现、路由 | 服务注册表、服务健康检查、动态路由 |
故障处理 | 节点故障、服务降级 | 故障检测、自动恢复、熔断机制 |
性能瓶颈 | 请求拥塞、资源竞争 | 负载均衡、请求限流、资源隔离 |
// MCP服务器故障处理配置
interface McpFailoverConfig {
detection: {
method: 'heartbeat' | 'ping' | 'health-endpoint';
interval: number; // 检测间隔
timeout: number; // 超时时间
thresholds: {
warning: number; // 警告阈值
critical: number; // 临界阈值
}
};
recovery: {
strategy: 'restart' | 'replace' | 'redirect';
cooldownPeriod: number; // 冷却时间
maxAttempts: number; // 最大尝试次数
};
notification: {
channels: string[]; // 通知渠道
templates: Record<string, string>; // 通知模板
}
}
1.3 MCP服务器分类与职责划分
在多服务器架构中,不同的MCP服务器可以承担不同的角色和职责,实现功能分离和专业化:
1.3.1 按角色分类
-
主服务器(Master):
- 管理集群状态和配置
- 协调跨服务器的资源分配
- 监控集群健康状态
- 通常部署较少数量,配置较高
-
工作服务器(Worker):
- 处理客户端的实际请求
- 执行MCP资源访问和工具调用
- 可大规模部署,构成系统处理能力的主体
- 通常是无状态的,便于横向扩展
-
专用服务器(Specialized):
- 专注于特定功能或业务场景
- 例如:AI推理服务器、数据处理服务器等
- 可根据需求定制化配置
- 适合资源密集型或安全敏感型业务
// MCP服务器角色定义和职责配置
import {
McpServer } from '@modelcontextprotocol/sdk';
// 主服务器配置
const masterConfig = {
name: "mcp-master-server",
description: "MCP集群主服务器",
version: "1.0.0",
cluster: {
role: "master",
workers: ["worker-1", "worker-2", "worker-3"],
electionStrategy: "fixed", // 固定主服务器
stateSync: {
method: "push",
interval: 5000
}
}
};
// 工作服务器配置
const workerConfig = {
name: "mcp-worker-server",
description: "MCP集群工作服务器",
version: "1.0.0",
cluster: {
role: "worker",
masterId: "master-1",
maxConcurrentRequests: 1000,
resourceCacheSize: 500,
reportInterval: 2000
}
};
// 专用服务器配置
const specializedConfig = {
name: "mcp-specialized-server",
description: "MCP图像处理专用服务器",
version: "1.0.0",
cluster: {
role: "specialized",
serviceType: "image-processing",
supportedOperations: ["resize", "filter", "recognize"],
resourceRequirements: {
gpu: true,
minMemory: "16G",
cpuCores: 8
}
}
};
1.3.2 按功能分类
-
API网关服务器:
- 请求路由和负载均衡
- 认证和授权处理
- 请求限流和缓存
- 请求/响应转换
-
资源服务器:
- 管理MCP资源模板
- 处理资源访问请求
- 资源缓存和优化
- 资源版本控制
-
工具服务器:
- 托管和执行MCP工具
- 工具依赖管理
- 工具执行环境隔离
- 工具性能监控
-
状态同步服务器:
- 维护全局状态
- 协调跨服务器状态同步
- 处理分布式事务
- 解决数据冲突
// 按功能划分的MCP服务器实现示例
import {
McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import {
createProxyMiddleware } from 'http-proxy-middleware';
// API网关服务器
const apiGatewayApp = express();
const apiGatewayServer = new McpServer({
name: "mcp-api-gateway",
description: "MCP API网关服务器",
version: "1.0.0"
});
// 路由配置
const routeConfig = {
'/api/resources': {
target: 'http://resource-server:3001', pathRewrite: {
'^/api': ''} },
'/api/tools': {
target: 'http://tool-server:3002', pathRewrite: {
'^/api': ''} }
};
// 设置API代理
Object.entries(routeConfig).forEach(([path, config]) => {
apiGatewayApp.use(path, createProxyMiddleware(config));
});
// 资源服务器
const resourceServer = new McpServer({
name: "mcp-resource-server",
description: "MCP资源服务器",
version: "1.0.0"
});
// 注册资源模板
resourceServer.registerResourceTemplate({
name: "users",
description: "用户资源",
schema: userSchema
});
// 工具服务器
const toolServer = new McpServer({
name: "mcp-tool-server",
description: "MCP工具服务器",
version: "1.0.0"
});
// 注册工具
toolServer.registerTool({
name: "calculator",
description: "计算工具",
parameters: calculatorSchema,
execute: async (params) => {
// 计算逻辑
return {
result: calculateExpression(params.expression) };
}
});
1.4 多服务器架构的常见模式
在实际应用中,MCP多服务器架构可以采用多种模式,根据业务需求和资源条件灵活选择:
1.4.1 主从模式(Master-Slave)
最常见的分布式架构模式,一个主服务器协调多个从服务器的工作。
┌────────────┐ ┌────────────┐
│ │ │ │
│ Master │◄────►│ Slave 1 │
│ │ │ │
└────────────┘ └────────────┘
▲
│
▼
┌────────────┐ ┌────────────┐
│ │ │ │
│ Slave 2 │◄────►│ Slave 3 │
│ │ │ │
└────────────┘ └────────────┘
// 主从模式实现示例
import {
McpServer, EventEmitter } from '@modelcontextprotocol/sdk';
import {
createClient } from 'redis';
// 创建Redis客户端用于服务器间通信
const pubClient = createClient({
url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
// 主服务器实现
class MasterMcpServer extends McpServer {
private slaves: Map<string, SlaveInfo> = new Map();
constructor(config) {
super(config);
// 订阅从服务器状态更新
subClient.subscribe('slave-status', (message) => {
const slaveStatus = JSON.parse(message);
this.updateSlaveStatus(slaveStatus);
});
// 定期检查从服务器健康状态
setInterval(() => this.checkSlavesHealth(), 10000);
}
// 注册从服务器
registerSlave(slaveId: string, info: SlaveInfo) {
this.slaves.set(slaveId, {
...info, lastHeartbeat: Date.now() });
this.emit('slave-registered', {
slaveId, info });
}
// 更新从服务器状态
updateSlaveStatus(status: SlaveStatus) {
const slave = this.slaves.get(status.slaveId);
if (slave) {
this.slaves.set(status.slaveId, {
...slave,
...status,
lastHeartbeat: Date.now()
});
}
}
// 检查从服务器健康状态
checkSlavesHealth() {
const now = Date.now();
for (const [slaveId, info] of this.slaves.entries()) {
if (now - info.lastHeartbeat > 30000) {
// 30秒无心跳
this.emit('slave-offline', {
slaveId });
this.slaves.delete(slaveId);
}
}
}
}
// 从服务器实现
class SlaveMcpServer extends McpServer {
private masterId: string;
constructor(config) {
super(config);
this.masterId = config.masterId;
// 定期向主服务器发送心跳
setInterval(() => this.sendHeartbeat(), 5000);
// 订阅主服务器指令
subClient.subscribe(`slave-command:${
this.id}`, (message) => {
this.handleMasterCommand(JSON.parse(message));
});
}
// 发送心跳
sendHeartbeat() {
const status = {
slaveId: this.id,
load: this.getCurrentLoad(),
memory: process.memoryUsage(),
status: 'online'
};
pubClient.publish('slave-status', JSON.stringify(status));
}
// 处理主服务器指令
handleMasterCommand(command) {
switch (command.type) {
case 'restart':
this.restart();
break;
case 'update-config':
this.updateConfig(command.config);
break;
// 其他指令处理...
}
}
}
1.4.2 去中心化模式(Peer-to-Peer)
所有服务器地位平等,通过协作提供服务,无单点故障风险。
┌────────────┐ ┌────────────┐
│ │◄────►│ │
│ Node 1 │ │ Node 2 │
│ │◄────►│ │
└────────────┘ └────────────┘
▲ ▲ ▲
│ └───────┐ ┌───┘
│ ▼ ▼
┌────────────┐ ┌────────────┐
│ │◄────►│ │
│ Node 3 │ │ Node 4 │
│ │◄────►│ │
└────────────┘ └────────────┘
// 去中心化模式实现示例
import {
McpServer } from '@modelcontextprotocol/sdk';
import * as IPFS from 'ipfs-core';
import {
v4 as uuidv4 } from 'uuid';
// P2P MCP服务器
class P2PMcpServer extends McpServer {
private ipfs;
private peerId: string;
private peers: Map<string, PeerInfo> = new Map();
private eventTopic = 'mcp-p2p-events';
constructor(config) {
super(config);
this.peerId = uuidv4();
this.initializeP2P();
}
// 初始化P2P网络
async initializeP2P() {
// 创建IPFS节点
this.ipfs = await IPFS.create();
// 获取本节点ID
const nodeInfo = await this.ipfs.id();
console.log(`P2P节点ID: ${
nodeInfo.id}`);
// 订阅事件主题
await this.ipfs.pubsub.subscribe(this.eventTopic, (msg) => {
this.handleP2PEvent(msg);
});
// 定期发布状态
setInterval(() => this.broadcastStatus(), 5000);
// 发布上线事件
this.broadcastEvent({
type: 'node-online',
peerId: this.peerId,
timestamp: Date.now(),
info: {
resources: this.getResourceNames(),
tools: this.getToolNames(),
capacity: this.getCapacity()
}
});
}
// 处理P2P事件
handleP2PEvent(message) {
try {
const event = JSON.parse(message.data.toString());
// 忽略自己发出的消息
if (event.peerId === this.peerId) return;
switch (event.type) {
case 'node-online':
this.addPeer(event.peerId, event.info);
// 向新节点发送欢迎消息
this.sendDirectEvent(event.peerId, {
type: 'welcome',
peerId: this.peerId,
timestamp: Date.now()
});
break;
case 'node-offline':
this.removePeer(event.peerId);
break;
case 'status-update':
this.updatePeerStatus(event.peerId, event.status);
break;
case 'resource-request':
this.handleResourceRequest(event);
break;
case 'tool-execution':
this.handleToolExecution(event);
break;
}
} catch (error) {
console.error('处理P2P事件错误:', error);
}
}
// 广播事件到所有节点
async broadcastEvent(event) {
await this.ipfs.pubsub.publish(
this.eventTopic,
Buffer.from(JSON.stringify(event))
);
}
// 发送事件到特定节点
async sendDirectEvent(targetPeerId, event) {
const peer = this.peers.get(targetPeerId);
if (!peer || !peer.directTopic) return;
await this.ipfs.pubsub.publish(
peer.directTopic,
Buffer.from(JSON.stringify(event))
);
}
// 定期广播状态
broadcastStatus() {
this.broadcastEvent({
type: 'status-update',
peerId: this.peerId,
timestamp: Date.now(),
status: {
load: this.getCurrentLoad(),
memory: process.memoryUsage(),
uptime: process.uptime()
}
});
}
// 添加对等节点
addPeer(peerId: string, info: any) {
this.peers.set(peerId, {
...info,
directTopic: `mcp-p2p-direct:${
peerId}`,
lastSeen: Date.now()
});
// 订阅此节点的直接通信主题
this.ipfs.pubsub.subscribe(`mcp-p2p-direct:${
this.peerId}`, (msg) => {
this.handleDirectMessage(msg);
});
}
// 移除对等节点
removePeer(peerId: string) {
this.peers.delete(peerId);
}
// 更新对等节点状态
updatePeerStatus(peerId: string, status: any) {
const peer = this.peers.get(peerId);
if (peer) {
this.peers.set(peerId, {
...peer, ...status, lastSeen: Date.now() });
}
}
}
1.4.3 微服务模式(Microservices)
将MCP功能分解为多个独立服务,每个服务专注于特定功能域。
┌─────────────────────────────────────────────┐
│ API Gateway │
└─────────────┬─────────────┬─────────────────┘
│ │
┌─────────▼───┐ ┌─────▼───────┐ ┌─────────────┐
│ Resource │ │ Tool │ │ Identity │
│ Service │ │ Service │ │ Service │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
┌─────────▼───┐ ┌─────▼───────┐ ┌─────▼───────┐
│ Storage │ │ Execution │ │ Security │
│ Service │ │ Service │ │ Service │
└─────────────┘ └─────────────┘ └─────────────┘
// 微服务模式实现示例
import {
McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import axios from 'axios';
import {
MongoClient } from 'mongodb';
import {
v4 as uuidv4 } from 'uuid';
import amqp from 'amqplib';
// 服务发现客户端
class ServiceDiscovery {
private serviceRegistry: Map<string, ServiceInfo[]> = new Map();
private consulUrl: string;
constructor(consulUrl: string) {
this.consulUrl = consulUrl;
this.refreshRegistry();
setInterval(() => this.refreshRegistry(), 30000);
}
// 刷新服务注册表
async refreshRegistry() {
try {
const response = await axios.get(`${
this.consulUrl}/v1/catalog/services`);
for (const [serviceName, tags] of Object.entries(response.data)) {
if (tags.includes('mcp')) {
const instances = await this.getServiceInstances(serviceName);
this.serviceRegistry.set(serviceName, instances);
}
}
} catch (error) {
console.error('刷新服务注册表失败:', error);
}
}
// 获取服务实例列表
async getServiceInstances(serviceName: string): Promise<ServiceInfo[]> {
try {
const response = await axios.get(
`${
this.consulUrl}/v1/health/service/${
serviceName}?passing=true`
);
return response.data.map(entry => ({
id: entry.Service.ID,
address: entry.Service.Address,
port: entry.Service.Port,
tags: entry.Service.Tags,
meta: entry.Service.Meta
}));
} catch (error) {
console.error(`获取服务${
serviceName}实例失败:`, error);
return [];
}
}
// 获取服务实例
getService(serviceName: string, tags: string[] = []): ServiceInfo | null {
const instances = this.serviceRegistry.get(serviceName) || [];
// 筛选满足标签要求的实例
const eligibleInstances = instances.filter(instance =>
tags.every(tag => instance.tags.includes(tag))
);
if (eligibleInstances.length === 0) return null;
// 简单负载均衡:随机选择一个实例
const randomIndex = Math.floor(Math.random() * eligibleInstances.length);
return eligibleInstances[randomIndex];
}
}
// MCP资源服务
class ResourceMcpService {
private server: McpServer;
private app = express();
private mongoClient: MongoClient;
private serviceId: string;
private discovery: ServiceDiscovery;
private messageBroker: amqp.Connection;
private channel: amqp.Channel;
constructor(config) {
this.server = new McpServer({
name: "mcp-resource-service",
description: "MCP资源微服务",
version: "1.0.0"
});
this.serviceId = uuidv4();
this.discovery = new ServiceDiscovery(config.consulUrl);
// 初始化Express中间件
this.initializeExpress();
// 连接MongoDB
this.connectMongo(config.mongoUrl);
// 连接消息代理
this.connectMessageBroker(config.rabbitMqUrl);
// 注册服务
this.registerService(config.consulUrl);
}
// 初始化Express
initializeExpress() {
this.app.use(express.json());
// 资源API端点
this.app.get('/resources/:name', this.getResource.bind(this));
this.app.post('/resources/:name', this.createResource.bind(this));
this.app.put('/resources/:name/:id', this.updateResource.bind(this));
this.app.delete('/resources/:name/:id', this.deleteResource.bind(this));
// 健康检查端点
this.app.get('/health', (req, res) => {
res.status(200).json({
status: 'healthy', serviceId: this.serviceId });
});
// 启动服务器
const port = process.env.PORT || 3000;
this.app.listen(port, () => {
console.log(`资源服务运行在端口 ${
port}`);
});
}
// 连接MongoDB
async connectMongo(mongoUrl: string) {
try {
this.mongoClient = new MongoClient(mongoUrl);
await this.mongoClient.connect();
console.log('已连接到MongoDB');
} catch (error) {
console.error('MongoDB连接失败:', error);
process.exit(1);
}
}
// 连接消息代理
async connectMessageBroker(rabbitMqUrl: string) {
try {
this.messageBroker = await amqp.connect(rabbitMqUrl);
this.channel = await this.messageBroker.createChannel();
// 声明交换机和队列
await this.channel.assertExchange('mcp-events', 'topic', {
durable: true });
const {
queue } = await this.channel.assertQueue(
`resource-service-${
this.serviceId}`,
{
exclusive: true }
);
// 绑定感兴趣的主题
await this.channel.bindQueue(queue, 'mcp-events', 'resource.*');
// 消费消息
this.channel.consume(queue, (msg) => {
if (msg) {
this.handleMessage(msg);
this.channel.ack(msg);
}
});
console.log('已连接到消息代理');
} catch (error) {
console.error('消息代理连接失败:', error);
}
}
// 处理消息
handleMessage(msg: amqp.ConsumeMessage) {
try {
const content = JSON.parse(msg.content.toString());
const routingKey = msg.fields.routingKey;
console.log(`收到消息: ${
routingKey}`, content);
// 根据路由键处理不同类型的消息
switch (routingKey) {
case 'resource.created':
this.handleResourceCreated(content);
break;
case 'resource.updated':
this.handleResourceUpdated(content);
break;
case 'resource.deleted':
this.handleResourceDeleted(content);
break;
}
} catch (error) {
console.error('处理消息错误:', error);
}
}
// 发布事件
publishEvent(routingKey: string, data: any) {
this.channel.publish(
'mcp-events',
routingKey,
Buffer.from(JSON.stringify({
...data,
serviceId: this.serviceId,
timestamp: Date.now()
}))
);
}
// API处理器
async getResource(req, res) {
try {
const {
name } = req.params;
const filter = req.query.filter ? JSON.parse(req.query.filter) : {
};
const db = this.mongoClient.db('mcp-resources');
const collection = db.collection(name);
const resources = await collection.find(filter).toArray();
res.json({
success: true, data: resources });
} catch (error) {
res.status(500).json({
success: false,
error: error.message
});
}
}
async createResource(req, res) {
try {
const {
name } = req.params;
const resourceData = req.body;
const db = this.mongoClient.db('mcp-resources');
const collection = db.collection(name);
const result = await collection.insertOne({
...resourceData,
_id: uuidv4(),
createdAt: new Date(),
updatedAt: new Date()
});
// 发布资源创建事件
this.publishEvent('resource.created', {
resourceName: name,
resourceId: result.insertedId,
data: resourceData
});
res.status(201).json({
success: true,
data: {
id: result.insertedId }
});
} catch (error) {
res.status(500).json({
success: false,
error: error.message
});
}
}
// 其他API处理器...
// 注册服务到Consul
async registerService(consulUrl: string) {
try {
await axios.put(`${
consulUrl}/v1/agent/service/register`, {
ID: this.serviceId,
Name: 'mcp-resource-service',
Tags: ['mcp', 'resource'],
Address: process.env.SERVICE_HOST || 'localhost',
Port: parseInt(process.env.PORT || '3000'),
Check: {
HTTP: `http://${
process.env.SERVICE_HOST || 'localhost'}:${
process.env.PORT || '3000'}/health`,
Interval: '15s',
Timeout: '5s'
}
});
console.log(`服务已注册,ID: ${
this.serviceId}`);
// 注册服务注销钩子
process.on('SIGINT', () => this.deregisterService(consulUrl));
process.on('SIGTERM', () => this.deregisterService(consulUrl));
} catch (error) {
console.error('服务注册失败:', error);
}
}
// 注销服务
async deregisterService(consulUrl: string) {
try {
await axios.put(`${
consulUrl}/v1/agent/service/deregister/${
this.serviceId}`);
console.log('服务已注销');
process.exit(0);
} catch (error) {
console.error('服务注销失败:', error);
process.exit(1);
}
}
}
多服务器架构为MCP应用提供了更高的可用性、可伸缩性和灵活性,但同时也带来了系统复杂度的提升。在后续章节中,我们将深入探讨如何应对这些挑战,构建稳健的多服务器MCP系统。
2. 服务发现与路由
在多服务器MCP架构中,服务发现和路由机制是确保系统高效运行的关键组件。它们使客户端能够动态找到可用的服务实例,并将请求路由到最合适的服务器上。
2.1 服务注册与发现机制
服务发现的核心是让服务消费者能够在不需要硬编码服务提供者地址的情况下,动态找到并调用所需的服务。在MCP系统中,服务发现机制通常涉及以下几个关键组件:
2.1.1 服务注册表
服务注册表是服务发现的核心数据存储,记录了所有可用服务实例的关键信息:
// 服务注册表接口
interface ServiceRegistry {
// 注册服务实例
register(instance: ServiceInstance): Promise<void>;
// 注销服务实例
deregister(instanceId: string): Promise<void>;
// 获取指定服务的所有实例
getInstances(serviceName: string): Promise<ServiceInstance[]>;
// 查询符合特定条件的服务实例
findServices(query: ServiceQuery): Promise<ServiceInstance[]>;
// 监听服务变更
watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void;
}
// 服务实例数据结构
interface ServiceInstance {
id: string; // 实例唯一标识
serviceName: string; // 服务名称
host: string; // 主机地址
port: number; // 端口号
status: 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE'; // 实例状态
metadata: Record<string, string>; // 元数据,如版本、环境等
healthCheckUrl?: string; // 健康检查URL
registrationTime: number; // 注册时间
lastUpdateTime: number; // 最后更新时间
}
2.1.2 基于MCP实现的分布式服务注册表
以下是一个使用MCP服务器实现的分布式服务注册表示例:
import {
McpServer } from '@modelcontextprotocol/sdk';
import {
createClient } from 'redis';
import express from 'express';
// 基于Redis的MCP服务注册表实现
class McpServiceRegistry implements ServiceRegistry {
private redisClient;
private readonly KEY_PREFIX = 'mcp:registry:';
private readonly TTL = 60; // 服务记录过期时间(秒)
constructor(redisUrl: string) {
this.redisClient = createClient({
url: redisUrl });
this.redisClient.connect();
}
// 注册服务实例
async register(instance: ServiceInstance): Promise<void> {
const key = `${
this.KEY_PREFIX}${
instance.serviceName}:${
instance.id}`;
const now = Date.now();
// 更新注册时间或最后更新时间
instance.lastUpdateTime = now;
if (!instance.registrationTime) {
instance.registrationTime = now;
}
// 将服务实例信息存储到Redis
await this.redisClient.set(key, JSON.stringify(instance), {
EX: this.TTL });
// 添加到服务名称集合中,方便按服务名查询
await this.redisClient.sAdd(`${
this.KEY_PREFIX}${
instance.serviceName}`, instance.id);
}
// 注销服务实例
async deregister(instanceId: string): Promise<void> {
// 获取服务实例信息
const pattern = `${
this.KEY_PREFIX}*:${
instanceId}`;
const keys = await this.redisClient.keys(pattern);
if (keys.length > 0) {
const key = keys[0];
const serviceName = key.split(':')[2];
// 从Redis中删除服务实例
await this.redisClient.del(key);
// 从服务名称集合中移除
await this.redisClient.sRem(`${
this.KEY_PREFIX}${
serviceName}`, instanceId);
}
}
// 获取指定服务的所有实例
async getInstances(serviceName: string): Promise<ServiceInstance[]> {
const serviceKey = `${
this.KEY_PREFIX}${
serviceName}`;
const instanceIds = await this.redisClient.sMembers(serviceKey);
if (instanceIds.length === 0) {
return [];
}
// 批量获取所有服务实例信息
const keys = instanceIds.map(id => `${
this.KEY_PREFIX}${
serviceName}:${
id}`);
const instancesData = await this.redisClient.mGet(keys);
// 过滤并解析有效的实例数据
return instancesData
.filter(Boolean)
.map(data => JSON.parse(data))
.filter(instance => instance.status !== 'DOWN');
}
// 查询符合特定条件的服务实例
async findServices(query: ServiceQuery): Promise<ServiceInstance[]> {
const {
serviceName, status, metadata } = query;
// 获取所有匹配服务名的实例
const instances = await this.getInstances(serviceName);
// 应用过滤条件
return instances.filter(instance => {
// 状态过滤
if (status && instance.status !== status) {
return false;
}
// 元数据过滤
if (metadata) {
for (const [key, value] of Object.entries(metadata)) {
if (instance.metadata[key] !== value) {
return false;
}
}
}
return true;
});
}
// 监听服务变更
watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void {
// 创建订阅客户端
const subClient = this.redisClient.duplicate();
// 订阅服务变更事件
subClient.subscribe(`${
this.KEY_PREFIX}${
serviceName}:changes`, (message) => {
this.getInstances(serviceName).then(callback);
});
}
}
// 服务注册表API服务器
function createRegistryServer(registry: McpServiceRegistry): express.Express {
const app = express();
app.use(express.json());
// 服务注册端点
app.post('/services', async (req, res) => {
try {
const instance = req.body as ServiceInstance;
await registry.register(instance);
res.status(201).json({
success: true, message: '服务实例已注册' });
} catch (error) {
res.status(500).json({
success: false, error: error.message });
}
});
// 服务注销端点
app.delete('/services/:instanceId', async (req, res) => {
try {
await registry.deregister(req.params.instanceId);
res.status(200).json({
success: true, message: '服务实例已注销' });
} catch (error) {
res.status(500).json({
success: false, error: error.message });
}
});
// 获取服务实例端点
app.get('/services/:serviceName', async (req, res) => {
try {
const instances = await registry.getInstances(req.params.serviceName);
res.status(200).json({
success: true, data: instances });
} catch (error) {
res.status(500).json({
success: false, error: error.message });
}
});
// 服务查询端点
app.get('/services', async (req, res) => {
try {
const query = {
serviceName: req.query.serviceName as string,
status: req.query.status as 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE',
metadata: req.query.metadata ? JSON.parse(req.query.metadata as string) : undefined
};
const instances = await registry.findServices(query);
res.status(200).json({
success: true, data: instances });
} catch (error) {
res.status(500).json({
success: false, error: error.message });
}
});
return app;
}
// 使用MCP服务器封装服务注册表
class McpRegistryServer extends McpServer {
private registry: McpServiceRegistry;
private httpServer: express.Express;
constructor(config: {
name: string;
description: string;
version: string;
redisUrl: string;
port: number;
}) {
super({
name: config.name,
description: config.description,
version: config.version
});
// 创建服务注册表
this.registry = new McpServiceRegistry(config.redisUrl);
// 创建HTTP服务器
this.httpServer = createRegistryServer(this.registry);
// 启动HTTP服务器
this.httpServer.listen(config.port, () => {
console.log(`注册表服务器运行在端口 ${
config.port}`);
});
// 作为MCP资源暴露服务注册表
this.exposeRegistryAsResource();
}
// 将注册表作为MCP资源暴露
private exposeRegistryAsResource() {
// 定义服务实例资源模板
this.registerResourceTemplate({
name: "serviceInstances",
description: "MCP服务实例资源",
schema: {
type: "object",
properties: {
id: {
type: "string" },
serviceName: {
type: "string" },
host: {
type: "string" },
port: {
type: "number" },
status: {
type: "string", enum: ["UP", "DOWN", "STARTING", "OUT_OF_SERVICE"] },
metadata: {
type: "object" },
healthCheckUrl: {
type: "string" },
registrationTime: {
type: "number" },
lastUpdateTime: {
type: "number" }
},
required: ["id", "serviceName", "host",