添加MCP服务器全局工作目录配置

This commit is contained in:
FuQuan233 2026-06-29 10:23:21 +08:00
parent 38af060cb2
commit b6af4ec334
4 changed files with 254 additions and 224 deletions

View file

@ -349,261 +349,266 @@ async def process_messages(context_id: int, is_group: bool = True):
logger.info(
f"开始处理{chat_type}消息 {context_type}{context_id} 当前队列长度:{state.queue.qsize()}"
)
while not state.queue.empty():
event = await state.queue.get()
if is_group:
logger.debug(f"从队列获取消息 群号:{context_id} 消息ID{event.message_id}")
group_id = context_id
else:
logger.debug(f"从队列获取消息 用户:{context_id} 消息ID{event.message_id}")
group_id = None
past_events_snapshot = []
mcp_client = MCPClient.get_instance(plugin_config.mcp_servers)
try:
# 构建系统提示,分成多行以满足行长限制
chat_type = "群聊" if is_group else "私聊"
bot_names = "".join(list(driver.config.nickname))
default_prompt = (state.group_prompt) or plugin_config.default_prompt
system_lines = [
f"我想要你帮我在{chat_type}中闲聊,大家一般叫你{bot_names}",
"我将会在后面的信息中告诉你每条信息的发送者和发送时间,你可以直接称呼发送者为他对应的昵称。",
"你的回复需要遵守以下几点规则:",
"- 你可以使用多条消息回复,每两条消息之间使用<botbr>分隔,<botbr>前后不需要包含额外的换行和空格。",
"- 除<botbr>外,消息中不应该包含其他类似的标记。",
"- 不要使用markdown或者html聊天软件不支持解析换行请用换行符。",
"- 你应该以普通人的方式发送消息,每条消息字数要尽量少一些,应该倾向于使用更多条的消息回复。",
"- 代码则不需要分段,用单独的一条消息发送。",
"- 请使用发送者的昵称称呼发送者,你可以礼貌地问候发送者,但只需要在"
"第一次回答这位发送者的问题时问候他。",
"- 你有引用某条消息的能力,使用[CQ:reply,id=消息id]来引用。",
"- 如果有多条消息,你应该优先回复提到你的,一段时间之前的就不要回复了,也可以直接选择不回复。",
"- 如果你选择完全不回复,你只需要直接输出一个<botbr>。",
"- 如果你需要思考的话,你应该尽量少思考,以节省时间。",
]
try:
while not state.queue.empty():
event = await state.queue.get()
if is_group:
system_lines += [
"- 你有at群成员的能力只需要在某条消息中插入[CQ:at,qq=QQ号]"
"也就是CQ码。at发送者是非必要的你可以根据你自己的想法at某个人。",
logger.debug(f"从队列获取消息 群号:{context_id} 消息ID{event.message_id}")
group_id = context_id
else:
logger.debug(f"从队列获取消息 用户:{context_id} 消息ID{event.message_id}")
group_id = None
past_events_snapshot = []
mcp_client = MCPClient.get_instance(
plugin_config.mcp_servers,
plugin_config.mcp_server_cwd,
)
try:
# 构建系统提示,分成多行以满足行长限制
chat_type = "群聊" if is_group else "私聊"
bot_names = "".join(list(driver.config.nickname))
default_prompt = (state.group_prompt) or plugin_config.default_prompt
system_lines = [
f"我想要你帮我在{chat_type}中闲聊,大家一般叫你{bot_names}",
"我将会在后面的信息中告诉你每条信息的发送者和发送时间,你可以直接称呼发送者为他对应的昵称。",
"你的回复需要遵守以下几点规则:",
"- 你可以使用多条消息回复,每两条消息之间使用<botbr>分隔,<botbr>前后不需要包含额外的换行和空格。",
"- 除<botbr>外,消息中不应该包含其他类似的标记。",
"- 不要使用markdown或者html聊天软件不支持解析换行请用换行符。",
"- 你应该以普通人的方式发送消息,每条消息字数要尽量少一些,应该倾向于使用更多条的消息回复。",
"- 代码则不需要分段,用单独的一条消息发送。",
"- 请使用发送者的昵称称呼发送者,你可以礼貌地问候发送者,但只需要在"
"第一次回答这位发送者的问题时问候他。",
"- 你有引用某条消息的能力,使用[CQ:reply,id=消息id]来引用。",
"- 如果有多条消息,你应该优先回复提到你的,一段时间之前的就不要回复了,也可以直接选择不回复。",
"- 如果你选择完全不回复,你只需要直接输出一个<botbr>。",
"- 如果你需要思考的话,你应该尽量少思考,以节省时间。",
]
system_lines += [
"下面是关于你性格的设定,如果设定中提到让你扮演某个人,或者设定中有提到名字,则优先使用设定中的名字。",
default_prompt,
]
if is_group:
system_lines += [
"- 你有at群成员的能力只需要在某条消息中插入[CQ:at,qq=QQ号]"
"也就是CQ码。at发送者是非必要的你可以根据你自己的想法at某个人。",
]
systemPrompt = "\n".join(system_lines)
if preset.support_mcp:
systemPrompt += "\n你也可以使用一些工具,下面是关于这些工具的额外说明:\n"
for mcp_name, mcp_config in plugin_config.mcp_servers.items():
if mcp_config.additional_prompt:
systemPrompt += f"{mcp_name}{mcp_config.additional_prompt}"
systemPrompt += "\n"
system_lines += [
"下面是关于你性格的设定,如果设定中提到让你扮演某个人,或者设定中有提到名字,则优先使用设定中的名字。",
default_prompt,
]
logger.debug(f"构建系统提示词:\n{systemPrompt}")
systemPrompt = "\n".join(system_lines)
if preset.support_mcp:
systemPrompt += "\n你也可以使用一些工具,下面是关于这些工具的额外说明:\n"
for mcp_name, mcp_config in plugin_config.mcp_servers.items():
if mcp_config.additional_prompt:
systemPrompt += f"{mcp_name}{mcp_config.additional_prompt}"
systemPrompt += "\n"
messages: list[ChatCompletionMessageParam] = [
{"role": "system", "content": systemPrompt}
]
logger.debug(f"构建系统提示词:\n{systemPrompt}")
while len(state.history) > 0 and state.history[0]["role"] != "user":
state.history.popleft()
messages: list[ChatCompletionMessageParam] = [
{"role": "system", "content": systemPrompt}
]
messages += list(state.history)[-plugin_config.history_size * 2 :]
while len(state.history) > 0 and state.history[0]["role"] != "user":
state.history.popleft()
# 没有未处理的消息说明已经被处理了,跳过
if state.past_events.__len__() < 1:
break
messages += list(state.history)[-plugin_config.history_size * 2 :]
content: list[ChatCompletionContentPartParam] = []
# 没有未处理的消息说明已经被处理了,跳过
if state.past_events.__len__() < 1:
break
# 将机器人错过的消息推送给LLM
past_events_snapshot = list(state.past_events)
state.past_events.clear()
for ev in past_events_snapshot:
text_content = format_message(ev)
content.append({"type": "text", "text": text_content})
content: list[ChatCompletionContentPartParam] = []
# 将消息中的图片转成 base64
if preset.support_image:
base64_images = await process_images(ev)
for base64_image in base64_images:
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}})
# 将机器人错过的消息推送给LLM
past_events_snapshot = list(state.past_events)
state.past_events.clear()
for ev in past_events_snapshot:
text_content = format_message(ev)
content.append({"type": "text", "text": text_content})
new_messages: list[ChatCompletionMessageParam] = [
{"role": "user", "content": content}
]
# 将消息中的图片转成 base64
if preset.support_image:
base64_images = await process_images(ev)
for base64_image in base64_images:
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}})
logger.debug(
f"发送API请求 模型:{preset.model_name} 历史消息数:{len(messages)}"
)
new_messages: list[ChatCompletionMessageParam] = [
{"role": "user", "content": content}
]
client_config = {
"model": preset.model_name,
"max_tokens": preset.max_tokens,
"temperature": preset.temperature,
"timeout": 60,
"extra_body": preset.extra_body,
}
logger.debug(
f"发送API请求 模型:{preset.model_name} 历史消息数:{len(messages)}"
)
if preset.support_mcp:
available_tools = await mcp_client.get_available_tools(is_group)
client_config["tools"] = available_tools
response = await client.chat.completions.create(
**client_config,
messages=messages + new_messages,
)
if response.usage is not None:
logger.debug(f"收到API响应 使用token数{response.usage.total_tokens}")
message = response.choices[0].message
# 处理响应并处理工具调用
while preset.support_mcp and message and message.tool_calls:
llm_reply: ChatCompletionMessageParam = {
"role": "assistant",
"content": message.content,
"tool_calls": [tool_call.model_dump() for tool_call in message.tool_calls]
client_config = {
"model": preset.model_name,
"max_tokens": preset.max_tokens,
"temperature": preset.temperature,
"timeout": 60,
"extra_body": preset.extra_body,
}
if preset.request_with_reasoning_content:
llm_reply["reasoning_content"] = message.reasoning_content# pyright: ignore[reportGeneralTypeIssues]
if preset.support_mcp:
available_tools = await mcp_client.get_available_tools(is_group)
client_config["tools"] = available_tools
# 发送LLM调用工具时的回复一般没有
if message.content:
await send_split_messages(handler, message.content)
# 处理每个工具调用
new_messages.append(llm_reply)
for tool_call in message.tool_calls:
logger.debug(f"处理工具调用:{tool_call.function.name} 参数:{tool_call.function.arguments}")
tool_name = tool_call.function.name
try:
tool_args = json.loads(tool_call.function.arguments)
except (json.JSONDecodeError, TypeError, ValueError) as e:
error_message = (
f"工具调用参数格式错误,无法解析 {tool_name} 的 arguments: {e!s}. "
f"原始参数: {tool_call.function.arguments}"
)
logger.warning(error_message)
new_messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": error_message,
})
continue
# 发送工具调用提示
await handler.send(Message(f"正在使用{mcp_client.get_friendly_name(tool_name)}"))
if is_group:
result = await mcp_client.call_tool(
tool_name,
tool_args,
group_id=event.group_id,
bot_id=str(event.self_id)
)
else:
result = await mcp_client.call_tool(
tool_name,
tool_args,
bot_id=str(event.self_id)
)
new_messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
})
# 将工具调用的结果交给 LLM
response = await client.chat.completions.create(
**client_config,
messages=messages + new_messages,
)
if response.usage is not None:
logger.debug(f"收到API响应 使用token数{response.usage.total_tokens}")
message = response.choices[0].message
# 安全检查:确保 message 不为 None
if not message:
logger.error("API 响应中的 message 为 None")
await handler.send(Message("服务暂时不可用,请稍后再试"))
return
# 处理响应并处理工具调用
while preset.support_mcp and message and message.tool_calls:
llm_reply: ChatCompletionMessageParam = {
"role": "assistant",
"content": message.content,
"tool_calls": [tool_call.model_dump() for tool_call in message.tool_calls]
}
reply, matched_reasoning_content = pop_reasoning_content(
message.content
)
reasoning_content: str | None = (
getattr(message, "reasoning_content", None)
or matched_reasoning_content
)
if preset.request_with_reasoning_content:
llm_reply["reasoning_content"] = message.reasoning_content # pyright: ignore[reportGeneralTypeIssues]
llm_reply: ChatCompletionMessageParam = {
"role": "assistant",
"content": reply,
}
# 发送LLM调用工具时的回复一般没有
if message.content:
await send_split_messages(handler, message.content)
reply_images = getattr(message, "images", None)
# 处理每个工具调用
new_messages.append(llm_reply)
if reply_images:
# openai的sdk里的assistant消息暂时没有images字段需要单独处理
llm_reply["images"] = reply_images # pyright: ignore[reportGeneralTypeIssues]
for tool_call in message.tool_calls:
logger.debug(f"处理工具调用:{tool_call.function.name} 参数:{tool_call.function.arguments}")
if preset.request_with_reasoning_content:
llm_reply["reasoning_content"] = reasoning_content# pyright: ignore[reportGeneralTypeIssues]
tool_name = tool_call.function.name
try:
tool_args = json.loads(tool_call.function.arguments)
except (json.JSONDecodeError, TypeError, ValueError) as e:
error_message = (
f"工具调用参数格式错误,无法解析 {tool_name} 的 arguments: {e!s}. "
f"原始参数: {tool_call.function.arguments}"
)
logger.warning(error_message)
new_messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": error_message,
})
continue
new_messages.append(llm_reply)
# 发送工具调用提示
await handler.send(Message(f"正在使用{mcp_client.get_friendly_name(tool_name)}"))
# 请求成功后再保存历史记录保证user和assistant穿插防止R1模型报错
for message in new_messages:
state.history.append(message)
if is_group:
result = await mcp_client.call_tool(
tool_name,
tool_args,
group_id=event.group_id,
bot_id=str(event.self_id)
)
else:
result = await mcp_client.call_tool(
tool_name,
tool_args,
bot_id=str(event.self_id)
)
if state.output_reasoning_content and reasoning_content:
try:
bot = get_bot(str(event.self_id))
if is_group:
await bot.send_group_forward_msg(
group_id=group_id,
messages=build_reasoning_forward_nodes(
bot.self_id, reasoning_content
),
)
else:
await bot.send_private_forward_msg(
user_id=context_id,
messages=build_reasoning_forward_nodes(
bot.self_id, reasoning_content
),
)
except Exception as e:
logger.error(f"合并转发消息发送失败:\n{e!s}\n")
new_messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
})
assert reply is not None
await send_split_messages(handler, reply)
# 将工具调用的结果交给 LLM
response = await client.chat.completions.create(
**client_config,
messages=messages + new_messages,
)
if reply_images:
logger.debug(f"API响应 图片数:{len(reply_images)}")
for i, image in enumerate(reply_images, start=1):
logger.debug(f"正在发送第{i}张图片")
image_base64 = image["image_url"]["url"].removeprefix("data:image/png;base64,")
image_msg = MessageSegment.image(base64.b64decode(image_base64))
await handler.send(image_msg)
message = response.choices[0].message
except Exception as e:
logger.opt(exception=e).error(f"API请求失败 {'群号' if is_group else '用户'}{context_id}")
# 如果在处理过程中出现异常恢复未处理的消息到state中
state.past_events.extendleft(reversed(past_events_snapshot))
await handler.send(Message(f"服务暂时不可用,请稍后再试\n{e!s}"))
finally:
state.processing = False
state.queue.task_done()
# 不再需要每次都清理MCPClient因为它现在是单例
# await mcp_client.cleanup()
# 安全检查:确保 message 不为 None
if not message:
logger.error("API 响应中的 message 为 None")
await handler.send(Message("服务暂时不可用,请稍后再试"))
return
reply, matched_reasoning_content = pop_reasoning_content(
message.content
)
reasoning_content: str | None = (
getattr(message, "reasoning_content", None)
or matched_reasoning_content
)
llm_reply: ChatCompletionMessageParam = {
"role": "assistant",
"content": reply,
}
reply_images = getattr(message, "images", None)
if reply_images:
# openai的sdk里的assistant消息暂时没有images字段需要单独处理
llm_reply["images"] = reply_images # pyright: ignore[reportGeneralTypeIssues]
if preset.request_with_reasoning_content:
llm_reply["reasoning_content"] = reasoning_content # pyright: ignore[reportGeneralTypeIssues]
new_messages.append(llm_reply)
# 请求成功后再保存历史记录保证user和assistant穿插防止R1模型报错
for message in new_messages:
state.history.append(message)
if state.output_reasoning_content and reasoning_content:
try:
bot = get_bot(str(event.self_id))
if is_group:
await bot.send_group_forward_msg(
group_id=group_id,
messages=build_reasoning_forward_nodes(
bot.self_id, reasoning_content
),
)
else:
await bot.send_private_forward_msg(
user_id=context_id,
messages=build_reasoning_forward_nodes(
bot.self_id, reasoning_content
),
)
except Exception as e:
logger.error(f"合并转发消息发送失败:\n{e!s}\n")
assert reply is not None
await send_split_messages(handler, reply)
if reply_images:
logger.debug(f"API响应 图片数:{len(reply_images)}")
for i, image in enumerate(reply_images, start=1):
logger.debug(f"正在发送第{i}张图片")
image_base64 = image["image_url"]["url"].removeprefix("data:image/png;base64,")
image_msg = MessageSegment.image(base64.b64decode(image_base64))
await handler.send(image_msg)
except Exception as e:
logger.opt(exception=e).error(f"API请求失败 {'群号' if is_group else '用户'}{context_id}")
# 如果在处理过程中出现异常恢复未处理的消息到state中
state.past_events.extendleft(reversed(past_events_snapshot))
await handler.send(Message(f"服务暂时不可用,请稍后再试\n{e!s}"))
finally:
state.queue.task_done()
# 不再需要每次都清理MCPClient因为它现在是单例
# await mcp_client.cleanup()
finally:
state.processing = False
# 预设切换命令

View file

@ -48,6 +48,10 @@ class ScopedConfig(BaseModel):
"你的回答应该尽量简洁、幽默、可以使用一些语气词、颜文字。你应该拒绝回答任何政治相关的问题。",
description="默认提示词",
)
mcp_server_cwd: str | None = Field(
None,
description="command类型MCP服务器的全局工作目录cwd"
)
mcp_servers: dict[str, MCPServerConfig] = Field({}, description="MCP服务器配置")
blacklist_user_ids: set[int] = Field(set(), description="黑名单用户ID列表")
ignore_prefixes: list[str] = Field(

View file

@ -18,12 +18,20 @@ class MCPClient:
_SESSION_TTL_SECONDS = 600
_SESSION_CLEANUP_INTERVAL_SECONDS = 60
def __new__(cls, server_config: dict[str, MCPServerConfig] | None = None):
def __new__(
cls,
server_config: dict[str, MCPServerConfig] | None = None,
default_command_cwd: str | None = None,
):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self, server_config: dict[str, MCPServerConfig] | None = None):
def __init__(
self,
server_config: dict[str, MCPServerConfig] | None = None,
default_command_cwd: str | None = None,
):
if self._initialized:
return
@ -32,6 +40,7 @@ class MCPClient:
logger.info(f"正在初始化MCPClient单例共有{len(server_config)}个服务器配置")
self.server_config = server_config
self.default_command_cwd = default_command_cwd
self.sessions = {}
self.exit_stack = AsyncExitStack()
self._session_exit_stacks: dict[str, AsyncExitStack] = {}
@ -47,12 +56,16 @@ class MCPClient:
logger.debug("MCPClient单例初始化成功")
@classmethod
def get_instance(cls, server_config: dict[str, MCPServerConfig] | None = None):
def get_instance(
cls,
server_config: dict[str, MCPServerConfig] | None = None,
default_command_cwd: str | None = None,
):
"""获取MCPClient实例"""
if cls._instance is None:
if server_config is None:
raise ValueError("server_config must be provided for first initialization")
cls._instance = cls(server_config)
cls._instance = cls(server_config, default_command_cwd)
return cls._instance
@classmethod
@ -79,8 +92,15 @@ class MCPClient:
sse_client(url=config.url, headers=config.headers)
)
elif config.command:
stdio_params: dict[str, Any] = {
"command": config.command,
"args": config.args or [],
"env": config.env or {},
}
if self.default_command_cwd:
stdio_params["cwd"] = self.default_command_cwd
transport = await session_stack.enter_async_context(
cast(Any, stdio_client(StdioServerParameters(**config.model_dump())))
cast(Any, stdio_client(StdioServerParameters(**stdio_params)))
)
else:
raise ValueError("Server config must have either url or command")