MCP 入门案例

快速入门

安装 UV 工具

1
2
3
4
5
# 安装 uv
pip install uv

# 查看安装位置
pip show uv

找到 uv.exe 所在的目录,复制路径 C:\Users\Admin\AppData\Roaming\Python\Python312\Scripts,添加到系统环境变量的 Path 中。

/images/posts/MCP入门案例/1.png
(图1)

dos 窗口中输入 uv

/images/posts/MCP入门案例/2.png
(图2)

MCP Server 入门案例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
# 初始化项目,命令执行后,会生成一个目录 mcp-demo,里面包含一些基本的文件,删除 main.py 文件
uv init mcp-demo

cd mcp-demo

# 创建 virtual environment 并激活它
uv venv
.venv\Scripts\activate

# 安装 dependencies
uv add "mcp[cli]" httpx

# 创建 server 目录

# 创建我们的 server file
new-item personalInfo.py
/images/posts/MCP入门案例/3.png
(图3)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
import csv
import os

import uvicorn

# Initialize the FastMCP server; "personalInfo" is the server name
# advertised to connecting MCP clients.
mcp = FastMCP("personalInfo")

@mcp.tool()
async def get_info(name: str) -> str:
    """Look up a person's info by name in a CSV file.

    Each data row has the form "<name>: <info>". Returns the info string for
    the matching name, or a (Chinese) hint message when no record is found or
    the file cannot be read.
    """
    # personal.csv lives one directory above this server script.
    file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "personal.csv"))

    try:
        with open(file_path, mode="r", encoding="utf-8") as file:
            reader = csv.reader(file)
            for row in reader:
                # Skip empty/blank rows.
                if not row or not row[0].strip():
                    continue

                # Split only on the first ":" so the info part may itself
                # contain colons.
                parts = row[0].split(":", 1)
                if len(parts) != 2:
                    print(f"格式错误,跳过该行: {row[0]}")
                    continue

                record_name, info = parts
                # Strip padding so "张三: 性别 ..." matches the query "张三"
                # and the returned info carries no leading space.
                if record_name.strip() == name.strip():
                    return info.strip()

        return f"未找到 {name} 的信息。"
    except FileNotFoundError:
        return "文件未找到,请检查文件路径是否正确。"
    except Exception as e:
        return f"读取文件时发生错误:{str(e)}"


if __name__ == "__main__":
    # Initialize and run the server over the stdio transport: the MCP client
    # spawns this process and talks to it through stdin/stdout.
    print("mcp-server启动了")
    mcp.run(transport='stdio')
1
2
张三: 性别 男 年龄 18岁 爱好 喜欢爬山
小红: 性别 女 年龄 25岁 爱好 喜欢游泳

MCP Client 入门案例

我们的 client 和 server 在同一个项目中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 打开一个新的窗口
# 激活虚拟环境
.venv\Scripts\activate

# 安装所需的包
uv add mcp anthropic python-dotenv

# 创建 client 目录

# 创建我们的主文件
touch client.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from anthropic import Anthropic
from dotenv import load_dotenv

from openai import OpenAI
import os
import json

from mcp.client.sse import sse_client

load_dotenv()  # load environment variables from .env

class MCPClient:
    """MCP client: connects to a single stdio MCP server and answers queries
    with an OpenAI-compatible chat model that may call the server's tools."""

    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()

        # Model credentials/endpoint come from .env (OpenAI-compatible API).
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        self.base_url = os.getenv('BASE_URL')
        self.model = os.getenv('MODEL')
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)

    async def connect_to_server(self, server_script_path: str):
        """Connect to an MCP server

        Args:
            server_script_path: Path to the server script (.py or .js)
        """
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("Server script must be a .py or .js file")

        # Spawn the server as a child process speaking MCP over stdio.
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])

    async def process_query(self, query: str) -> str:
        """Send *query* to the model; execute any requested MCP tool calls and
        return the model's final text answer."""
        messages = [
            {
                "role": "system",
                "content": "你是一个智能助手,帮助用户回答问题。"},
            {
                "role": "user",
                "content": query
            }
        ]

        # Advertise the server's tools to the model in OpenAI function format.
        response = await self.session.list_tools()
        print("可用的工具列表:", response)
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema  # must be a valid JSON Schema
                }
            }
            for tool in response.tools
        ]

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools
        )

        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # Record the assistant turn that requested the tool call(s); the
            # API requires one "tool" message per tool_call to follow it.
            messages.append(content.message.model_dump())

            # Fix: execute EVERY requested tool call, not only the first one.
            # The assistant message above lists all tool_calls, so answering
            # only one of them makes the follow-up request invalid when the
            # model emits several calls at once.
            for tool_call in content.message.tool_calls:
                function_name = tool_call.function.name
                function_args = json.loads(tool_call.function.arguments)

                # Run the tool on the MCP server.
                result = await self.session.call_tool(function_name, function_args)
                print(f"\n\n[Calling tool {function_name} with args {function_args}]\n\n")

                messages.append({
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": function_name,
                    "content": result.content[0].text,
                })

            # Feed the tool results back so the model can produce the final answer.
            print("messages===>", messages)
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=available_tools
            )

            return response.choices[0].message.content.strip()

        return content.message.content.strip()

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break

                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()


async def main():
    """CLI entry point: connect to the MCP server script named on the command
    line and start the interactive chat loop."""
    # Local import so this function does not depend on the `import sys`
    # performed inside the __main__ guard being executed first.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        # Always release the MCP session/transport, even on error.
        await client.cleanup()


if __name__ == "__main__":
    # This import executes at module level, so main() sees `sys` as a global.
    import sys
    asyncio.run(main())
1
2
3
BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
MODEL=xxxxxx
OPENAI_API_KEY=xxxxxx

调试

1
uv run client.py ../server/personalInfo.py
/images/posts/MCP入门案例/4.png
(图4)

调用高德 MCP

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from anthropic import Anthropic
from dotenv import load_dotenv

from openai import OpenAI
import os
import json

from mcp.client.sse import sse_client

load_dotenv()  # load environment variables from .env

class MCPClient:
    """MCP client that launches the Amap (Gaode) Maps MCP server via npx and
    answers queries with an OpenAI-compatible chat model."""

    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Model credentials/endpoint come from .env (OpenAI-compatible API).
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        self.base_url = os.getenv('BASE_URL')
        self.model = os.getenv('MODEL')

        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)


    async def connect_to_server(self, server_script_path: str):
        # NOTE(review): server_script_path is ignored — the Amap server launch
        # command is hard-coded below.
        # Amap (Gaode) Maps MCP server
        server_params = StdioServerParameters(
            command="npx",  # launch the server with npx
            args=[
                "-y",
                "@amap/amap-maps-mcp-server"
            ],  # npx arguments
            env={
                "AMAP_MAPS_API_KEY": "高德API KEY"  # replace with a real Amap API key
            },
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])


    async def process_query(self, query: str) -> str:
        """Send *query* to the model; run a requested MCP tool call and return
        the model's final text answer."""
        messages = [
            {
                "role": "system",
                "content": "你是一个智能助手,帮助用户回答问题。"},
            {
                "role": "user",
                "content": query
            }
        ]

        # Advertise the server's tools to the model in OpenAI function format.
        response = await self.session.list_tools()
        print("可用的工具列表:", response)
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema  # must be a valid JSON Schema
                }
            }
            for tool in response.tools
        ]

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools
        )

        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # NOTE(review): only the first tool_call is executed; if the model
            # emits several tool calls at once, the follow-up request will be
            # missing tool responses.
            tool_call = content.message.tool_calls[0]
            function_name = tool_call.function.name
            function_args = json.loads(tool_call.function.arguments)

            # Run the tool on the MCP server.
            result = await self.session.call_tool(function_name, function_args)
            print(f"\n\n[Calling tool {function_name} with args {function_args}]\n\n")
            # Record both the assistant turn that requested the tool and the
            # tool's result in the message history.
            result_content = result.content[0].text
            messages.append(content.message.model_dump())
            messages.append({
                "tool_call_id": tool_call.id,
                "role": "tool",
                "name": function_name,
                "content": result_content,
            })

            # Feed the tool result back so the model can produce the final answer.
            print("messages===>", messages)
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=available_tools
            )

            return response.choices[0].message.content.strip()

        return content.message.content.strip()


    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break

                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()


async def main():
    """CLI entry point: connect to the MCP server and start the interactive
    chat loop (the argv value is currently passed through unchanged)."""
    # Local import so this function does not depend on the `import sys`
    # performed inside the __main__ guard being executed first.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        # Always release the MCP session/transport, even on error.
        await client.cleanup()


if __name__ == "__main__":
    # This import executes at module level, so main() sees `sys` as a global.
    import sys
    asyncio.run(main())
/images/posts/MCP入门案例/5.png
(图5)

SSE 调试

MCP Server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
import csv
import os

import uvicorn

# Initialize the FastMCP server; "personalInfo" is the server name
# advertised to connecting MCP clients.
mcp = FastMCP("personalInfo")

@mcp.tool()
async def get_info(name: str) -> str:
    """Look up a person's info by name in a CSV file.

    Each data row has the form "<name>: <info>". Returns the info string for
    the matching name, or a (Chinese) hint message when no record is found or
    the file cannot be read.
    """
    # personal.csv lives one directory above this server script.
    file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "personal.csv"))

    try:
        with open(file_path, mode="r", encoding="utf-8") as file:
            reader = csv.reader(file)
            for row in reader:
                # Skip empty/blank rows.
                if not row or not row[0].strip():
                    continue

                # Split only on the first ":" so the info part may itself
                # contain colons.
                parts = row[0].split(":", 1)
                if len(parts) != 2:
                    print(f"格式错误,跳过该行: {row[0]}")
                    continue

                record_name, info = parts
                # Strip padding so "张三: 性别 ..." matches the query "张三"
                # and the returned info carries no leading space.
                if record_name.strip() == name.strip():
                    return info.strip()

        return f"未找到 {name} 的信息。"
    except FileNotFoundError:
        return "文件未找到,请检查文件路径是否正确。"
    except Exception as e:
        return f"读取文件时发生错误:{str(e)}"


if __name__ == "__main__":
    # Serve the MCP server over SSE with uvicorn.
    print("mcp-server启动了")
    # sse_app is a method that builds the Starlette ASGI application — call it;
    # otherwise uvicorn is handed a bound method instead of an ASGI app and
    # fails on the first request (or requires factory=True).
    uvicorn.run(mcp.sse_app(), host="0.0.0.0", port=8080)
/images/posts/MCP入门案例/6.png
(图6)

MCP Client

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from anthropic import Anthropic
from dotenv import load_dotenv

from openai import OpenAI
import os
import json

from mcp.client.sse import sse_client

load_dotenv()  # load environment variables from .env

class MCPClient:
    """MCP client that talks to an MCP server over SSE and answers queries
    with an OpenAI-compatible chat model."""

    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Model credentials/endpoint come from .env (OpenAI-compatible API).
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        self.base_url = os.getenv('BASE_URL')
        self.model = os.getenv('MODEL')

        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)


    async def connect_to_server(self, server_script_path: str):
        # NOTE(review): despite its name, server_script_path is the server's
        # SSE endpoint URL (e.g. http://127.0.0.1:8080/sse) — confirm the path
        # matches where the server mounts its SSE app.
        stdio_transport = await self.exit_stack.enter_async_context(sse_client(server_script_path))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])


    async def process_query(self, query: str) -> str:
        """Send *query* to the model; run a requested MCP tool call and return
        the model's final text answer."""
        messages = [
            {
                "role": "system",
                "content": "你是一个智能助手,帮助用户回答问题。"},
            {
                "role": "user",
                "content": query
            }
        ]

        # Advertise the server's tools to the model in OpenAI function format.
        response = await self.session.list_tools()
        print("可用的工具列表:", response)
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema  # must be a valid JSON Schema
                }
            }
            for tool in response.tools
        ]

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools
        )

        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # NOTE(review): only the first tool_call is executed; if the model
            # emits several tool calls at once, the follow-up request will be
            # missing tool responses.
            tool_call = content.message.tool_calls[0]
            function_name = tool_call.function.name
            function_args = json.loads(tool_call.function.arguments)

            # Run the tool on the MCP server.
            result = await self.session.call_tool(function_name, function_args)
            print(f"\n\n[Calling tool {function_name} with args {function_args}]\n\n")
            # Record both the assistant turn that requested the tool and the
            # tool's result in the message history.
            result_content = result.content[0].text
            messages.append(content.message.model_dump())
            messages.append({
                "tool_call_id": tool_call.id,
                "role": "tool",
                "name": function_name,
                "content": result_content,
            })

            # Feed the tool result back so the model can produce the final answer.
            print("messages===>", messages)
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=available_tools
            )

            return response.choices[0].message.content.strip()

        return content.message.content.strip()


    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break

                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()




async def main():
    """CLI entry point: connect to the MCP server whose SSE URL is given on
    the command line and start the interactive chat loop."""
    # Local import so this function does not depend on the `import sys`
    # performed inside the __main__ guard being executed first.
    import sys

    if len(sys.argv) < 2:
        print("Usage: python client.py <path_to_server_script>")
        sys.exit(1)

    client = MCPClient()
    try:
        await client.connect_to_server(sys.argv[1])
        await client.chat_loop()
    finally:
        # Always release the MCP session/transport, even on error.
        await client.cleanup()



if __name__ == "__main__":
    # This import executes at module level, so main() sees `sys` as a global.
    import sys
    asyncio.run(main())
1
uv run client.py http://127.0.0.1:8080/sse

去除必须输入 server 文件地址

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import asyncio
from typing import Optional
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client

from anthropic import Anthropic
from dotenv import load_dotenv

from openai import OpenAI
import os
import json

from mcp.client.sse import sse_client

load_dotenv()  # load environment variables from .env

class MCPClient:
    """MCP client that launches the hard-coded Amap (Gaode) Maps MCP server
    via npx — no command-line server argument required — and answers queries
    with an OpenAI-compatible chat model."""

    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        # Model credentials/endpoint come from .env (OpenAI-compatible API).
        self.openai_api_key = os.getenv('OPENAI_API_KEY')
        self.base_url = os.getenv('BASE_URL')
        self.model = os.getenv('MODEL')

        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)


    async def connect_to_server(self):
        # Amap (Gaode) Maps MCP server, launched via npx.
        server_params = StdioServerParameters(
            command="npx",  # launch the server with npx
            args=[
                "-y",
                "@amap/amap-maps-mcp-server"
            ],  # npx arguments
            env={
                "AMAP_MAPS_API_KEY": "高德API KEY"  # replace with a real Amap API key
            },
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))

        await self.session.initialize()

        # List available tools
        response = await self.session.list_tools()
        tools = response.tools
        print("\nConnected to server with tools:", [tool.name for tool in tools])


    async def process_query(self, query: str) -> str:
        """Send *query* to the model; run a requested MCP tool call and return
        the model's final text answer."""
        messages = [
            {
                "role": "system",
                "content": "你是一个智能助手,帮助用户回答问题。"},
            {
                "role": "user",
                "content": query
            }
        ]

        # Advertise the server's tools to the model in OpenAI function format.
        response = await self.session.list_tools()
        print("可用的工具列表:", response)
        available_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.inputSchema  # must be a valid JSON Schema
                }
            }
            for tool in response.tools
        ]

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=available_tools
        )

        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            # NOTE(review): only the first tool_call is executed; if the model
            # emits several tool calls at once, the follow-up request will be
            # missing tool responses.
            tool_call = content.message.tool_calls[0]
            function_name = tool_call.function.name
            function_args = json.loads(tool_call.function.arguments)

            # Run the tool on the MCP server.
            result = await self.session.call_tool(function_name, function_args)
            print(f"\n\n[Calling tool {function_name} with args {function_args}]\n\n")
            # Record both the assistant turn that requested the tool and the
            # tool's result in the message history.
            result_content = result.content[0].text
            messages.append(content.message.model_dump())
            messages.append({
                "tool_call_id": tool_call.id,
                "role": "tool",
                "name": function_name,
                "content": result_content,
            })

            # Feed the tool result back so the model can produce the final answer.
            print("messages===>", messages)
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=available_tools
            )

            return response.choices[0].message.content.strip()

        return content.message.content.strip()

    async def chat_loop(self):
        """Run an interactive chat loop"""
        print("\nMCP Client Started!")
        print("Type your queries or 'quit' to exit.")

        while True:
            try:
                query = input("\nQuery: ").strip()

                if query.lower() == 'quit':
                    break

                response = await self.process_query(query)
                print("\n" + response)

            except Exception as e:
                print(f"\nError: {str(e)}")

    async def cleanup(self):
        """Clean up resources"""
        await self.exit_stack.aclose()


async def main():
    """Entry point: connect to the hard-coded Amap MCP server (no command-line
    argument needed) and start the interactive chat loop."""
    client = MCPClient()
    try:
        await client.connect_to_server()
        await client.chat_loop()
    finally:
        # Always release the MCP session/transport, even on error.
        await client.cleanup()

if __name__ == "__main__":
    # `sys` is no longer needed here: main() takes no command-line arguments.
    asyncio.run(main())
1
uv run client.py

多个 MCP 配置 Session 覆盖的情况

场景:多个 Mcp Server 配置,建立多次连接,获取到所有工具列表,将问题和工具列表给到模型,模型返回需要调用的工具,但是模型是通过 Session 调用的,多次连接,覆盖了前面的 Session,提示找不到工具。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
"""
根据配置获取工具
"""
@mcp_router.post("/getTools")
async def getTools(mcpVo:McpVo):
    client = MCPClient()
    try:
        data = json.loads(mcpVo.config)
        mcp_servers = data.get("mcpServers", {})
        tools_list = []

        for key, value in mcp_servers.items():
            entry_json = json.dumps({key: value}, ensure_ascii=False)
            tools = await client.connect_to_server(entry_json)
            tools_list.extend(tools)

        return ReturnDatas.SuccessResponse(data=jsonable_encoder(tools_list))
    finally:
        await client.cleanup()


@mcp_router.post("/test")
async def test(config: str, query: str):
    """Debug endpoint: connect to every server listed in *config*, then run
    *query* through the model with all collected tools available."""
    client = MCPClient()
    try:
        data = json.loads(config)
        mcp_servers = data.get("mcpServers", {})
        tools_list = []

        # One connection per configured server entry.
        for key, value in mcp_servers.items():
            entry_json = json.dumps({key: value}, ensure_ascii=False)
            tools = await client.connect_to_server(entry_json)
            tools_list.extend(tools)

        print(tools_list)
        res = await client.process_query(query)
        return res
    finally:
        # Close every MCP connection even on failure.
        await client.cleanup()
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import asyncio

from typing import Optional
from contextlib import AsyncExitStack


from mcp import ClientSession, StdioServerParameters, stdio_client

from mcp.client.sse import sse_client

from openai import OpenAI
import json


# async def run_client(query: str):
#     client = MCPClient()
#     try:
#         await client.connect_to_server()
#         response = await client.process_query(query)
#         return response  # 返回处理结果
#     finally:
#         await client.cleanup()


class MCPClient:
    """MCP client that can hold connections to several MCP servers at once.

    Every connected server gets its own ClientSession, and each tool name is
    mapped to the session that provides it, so a tool call is always routed
    to the server that owns the tool instead of whichever session happened
    to be connected last.
    """

    def __init__(self):
        self.sessions = []  # one ClientSession per connected MCP server
        self.tool_map = {}  # tool name -> owning ClientSession
        self.exit_stack = AsyncExitStack()

        self.openai_api_key = "xxxxxx"
        self.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
        self.model = "xxxxxx"

        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)

    async def connect_to_server(self, config: str):
        """Connect to one MCP server described by a JSON config string.

        A config containing a "command" key is launched over stdio; otherwise
        its "url" is used for an SSE connection. Returns the server's tools.
        """
        data = json.loads(config)
        command = self.find_key_in_dict(data, "command")

        if command is not None:
            args = self.find_key_in_dict(data, "args")
            env = self.find_key_in_dict(data, "env")
            tools, session = await self.stdio_connect_to_server(command, args, env)
        else:
            url = self.find_key_in_dict(data, "url")
            tools, session = await self.sse_connect_to_server(url)

        self.sessions.append(session)

        # Map each tool name to the session that provides it (a later server
        # overrides an earlier one on a name clash, as before).
        for tool in tools:
            self.tool_map[tool.name] = session

        print("\nConnected to server with tools:", [tool.name for tool in tools])
        return tools

    async def process_query(self, query: str) -> str:
        """Answer *query* with the model, routing any tool call to the session
        that owns the tool. Returns the model's final text answer."""
        messages = [
            {"role": "system", "content": "你是一个智能助手,帮助用户回答问题。"},
            {"role": "user", "content": query}
        ]

        # Collect tool definitions, querying each distinct session only once.
        # (Previously list_tools() was called once per *tool*, i.e.
        # O(sessions x tools) round-trips.)
        all_tools = []
        queried = []
        for session in self.tool_map.values():
            if any(session is s for s in queried):
                continue
            queried.append(session)
            response = await session.list_tools()
            for tool in response.tools:
                # Skip tools whose name was taken over by a later session.
                if self.tool_map.get(tool.name) is not session:
                    continue
                all_tools.append({
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.inputSchema
                    }
                })

        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=all_tools
        )

        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            tool_call = content.message.tool_calls[0]
            # Tool name chosen by the model.
            function_name = tool_call.function.name
            # Arguments produced by the model (JSON string).
            function_args = json.loads(tool_call.function.arguments)

            # Key point: route the call to the session that owns this tool.
            if function_name not in self.tool_map:
                raise Exception(f"Tool {function_name} not found in any session")

            session = self.tool_map[function_name]
            result = await session.call_tool(function_name, function_args)

            print(f"\n\n[Calling tool {function_name} with args {function_args}]\n\n")
            result_content = result.content[0].text

            # Record the assistant turn and the tool result, then ask the
            # model for the final answer.
            messages.append(content.message.model_dump())
            messages.append({
                "tool_call_id": tool_call.id,
                "role": "tool",
                "name": function_name,
                "content": result_content,
            })

            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                tools=all_tools
            )

            return response.choices[0].message.content.strip()

        return content.message.content.strip()

    async def cleanup(self):
        """Close every connection registered on the exit stack."""
        await self.exit_stack.aclose()

    async def stdio_connect_to_server(self, command, args, env):
        """Spawn an MCP server over stdio; return (tools, session)."""
        # StdioServerParameters defaults env to None, so a single construction
        # covers both the env-given and env-missing cases.
        server_params = StdioServerParameters(
            command=command,
            args=args,
            env=env,
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        stdio, write = stdio_transport
        session = await self.exit_stack.enter_async_context(ClientSession(stdio, write))
        await session.initialize()
        response = await session.list_tools()
        return response.tools, session

    async def sse_connect_to_server(self, url):
        """Connect to an MCP server over SSE; return (tools, session)."""
        sse_transport = await self.exit_stack.enter_async_context(sse_client(url))
        read, write = sse_transport
        session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()
        response = await session.list_tools()
        return response.tools, session

    def find_key_in_dict(self, d, target_key):
        """Recursively search nested dicts/lists for *target_key*; return its
        value, or None if it is absent."""
        if isinstance(d, dict):
            for key, value in d.items():
                if key == target_key:
                    return value
                if isinstance(value, dict):
                    result = self.find_key_in_dict(value, target_key)
                    if result is not None:
                        return result
                elif isinstance(value, list):
                    for item in value:
                        result = self.find_key_in_dict(item, target_key)
                        if result is not None:
                            return result
        elif isinstance(d, list):
            for item in d:
                result = self.find_key_in_dict(item, target_key)
                if result is not None:
                    return result
        return None

0%