支持 MCP Streamable HTTP 传输协议 #28

This commit is contained in:
FuQuan233 2026-06-29 11:19:19 +08:00
parent b6af4ec334
commit bec3fda293
3 changed files with 50 additions and 7 deletions

View file

@ -168,8 +168,9 @@ LLMCHAT__MCP_SERVERS同样为一个dictkey为服务器名称value配置的
| command | stdio服务器必填 | 无 | stdio服务器MCP命令 | | command | stdio服务器必填 | 无 | stdio服务器MCP命令 |
| args | 否 | [] | stdio服务器MCP命令参数 | | args | 否 | [] | stdio服务器MCP命令参数 |
| env | 否 | {} | stdio服务器环境变量 | | env | 否 | {} | stdio服务器环境变量 |
| url | sse服务器必填 | 无 | sse服务器地址 | | url | 远程服务器必填 | 无 | 远程MCP服务器地址 |
| headers | 否 | {} | sse模式下http请求头用于认证或其他设置 | | headers | 否 | {} | 远程服务器http请求头用于认证或其他设置 |
| transport | 否 | 自动 | 远程MCP传输协议类型可选 `sse``streamable_http` ,不填则自动探测 |
以下为在 Claude.app 的MCP服务器配置基础上增加的字段 以下为在 Claude.app 的MCP服务器配置基础上增加的字段
| 配置项 | 必填 | 默认值 | 说明 | | 配置项 | 必填 | 默认值 | 说明 |
@ -255,6 +256,12 @@ LLMCHAT__MCP_SERVERS同样为一个dictkey为服务器名称value配置的
"formulahendry/mcp-server-code-runner" "formulahendry/mcp-server-code-runner"
] ]
}, },
"tavily": {
"friendly_name": "Tavily搜索",
"additional_prompt": "当你需要搜索最新的互联网信息时,请使用 tavily 工具。",
"url": "https://mcp.tavily.com/mcp/?tavilyApiKey=<your-api-key>",
"transport": "streamable_http"
}
} }
' '

View file

@ -24,8 +24,9 @@ class MCPServerConfig(BaseModel):
command: str | None = Field(None, description="stdio模式下MCP命令") command: str | None = Field(None, description="stdio模式下MCP命令")
args: list[str] | None = Field([], description="stdio模式下MCP命令参数") args: list[str] | None = Field([], description="stdio模式下MCP命令参数")
env: dict[str, str] | None = Field({}, description="stdio模式下MCP命令环境变量") env: dict[str, str] | None = Field({}, description="stdio模式下MCP命令环境变量")
url: str | None = Field(None, description="sse模式下MCP服务器地址") url: str | None = Field(None, description="远程MCP服务器地址")
headers: dict[str, str] | None = Field({}, description="sse模式下http请求头用于认证或其他设置") headers: dict[str, str] | None = Field({}, description="远程MCP服务器http请求头用于认证或其他设置")
transport: str | None = Field(None, description="远程MCP传输协议类型可选 'sse''streamable_http',默认自动检测")
# 额外字段 # 额外字段
friendly_name: str | None = Field(None, description="MCP服务器友好名称") friendly_name: str | None = Field(None, description="MCP服务器友好名称")

View file

@ -3,8 +3,10 @@ from contextlib import AsyncExitStack
from time import monotonic from time import monotonic
from typing import Any, cast from typing import Any, cast
import httpx
from mcp import ClientSession, StdioServerParameters from mcp import ClientSession, StdioServerParameters
from mcp.client.sse import sse_client from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamable_http_client
from mcp.client.stdio import stdio_client from mcp.client.stdio import stdio_client
from nonebot import logger from nonebot import logger
@ -88,9 +90,42 @@ class MCPClient:
config = self.server_config[server_name] config = self.server_config[server_name]
session_stack = AsyncExitStack() session_stack = AsyncExitStack()
if config.url: if config.url:
transport_type = config.transport
if transport_type == "streamable_http":
logger.debug(f"服务器[{server_name}]使用 streamable_http 传输协议")
http_client = await session_stack.enter_async_context(
httpx.AsyncClient(headers=config.headers or {})
)
read, write, _ = await session_stack.enter_async_context(
streamable_http_client(url=config.url, http_client=http_client)
)
transport = (read, write)
elif transport_type == "sse":
logger.debug(f"服务器[{server_name}]使用 sse 传输协议")
transport = await session_stack.enter_async_context( transport = await session_stack.enter_async_context(
sse_client(url=config.url, headers=config.headers) sse_client(url=config.url, headers=config.headers)
) )
else:
# 未指定协议,自动探测:先尝试 streamable_http失败则回退到 sse
logger.debug(f"服务器[{server_name}]未指定传输协议,开始自动探测")
probe_stack = AsyncExitStack()
try:
http_client = await probe_stack.enter_async_context(
httpx.AsyncClient(headers=config.headers or {})
)
read, write, _ = await probe_stack.enter_async_context(
streamable_http_client(url=config.url, http_client=http_client)
)
await session_stack.enter_async_context(probe_stack)
transport = (read, write)
logger.debug(f"服务器[{server_name}]自动探测成功: 使用 streamable_http 传输协议")
except Exception as e:
await probe_stack.aclose()
logger.debug(f"服务器[{server_name}]streamable_http 探测失败({e}),回退到 sse")
transport = await session_stack.enter_async_context(
sse_client(url=config.url, headers=config.headers)
)
logger.debug(f"服务器[{server_name}]自动探测成功: 使用 sse 传输协议")
elif config.command: elif config.command:
stdio_params: dict[str, Any] = { stdio_params: dict[str, Any] = {
"command": config.command, "command": config.command,