MCP入门笔记2: MCP服务器开发流程,并以天气查询为例
文章目录
- 前言
- 三、MCP服务器Server开发流程
- 1. MCP服务器的基本功能
- 2. MCP服务器的通讯模式
- 2.1 MCP 通信模式总览
- 2.2 HTTP 与 SSE 的比较
- 2.3 本地(stdio)与远程(SSE)MCP 通信流程
- 2.3.1本地通信(stdio)
- 2.3.2远程通信(HTTP + SSE)
- 2.4 消息格式与接口定义
- 3. MCP服务器SDK基本语法规则(未完成)
- 4. 创建一个天气查询Server
- 4.1 准备工作
- 4.2 天气查询Server的创建流程
- 4.2.1 服务器依赖安装
- 4.2.2 服务器代码编写
- 四、MCP客户端Client接入服务器Server流程
- 1. 天气查询客户端client创建流程
- 1.1client.py的代码实现
- 1.2client.py测试运行
- 总结
前言
上篇博客中,笔者熟悉了MCP客户端开发流程,使用client.py成功调用本地ollama。本文继续记录学习相关内容,章节数承接上篇。
三、MCP服务器Server开发流程
1. MCP服务器的基本功能
根据MCP协议定义,Server可以提供三种类型的标准能力:Resources、Tools、Prompts,每个Server可同时提供三种类型能力或其中一种。
- Resources:资源,客户端可以读取的类似文件的数据(如 API 响应或文件内容)。
- Tools:工具,第三方服务、功能函数,LLM 可以调用的函数(经用户批准)。
- Prompts:提示词,帮助用户完成特定任务的预先编写的模板。
2. MCP服务器的通讯模式
2.1 MCP 通信模式总览
MCP 支持两种互补的传输方式:1. 标准输入输出(stdio) 2. 基于 HTTP 的服务器推送事件(SSE)
两种方式分别面向本地集成场景与分布式/远程集成场景,共同满足 LLM 客户端(Client)与 MCP 服务器(Server)之间的双向通信需求,并全部采用 JSON-RPC 2.0 作为消息封装格式
2.2 HTTP 与 SSE 的比较
说明: 虽然 HTTP/1.1 可复用连接,但每次请求仍属于独立交互;而 SSE 则在同一连接上多次发送事件,更适合实时性要求高的场景
2.3 本地(stdio)与远程(SSE)MCP 通信流程
2.3.1本地通信(stdio)
客户端将 MCP 服务器作为子进程启动,二者通过操作系统的 stdin/stdout 管道进行数据交换。
每条消息都使用 JSON-RPC 2.0 格式(包含 jsonrpc
、method
、params
、id
等字段),写入子进程的 stdin;服务器处理后将响应写入 stdout。
典型场景:桌面应用(如 Claude Desktop)在本地启动 MCP 服务器,以便模型读取本地文件或执行系统命令。
2.3.2远程通信(HTTP + SSE)
客户端通过 HTTP 向 MCP 服务器的/messages
接口发送 JSON-RPC 请求;服务器返回初始 HTTP 响应后即可马上切换到 SSE 模式。
服务器在同一 HTTP 连接上,通过 SSE 持续向客户端发送后续事件(event: message
),每个事件均为 JSON-RPC 格式的消息。
客户端可在任意时刻通过新的 HTTP 请求向/messages
发送下一个调用。
典型场景:分布式部署的 LLM 应用需要从远端数据库、API 获取实时上下文更新
2.4 消息格式与接口定义
MCP 统一使用 JSON-RPC 2.0 消息格式:
{
"jsonrpc": "2.0",
"id": 1,
"method": "someMethod",
"params": { /* 方法参数 */ }
}
3. MCP服务器SDK基本语法规则(未完成)
- a. 高级API调用规则
- b. 底层API调用规则
4. 创建一个天气查询Server
通过使用OpenWeather API,创建一个能够实时查询天气的服务器(server),并使用stdio方式进行通信。在创建之前,需要完成一个准备工作。
4.1 准备工作
第一步,从官网https://openweathermap.org/,获取API-Key。进入官网点击API,然后点击API doc;下拉可以看到sign up,最后获取API-Key。
第二步,在conda中输入代码,测试一下是否可用。
curl -s "https://api.openweathermap.org/data/2.5/weather?q=Beijing&appid='YOUR_API_KEY'&units=metric&lang=zh_cn"
图片来源:https://www.bilibili.com/video/BV1NLXCYTEbj/?p=6&share_source=copy_web&vd_source=659441755107f4432b57296f459746bb
笔者使用刚注册好的api key,遇到了错误401。查看其原因,发现这个api key必须邮箱验证之后,才能用(刚注册的时候有提示,进我垃圾邮箱了,我没注意)。。。另外需要注意的是,'YOUR_API_KEY’不要保留引号,只要数字。至于其他错误情况,我暂时没遇到过。
4.2 天气查询Server的创建流程
4.2.1 服务器依赖安装
由于我们需要使用http请求来查询天气,因此需要在当前虚拟环境中添加如下依赖:
uv add mcp httpx
4.2.2 服务器代码编写
本小节创建服务器代码,MCP基本执行流程如下:
图片来源:https://www.bilibili.com/video/BV1NLXCYTEbj/?p=7&share_source=copy_web&vd_source=659441755107f4432b57296f459746bb
参考上面的链接,server.py代码如下:
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
import json
# Initialize FastMCP server
mcp = FastMCP("weatherServer")
# API配置
NWS_API_BASE = "https://api.openweathermap.org/data/2.5/weather"
USER_AGENT = "weather-app/1.0"
API_KEY = "YOUR_API_KEY"
async def make_request(city: str) -> dict[str, Any] | None:
"""Make a request to the API with proper error handling.
从 OpenWeather API 获取天气信息。
:param city: 城市名称(需使用英文,如 Beijing)
:return: 天气数据字典;若出错返回包含 error 信息的字典
"""
params = {
"q": city,
"appid": API_KEY,
"units": "metric",
"lang": "zh_cn"
}
headers = {
"User-Agent": USER_AGENT
}
async with httpx.AsyncClient() as client:
try:
response = await client.get(NWS_API_BASE, params=params, headers=headers, timeout=30.0)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
return {"error": f"HTTP 错误: {e.response.status_code}"}
except Exception as e:
return {"error": f"请求失败: {str(e)}"}
def format_weather(data: dict[str, Any] | str) -> str:
"""
将天气数据格式,转换为易读文本。
:param data: 天气数据(可以是字典或 JSON 字符串)
:return: 格式化后的天气信息字符串
"""
# 如果传入的是字符串,则先转换为字典
if isinstance(data, str):
try:
data = json.loads(data)
except Exception as e:
return f"无法解析天气数据: {e}"
# 如果数据中包含错误信息,直接返回错误提示
if"error"in data:
return f"⚠️ {data['error']}"
# 提取数据时做容错处理
city = data.get("name", "未知")
country = data.get("sys", {}).get("country", "未知")
temp = data.get("main", {}).get("temp", "N/A")
humidity = data.get("main", {}).get("humidity", "N/A")
wind_speed = data.get("wind", {}).get("speed", "N/A")
# weather 可能为空列表,因此用 [0] 前先提供默认字典
weather_list = data.get("weather", [{}])
description = weather_list[0].get("description", "未知")
return (
f"🌍 {city}, {country}
"
f"🌡 温度: {temp}°C
"
f"💧 湿度: {humidity}%
"
f"🌬 风速: {wind_speed} m/s
"
f"🌤 天气: {description}
"
)
@mcp.tool()
async def query_weather(city: str) -> str:
"""
输入指定城市的英文名字,返回今日天气查询结果
:param city: 城市名称(需使用英文)
:return: 格式化后的天气信息
"""
data = await make_request(city)
return format_weather(data)
if __name__ == "__main__":
# Initialize and run the server
mcp.run(transport='stdio')
四、MCP客户端Client接入服务器Server流程
1. 天气查询客户端client创建流程
1.1client.py的代码实现
import asyncio
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
from contextlib import AsyncExitStack
from typing import Optional
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
load_dotenv() # load environment variables from .env
class MCPClient:
def __init__(self):
# Initialize session and client objects
self.exit_stack = AsyncExitStack()
self.openai_api_key = os.getenv("OPENAI_API_KEY")
self.base_url = os.getenv("BASE_URL")
self.model = os.getenv("MODEL")
if not self.openai_api_key:
raise ValueError("openai_api_key is null, 请在.env文件中设置")
self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
self.session: Optional[ClientSession] = None
async def connect_to_server(self, server_script_path: str):
"""Connect to an MCP server 连接到MCP服务器并列出可用工具
Args:
server_script_path: Path to the server script (.py or .js)
"""
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")
command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command,
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# List available tools
response = await self.session.list_tools()
tools = response.tools
print("
Connected to server with tools:", [tool.name for tool in tools])
async def process_query(self, query: str) -> str:
"""使用LLM(如qwq:32b)处理用户query并调用可用的工具(Function Calling)"""
messages = [{"role": "user", "content": query}]
response = await self.session.list_tools()
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
}
} for tool in response.tools]
# print(available_tools)
# 调用 OpenAI API
response = self.client.chat.completions.create(
model = self.model,
messages = messages,
tools = available_tools
)
# 处理返回的内容
content = response.choices[0]
if content.finish_reason == "tool_calls":
# 如果需要使用工具,就解析工具
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# 执行工具
result = await self.session.call_tool(tool_name, tool_args)
print(f"
[Calling tool {tool_name} with args {tool_args}]
")
# 将模型返回的调用哪个工具数据和工具执行完成后的数据都存入messages中
messages.append(content.message.model_dump())
messages.append({
"role": "tool",
"content": result.content[0].text,
"tool_call_id": tool_call.id,
})
# 将上面的结果再返回给大模型用于生产最终的结果
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
)
return response.choices[0].message.content
return content.message.content
async def chat_loop(self):
"""交互式聊天循环"""
print("
MCP Client Started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("
Query: ").strip()
if query.lower() == 'quit':
break
response = await self.process_query(query)
print("
" + response)
except Exception as e:
print(f"
Error: {str(e)}")
async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose()
async def main():
if len(sys.argv) < 2:
print("Usage: python client.py " )
sys.exit(1)
client = MCPClient()
try:
await client.connect_to_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
import sys
asyncio.run(main())
1.2client.py测试运行
进入之前创建的虚拟环境中,运行代码并提问:北京今天天气如何?结果如下。这里有两个地方要注意,第一,我这里用的模型是qwq32b,deepseek-r1:70b模型不行,不支持。第二,在server4.py文件中,别忘了给api值,这次可以有引号了。
uv run client4.py server4.py
他的回答对吗?还真对!
我百度查了一下,截图如下:
总结
本文主要记录了MCP服务器开发流程,并实现了天气查询功能。不过感觉,OpenWeather API这个天气插件很成熟了。如果想添加其他新功能,有一定挑战性。可以学习国内大厂,阿里云百炼、字节Coze等产品。