极简MCP实现示例

MCP(Model Context Protocol)已经成为当前人工智能领域热议的焦点之一,关于它的讨论无处不在。简而言之,MCP为AI应用程序提供类似于USB-C的接口,它提供了一种标准化的方法,使得AI应用程序能够连接到各种不同的数据源和工具。

MCP采用客户端-服务器架构,应用程序通过这种架构能够连接到多个服务器。其架构示意如图 1所示。

图 1 MCP架构示意图

本文聚焦于如何通过MCP实现AI应用程序对外部数据源或工具的访问,示例AI应用程序将通过调用一个自主实现的MCP Server提供的天气信息的查询能力实现获得指定地区的天气信息的功能。

1.环境准备

开发基于MCP的AI应用程序,首先当然需要安装MCP相关的环境。本文示例程序采用python实现(MCP官方提供Python、TypeScript、Java、Kotlin及C#的SDK),因而需要通过以下命令安装MCP相关组件包。

pip install -U mcp

本示例程序使用的模型选择DeepSeek,因而需要安装OpenAI相关组件包。如果选用其他模型,则需安装对应的SDK。

pip install -U openai

提供天气信息数据服务的平台很多,通常服务以Restful API或其他Web API形式提供,因而在示例程序中需要通过http协议访问API,如果环境中还未安装相关支持包,则可执行以下指令安装。

pip install -U httpx

至此,示例程序的环境准备完毕。当然在实际的环境准备过程中,可将上述过程整合在一条指令中执行。

2.MCP Server实现

如前所述,本示例程序中的MCP Server基于第三方的Web API服务提供天气信息查询能力,因而有必要先对http的访问进行封装。

from typing import Any
import httpx

async def get_request(url: str) -> dict[str, Any] | None:
    """
    异步请求URL并获取响应。

    Args:
        url (str): 要请求的URL地址。

    Returns:
        dict[str, Any] | None: 如果请求成功并返回JSON数据,则返回该数据;否则返回None。

    """
    # 设置请求头
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json"
    }
    # 异步HTTP客户端
    async with httpx.AsyncClient() as client:
        try:
            # 发送GET请求
            response = await client.get(url, headers=headers, timeout=30.0)
            # 检查响应状态码
            response.raise_for_status()
            # 返回响应的JSON数据
            return response.json()
        except Exception:
            # 发生异常时返回None
            return None

上述代码中的USER_AGENT是一个常量,可适当设定,例如可设定为”weather-app/1.0″。
随后即可利用上述封装函数访问天气服务API并实现MCP Server中提供天气信息查询的工具。本示例程序选择使用高德开放平台提供的天气信息查询API(其实高德开放平台已经提供了其MCP Server,在实际应用中可以直接使用),因而需要开发者先申请一个API KEY并将该API KEY保存于常量AMAP_KEY之中,或者也可以保存于系统环境变量或.env文件中。
基于MCP实现提供各种能力的工具,仅需在工具实现函数上方添加装饰器“@mcp.tool()”即可。示例MCP Server通过get_forecast函数提供天气信息查询能力,因而需要在get_forecast函数的实现代码上方添加装饰器“@mcp.tool()”,其实现代码如下。

from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("weather")

@mcp.tool()
async def get_forecast(adcode:str) -> str:
    """取得参数所代表的城市的天气信息.

    Args:
        adcode: 城市行政编码
    """
    # 高德天气获取API
    amap_url = f"""https://restapi.amap.com/v3/weather/weatherInfo?extensions=all&key={AMAP_KEY}&city={adcode}"""

    forecast_data = await get_request(amap_url)
    print(forecast_data)

    if not forecast_data:
        return "未能取得天气信息。"

    return forecast_data

装饰器“@mcp.tool()”将普通函数转换为MCP Server可提供的服务工具。但在应用程序选择调用该能力之前,需要了解该工具的功能以及调用该工具时应当提供哪些参数,就正如使用Function Calling时需要向模型仔细描述函数的功能与参数信息一样,在MCP中,这些信息通过“函数注释”的方式获得,python程序的函数注释通过在函数名称下方由一对三引号“””””包含的内容构成,该内容应先说明函数的功能,然后通过Args说明每一个参数的含义。
此外,如果需要为工具提供更多信息,可以通过在tool中添加注解的方式实现。
至此,具备查询天气信息工具的MCP Server实现完毕,如果需要为MCP Server添加更多工具,可参照上述get_forecast工具实现。
所有工具实现完成后,就可以通过run方法启动MCP Server了。

if __name__ == "__main__":
    print("Starting server...")
    # Initialize and run the server
    mcp.run(transport='stdio')

3.MCP Client及MCP Host实现

AI应用使用MCP则是借助于MCP Client实现,而AI应用本身或其组件则可视为MCP Host。
MCP Client端需要访问大模型,本示例程序选用DeepSeek,其API Key和Base Url存储于与源代码文件同一目录下的文件.env中并在代码中通过函数load_dotenv()加载,.env内容如下,如果选用其他模型服务则需进行相应替换。

OPENAI_API_KEY="sk-XXXXXXXX"
OPENAI_BASE_URL="https://api.deepseek.com/v1"

MCP Client的功能实现被封装于类MCPClient中。在定义该Python类之前,可先行导入需要用到的包并加载环境变量。

import sys
import json
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv

_ = load_dotenv() 

类MCPClient首先需要通过init方法为类实例初始化提供支持,init方法除初始化模型客户端(或称模型本地代理)及指定待使用的模型编码外,同时对MCP Client与MCP Server实现异步工具调用所需的会话及栈也分别依次进行了类型定义和实例化。

    def __init__(self):
        """
        初始化MCPClient实例。
        """
        # 初始化会话和客户端对象
        try:
            # 初始化会话对象
            self.session: Optional[ClientSession] = None
            # 初始化退出栈对象
            self.exit_stack = AsyncExitStack()
            # 初始化LLM对象
            self.llm = OpenAI()
            # 初始化模型编码
            self.model = "deepseek-chat"
        except Exception as e:
            # 捕获异常并打印错误信息
            print(f"初始化MCPClient异常: {e}")

随后在类MCPClient中定义connect_to_server方法,该方法实现与MCP Server之间建立连接。连接建立的过程大致是先读取MCP Server的脚本,以便stdio_client可以获得服务相关的参数;然后在栈中创建stdio并依赖该stdio创建MCP Client与MCP Server之间的会话对象;最后调用会话对象的initialize方法真正实现MCP Client与MCP Server之间的连接并初始化会话。
本示例采用python完成,故connect_to_server方法中仅允许传入的MCP Server脚本文件类型为“.py”,在实际应用中,应根据MCP Server脚本实际采用的语言进行调整。例如如果MCP Server基于Node.js编写,则应允许脚本文件类型为“.js”。

    async def connect_to_server(self, server_script_path: str):
        """
        异步连接MCP服务。

        Args:
            server_script_path (str): MCP Server脚本文件路径。

        Returns:
            无返回值。

        Raises:
            ValueError: 如果提供的MCP Server脚本文件不是Python文件,则引发此异常。

        """
        # 检查文件是否为Python文件
        is_python = server_script_path.endswith('.py')

        # 如果不是Python文件,则抛出异常
        if not (is_python):
            raise ValueError("MCP Server脚本不是python文件")
            
        # 设置要执行的命令
        command = "python"
    
        # 设置服务器参数
        server_params = StdioServerParameters(
            command=command,  # 命令
            args=[server_script_path],  # 参数
            env=None  # 环境变量
        )
        
        # 创建stdio传输
        stdio_transport = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
    
        # 将stdio传输设置为对象的属性
        self.stdio, self.write = stdio_transport
    
        # 创建会话
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(self.stdio, self.write)
        )
        
        # 初始化会话
        await self.session.initialize()
    
        # 列出有效工具
        response = await self.session.list_tools()
        tools = response.tools
        print("\nMCP Server提供的工具:", [tool.name for tool in tools])

上述代码的最后列出了MCP Server所提供的有效工具,在建立MCP Client与MCP Server之间连接时它并不是必需的,只是示例程序基于调试与演示目的而添加的代码。
类MCPClient的方法process_query则实现了对用户消息的真正回应,于本示例程序而言,是对用户指定地区的天气信息查询。其实现逻辑与Function Calling的实现基本相同,只不过在与模型交互前需要先取得MCP Server所提供的工具列表,并将之转换成模型交互接口所需要的形式。随后与模型的交互及对工具的调用过程处理则与Function Calling的实现完全相同。

    async def process_query(self, query: str) -> str:
        """
        使用MCP Server异步处理查询请求。

        Args:
            query (str): 用户输入的查询语句。

        Returns:
            str: 处理后的回复文本。

        """
        # 创建包含用户查询的消息列表
        messages = [
            {
                "role": "user",
                "content": query
            }
        ]

        # 获取工具集
        response = await self.session.list_tools()    
        # print(f"工具集:\n{response.tools}")

        # 将工具集转换为适合大模型使用的格式
        available_tools = [{ "type": "function",
            "function": {
            "name": tool.name,
            "description": tool.description,
            "input_schema": tool.inputSchema
        }} for tool in response.tools]   
        # print(f"转换后的工具集:\n{available_tools}")

        # 调用大模型
        response = self.llm.chat.completions.create(
            model=self.model,
            max_tokens=1000,
            messages=messages,
            tools=available_tools
        )
        # print(f"response=\n{response}")
        
        # 处理大模型返回信息
        tool_results = []
        final_text = []

        for choice in response.choices:
            # 处理文本回复
            if choice.finish_reason == 'stop':
                final_text.append(choice.message.content)
            
            # 处理工具调用
            elif choice.finish_reason == 'tool_calls':
                # 添加assistant消息,包含tool_calls
                assistant_message = {
                    "role": "assistant",
                    "content": choice.message.content,
                    "tool_calls": [
                        {
                            "id": tool.id,
                            "type": "function",
                            "function": {
                                "name": tool.function.name,
                                "arguments": tool.function.arguments
                            }
                        } for tool in choice.message.tool_calls
                    ]
                }
                messages.append(assistant_message)
                
                # 处理每个工具调用
                for tool in choice.message.tool_calls:
                    tool_name = tool.function.name
                    tool_args = json.loads(tool.function.arguments)
                    tool_id = tool.id
                
                    # 执行工具调用
                    result = await self.session.call_tool(tool_name, tool_args)
                    # print(f"工具调用结果:\n{result}")
                    tool_results.append({"call": tool_name, "result": result})
                    final_text.append(f"[使用参数{tool_args}调用工具{tool_name}]\n")

                    # 添加tool消息响应
                    messages.append({
                        "role": "tool",
                        "content": str(result),
                        "tool_call_id": tool_id,
                        "name": tool_name
                    })

                # 获取模型的下一个响应
                response = self.llm.chat.completions.create(
                    model=self.model,
                    max_tokens=1000,
                    messages=messages,
                )
                final_text.append(response.choices[0].message.content)

        return "\n".join(final_text)

process_query方法提供了与模型及工具进行交互的实现,因而MCP Host可以通过调用它实现相关意图,也就是说process_query方法是MCP Host与MCP Client之间的桥梁。
最后,cleanup方法实现了在退出会话后清理MCPClient资源的支持。

    async def cleanup(self):
        """释放资源"""
        await self.exit_stack.aclose()

至此,MCP Client全部实现完成。
MCP的完整实现除MCP Server与MCP Client外,还需要有真正应用它的MCP Host,也就是AI应用程序,基于上述MCP Server与MCP Client一个简单MCP Host实现如下,它构建了一个对话循环,并通过rich在命令行窗口中实现对markdown格式返回文本的渲染。

async def main():
    print("\n启动MCP Host!")
    # 检查命令行参数数量
    if len(sys.argv) < 2:
        # 打印用法信息
        print("用法: python client.py <path_to_server_script>")
        # 退出程序并返回错误码
        sys.exit(1)

    print("\n启动MCP Client!")    
    # 创建MCPClient实例
    client = MCPClient()
    try:
        # 连接到服务器
        await client.connect_to_server(sys.argv[1])
        
        while True:
            try:
                query = input("\n问题(输入'quit'退出): ").strip()
                
                if query.lower() == 'quit':
                    break
                    
                response = await client.process_query(query)
                # 输出查询结果
                import rich
                from rich.markdown import Markdown
                md_content = Markdown(response)
                rich.print(md_content)
                    
            except Exception as e:
                print(f"\n发生异常: {str(e)}")
    finally:
        # 清理资源
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

4.运行效果

全部代码编写完成后,即可运行查看示例程序效果。
首先需要先运行MCP Server,在源文件目录下执行以下命令即可:

python weather.py

然后,运行MCP Host,在源文件目录下执行以下命令:

python client.py weather.py

在MCP Host的交互界面中输入希望查询天气信息的地区,稍候片刻即可获得反馈,如下图所示。

图 2 基于MCP实现的天气信息查询效果

最后需要补充说明的一个与MCP本身无关的细节,高德开放平台提供的查询天气信息接口的参数adcode所需要的是待查询城市的行政编码(经实测,直接传入城市名称也可正常查询),而上述运行过程表明,模型已经内化了城市行政编码的知识,在我们要求查询“北京天气”时,它知道“北京”的行政编码为“110000”,因而在调用工具时它自动将“110000”作为参数传递。