From d7c4979dbfebee346f8e5b99a5aa78ae56e00f13 Mon Sep 17 00:00:00 2001 From: KawakazeNotFound Date: Fri, 7 Nov 2025 12:46:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EMCP=E6=9D=83=E9=99=90?= =?UTF-8?q?=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nonebot_plugin_llmchat/__init__.py | 8 +++-- nonebot_plugin_llmchat/config.py | 5 +++ nonebot_plugin_llmchat/mcpclient.py | 55 ++++++++++++++++++++++++++++- 3 files changed, 64 insertions(+), 4 deletions(-) diff --git a/nonebot_plugin_llmchat/__init__.py b/nonebot_plugin_llmchat/__init__.py index 811c47c..0ab0736 100755 --- a/nonebot_plugin_llmchat/__init__.py +++ b/nonebot_plugin_llmchat/__init__.py @@ -377,7 +377,7 @@ async def process_messages(context_id: int, is_group: bool = True): "- 代码则不需要分段,用单独的一条消息发送。", "- 请使用发送者的昵称称呼发送者,你可以礼貌地问候发送者,但只需要在第一次回答这位发送者的问题时问候他。", "- 你有at群成员的能力,只需要在某条消息中插入[CQ:at,qq=(QQ号)]," - "也就是CQ码。at发送者是非必要的,你可以根据你自己的想法at某个人。", + "也就是CQ码。at发送者是非必要的,如果不是必要,请不要at别人。", "- 你有引用某条消息的能力,使用[CQ:reply,id=(消息id)]来引用。", "- 如果有多条消息,你应该优先回复提到你的,一段时间之前的就不要回复了,也可以直接选择不回复。", "- 如果你选择完全不回复,你只需要直接输出一个。", @@ -476,7 +476,8 @@ async def process_messages(context_id: int, is_group: bool = True): tool_name, tool_args, group_id=event.group_id, - bot_id=str(event.self_id) + bot_id=str(event.self_id), + user_id=event.user_id ) else: # 私聊时某些工具不可用(如群操作工具),跳过这些工具 @@ -487,7 +488,8 @@ async def process_messages(context_id: int, is_group: bool = True): tool_name, tool_args, group_id=None, - bot_id=str(event.self_id) + bot_id=str(event.self_id), + user_id=event.user_id ) new_messages.append({ diff --git a/nonebot_plugin_llmchat/config.py b/nonebot_plugin_llmchat/config.py index 6ecebf1..806d0d2 100755 --- a/nonebot_plugin_llmchat/config.py +++ b/nonebot_plugin_llmchat/config.py @@ -25,6 +25,11 @@ class MCPServerConfig(BaseModel): # 额外字段 friendly_name: str | None = Field(None, description="MCP服务器友好名称") addtional_prompt: str | None = Field(None, description="额外提示词") + + # 权限控制字段 + require_admin: bool = Field(False, description="是否需要管理员权限") + admin_user_ids: list[int] = Field(default_factory=list, description="有权限的用户ID列表") + read_only: bool = Field(True, description="非管理员是否使用只读模式") class ScopedConfig(BaseModel): """LLM Chat Plugin配置""" diff --git a/nonebot_plugin_llmchat/mcpclient.py b/nonebot_plugin_llmchat/mcpclient.py index 4d38f0a..ee284e9 100644 --- a/nonebot_plugin_llmchat/mcpclient.py +++ b/nonebot_plugin_llmchat/mcpclient.py @@ -8,6 +8,8 @@ from nonebot import logger from .config import MCPServerConfig from .onebottools import OneBotTools +# 导入你的自定义工具类(可选) +# from .your_tools import YourCustomTools class MCPClient: @@ -120,6 +122,12 @@ class MCPClient: available_tools.extend(onebot_tools) logger.debug(f"添加了{len(onebot_tools)}个OneBot内置工具") + # 添加自定义工具(可选) + # if hasattr(self, 'custom_tools'): + # custom_tools = self.custom_tools.get_available_tools() + # available_tools.extend(custom_tools) + # logger.debug(f"添加了{len(custom_tools)}个自定义工具") + # 添加MCP服务器工具 for server_name in self.server_config.keys(): logger.debug(f"正在从服务器[{server_name}]获取工具列表") @@ -146,8 +154,44 @@ class MCPClient: logger.info(f"工具列表缓存完成,共缓存{len(available_tools)}个工具") return available_tools - async def call_tool(self, tool_name: str, tool_args: dict, group_id: int | None = None, bot_id: str | None = None): + def check_tool_permission(self, tool_name: str, user_id: int) -> tuple[bool, str | None]: + """检查用户是否有权限调用该工具 + + 返回: (有权限, 错误信息) + """ + # MCP 工具权限检查 + if tool_name.startswith("mcp__"): + parts = tool_name.split("__") + if len(parts) < 2: + return False, f"工具名称格式错误: {tool_name}" + + server_name = parts[1] + if server_name not in self.server_config: + return False, f"未知的 MCP 服务器: {server_name}" + + config = self.server_config[server_name] + + # 如果不需要管理员权限,直接允许 + if not config.require_admin: + return True, None + + # 检查用户是否是管理员 + if user_id not in config.admin_user_ids: + return False, f"您没有权限使用此工具,只有管理员才能使用" + + return True, None + + return True, None + + async def call_tool(self, tool_name: str, tool_args: dict, group_id: int | None = None, bot_id: str | None = None, user_id: int | None = None): """按需连接调用工具,调用后立即断开""" + # 检查权限 + if user_id is not None: + has_permission, error_msg = self.check_tool_permission(tool_name, user_id) + if not has_permission: + logger.warning(f"用户 {user_id} 尝试调用无权限的工具 {tool_name}") + return error_msg or "您没有权限使用此工具" + # 检查是否是OneBot内置工具 if tool_name.startswith("ob__"): if group_id is None or bot_id is None: @@ -155,6 +199,11 @@ class MCPClient: logger.info(f"调用OneBot工具[{tool_name}]") return await self.onebot_tools.call_tool(tool_name, tool_args, group_id, bot_id) + # 检查是否是自定义工具(可选) + # if tool_name.startswith("custom__"): + # logger.info(f"调用自定义工具[{tool_name}]") + # return await self.custom_tools.call_tool(tool_name, tool_args) + # 检查是否是MCP工具 if tool_name.startswith("mcp__"): # MCP工具处理:mcp__server_name__tool_name @@ -184,6 +233,10 @@ class MCPClient: if tool_name.startswith("ob__"): return self.onebot_tools.get_friendly_name(tool_name) + # 检查是否是自定义工具(可选) + # if tool_name.startswith("custom__"): + # return self.custom_tools.get_friendly_name(tool_name) + # 检查是否是MCP工具 if tool_name.startswith("mcp__"): # MCP工具处理:mcp__server_name__tool_name