#!/usr/bin/env python3
"""Minimal MCP client that sends JSON-RPC 2.0 requests over HTTP POST."""
import json

import requests

# Not referenced anywhere in this file; kept so that external code importing
# this module-level name keeps working.
session_id = "123456789"


def _parse_sse_messages(text):
    """Return the JSON payloads carried by the ``data:`` fields of an SSE body.

    Consecutive ``data:`` lines belong to one event and are joined with a
    newline; a blank line terminates the event.  Every other field
    (``event:``, ``id:``, ``retry:``) and comment lines are ignored.

    Raises:
        json.JSONDecodeError: if an event's data is not valid JSON.
    """
    messages = []
    data_lines = []
    # Split on CR/LF only: str.splitlines() would also break on characters
    # such as U+2028 that may legally appear inside a JSON string.
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    for line in normalized.split("\n"):
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        elif not line and data_lines:
            messages.append(json.loads("\n".join(data_lines)))
            data_lines = []
    if data_lines:
        # Body ended without the trailing blank line; keep the last event.
        messages.append(json.loads("\n".join(data_lines)))
    return messages


class MCPClient:
    """Small JSON-RPC 2.0 client for an MCP endpoint reachable over HTTP.

    All requests are POSTed to ``url`` through one ``requests.Session``.
    After ``initialize()`` the value of the ``Mcp-Session-Id`` response
    header, if any, is echoed back on every later request.

    The client can be used as a context manager so that the underlying HTTP
    session is closed deterministically.
    """

    # Sent on every request.  ``text/event-stream`` is advertised, so replies
    # may arrive either as plain JSON or as an SSE body; both are decoded.
    _BASE_HEADERS = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }

    def __init__(self, url, timeout=30.0):
        """Create a client for the endpoint at ``url``.

        Args:
            url: Full URL of the MCP HTTP endpoint.
            timeout: Value forwarded to ``requests`` as the per-request
                timeout (seconds, or a ``(connect, read)`` tuple).  Pass
                ``None`` to wait indefinitely.
        """
        self.url = url
        self.timeout = timeout
        self.session = requests.Session()
        self.session_id = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def _post(self, payload, include_session=True):
        """POST ``payload`` as JSON and return the raw ``requests`` response."""
        headers = dict(self._BASE_HEADERS)
        # If a session ID is known, attach it to the request headers.
        if include_session and self.session_id:
            headers["Mcp-Session-Id"] = self.session_id
        return self.session.post(
            self.url,
            json=payload,
            headers=headers,
            timeout=self.timeout,
        )

    @staticmethod
    def _decode_body(response, request_id):
        """Decode a reply that is either plain JSON or an SSE stream.

        For an SSE body the message whose ``id`` equals ``request_id`` and
        that carries ``result`` or ``error`` is returned; if none matches,
        the last message of the stream is returned instead.

        Raises:
            ValueError: if the body is not valid JSON, or an SSE body holds
                no ``data:`` events.
        """
        content_type = response.headers.get("Content-Type", "")
        if not content_type.lower().startswith("text/event-stream"):
            return response.json()

        # Decode the raw bytes explicitly: for ``text/*`` replies without a
        # charset, ``response.text`` may fall back to ISO-8859-1 and garble
        # non-ASCII content, whereas event streams are UTF-8.
        messages = _parse_sse_messages(response.content.decode("utf-8"))
        if not messages:
            raise ValueError("event-stream response contained no data events")
        for message in messages:
            if (
                isinstance(message, dict)
                and message.get("id") == request_id
                and ("result" in message or "error" in message)
            ):
                return message
        return messages[-1]

    def initialize(self):
        """Initialize the MCP session.

        Sends the ``initialize`` request and stores the ``Mcp-Session-Id``
        response header (``None`` when the server does not send one).

        Returns:
            The decoded JSON-RPC reply to the ``initialize`` request.

        Raises:
            requests.HTTPError: if the server answers with an HTTP error
                status.
            ValueError: if the reply body cannot be decoded.
        """
        # NOTE(review): the MCP lifecycle spec describes a follow-up
        # ``notifications/initialized`` message; this client does not send
        # one — confirm whether the target server requires it.
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {
                    "name": "python-client",
                    "version": "1.0.0",
                },
            },
        }
        # A previous session ID is deliberately not sent with the handshake.
        response = self._post(payload, include_session=False)
        # Fail with a clear HTTP error rather than a missing-header KeyError.
        response.raise_for_status()
        self.session_id = response.headers.get("Mcp-Session-Id")
        return self._decode_body(response, payload["id"])

    def call_method(self, method, params=None, request_id=None):
        """Call an MCP method and return the decoded JSON-RPC reply.

        Args:
            method: JSON-RPC method name, e.g. ``"tools/list"``.
            params: Optional parameters; omitted from the request when empty.
            request_id: JSON-RPC id to use.  When ``None`` the method name
                is used as the id.

        Returns:
            The decoded reply.  HTTP error statuses are not raised here, so
            a JSON-RPC error object in the body is returned to the caller.

        Raises:
            ValueError: if the reply body cannot be decoded.
        """
        # Compare against None so that falsy ids such as 0 are honoured.
        effective_id = request_id if request_id is not None else method
        payload = {
            "jsonrpc": "2.0",
            "id": effective_id,
            "method": method,
        }
        if params:
            payload["params"] = params

        response = self._post(payload)
        # Echo the raw body, as the original implementation did.
        print(response.text)
        return self._decode_body(response, effective_id)


def main():
    """Usage example: initialize a session and list the server's tools."""
    with MCPClient("http://iot.gesukj.com:7999/mcp") as client:
        # Initialize
        print("Initializing...")
        client.initialize()

        # List tools
        print("\nListing tools...")
        tools_result = client.call_method("tools/list", request_id=4)
        print(json.dumps(tools_result, indent=2))


if __name__ == "__main__":
    main()