🆕 新增MCP权限管理

This commit is contained in:
KawakazeNotFound 2025-11-07 12:46:43 +08:00
parent b29386bf0f
commit b145e70e39
3 changed files with 64 additions and 4 deletions

View file

@ -377,7 +377,7 @@ async def process_messages(context_id: int, is_group: bool = True):
"- 代码则不需要分段,用单独的一条消息发送。",
"- 请使用发送者的昵称称呼发送者,你可以礼貌地问候发送者,但只需要在第一次回答这位发送者的问题时问候他。",
"- 你有at群成员的能力只需要在某条消息中插入[CQ:at,qq=QQ号]"
"也就是CQ码。at发送者是非必要的你可以根据你自己的想法at某个人。",
"也就是CQ码。at发送者是非必要的如果不是必要请不要at别人。",
"- 你有引用某条消息的能力,使用[CQ:reply,id=消息id]来引用。",
"- 如果有多条消息,你应该优先回复提到你的,一段时间之前的就不要回复了,也可以直接选择不回复。",
"- 如果你选择完全不回复,你只需要直接输出一个<botbr>。",
@ -476,7 +476,8 @@ async def process_messages(context_id: int, is_group: bool = True):
tool_name,
tool_args,
group_id=event.group_id,
bot_id=str(event.self_id)
bot_id=str(event.self_id),
user_id=event.user_id
)
else:
# 私聊时某些工具不可用(如群操作工具),跳过这些工具
@ -487,7 +488,8 @@ async def process_messages(context_id: int, is_group: bool = True):
tool_name,
tool_args,
group_id=None,
bot_id=str(event.self_id)
bot_id=str(event.self_id),
user_id=event.user_id
)
new_messages.append({

View file

@ -25,6 +25,11 @@ class MCPServerConfig(BaseModel):
# 额外字段
friendly_name: str | None = Field(None, description="MCP服务器友好名称")
addtional_prompt: str | None = Field(None, description="额外提示词")
# 权限控制字段
require_admin: bool = Field(False, description="是否需要管理员权限")
admin_user_ids: list[int] = Field(default_factory=list, description="有权限的用户ID列表")
read_only: bool = Field(True, description="非管理员是否使用只读模式")
class ScopedConfig(BaseModel):
"""LLM Chat Plugin配置"""

View file

@ -8,6 +8,8 @@ from nonebot import logger
from .config import MCPServerConfig
from .onebottools import OneBotTools
# 导入你的自定义工具类(可选)
# from .your_tools import YourCustomTools
class MCPClient:
@ -120,6 +122,12 @@ class MCPClient:
available_tools.extend(onebot_tools)
logger.debug(f"添加了{len(onebot_tools)}个OneBot内置工具")
# 添加自定义工具(可选)
# if hasattr(self, 'custom_tools'):
# custom_tools = self.custom_tools.get_available_tools()
# available_tools.extend(custom_tools)
# logger.debug(f"添加了{len(custom_tools)}个自定义工具")
# 添加MCP服务器工具
for server_name in self.server_config.keys():
logger.debug(f"正在从服务器[{server_name}]获取工具列表")
@ -146,8 +154,44 @@ class MCPClient:
logger.info(f"工具列表缓存完成,共缓存{len(available_tools)}个工具")
return available_tools
async def call_tool(self, tool_name: str, tool_args: dict, group_id: int | None = None, bot_id: str | None = None):
def check_tool_permission(self, tool_name: str, user_id: int) -> tuple[bool, str | None]:
"""检查用户是否有权限调用该工具
返回: (有权限, 错误信息)
"""
# MCP 工具权限检查
if tool_name.startswith("mcp__"):
parts = tool_name.split("__")
if len(parts) < 2:
return False, f"工具名称格式错误: {tool_name}"
server_name = parts[1]
if server_name not in self.server_config:
return False, f"未知的 MCP 服务器: {server_name}"
config = self.server_config[server_name]
# 如果不需要管理员权限,直接允许
if not config.require_admin:
return True, None
# 检查用户是否是管理员
if user_id not in config.admin_user_ids:
return False, f"您没有权限使用此工具,只有管理员才能使用"
return True, None
return True, None
async def call_tool(self, tool_name: str, tool_args: dict, group_id: int | None = None, bot_id: str | None = None, user_id: int | None = None):
"""按需连接调用工具,调用后立即断开"""
# 检查权限
if user_id is not None:
has_permission, error_msg = self.check_tool_permission(tool_name, user_id)
if not has_permission:
logger.warning(f"用户 {user_id} 尝试调用无权限的工具 {tool_name}")
return error_msg or "您没有权限使用此工具"
# 检查是否是OneBot内置工具
if tool_name.startswith("ob__"):
if group_id is None or bot_id is None:
@ -155,6 +199,11 @@ class MCPClient:
logger.info(f"调用OneBot工具[{tool_name}]")
return await self.onebot_tools.call_tool(tool_name, tool_args, group_id, bot_id)
# 检查是否是自定义工具(可选)
# if tool_name.startswith("custom__"):
# logger.info(f"调用自定义工具[{tool_name}]")
# return await self.custom_tools.call_tool(tool_name, tool_args)
# 检查是否是MCP工具
if tool_name.startswith("mcp__"):
# MCP工具处理mcp__server_name__tool_name
@ -184,6 +233,10 @@ class MCPClient:
if tool_name.startswith("ob__"):
return self.onebot_tools.get_friendly_name(tool_name)
# 检查是否是自定义工具(可选)
# if tool_name.startswith("custom__"):
# return self.custom_tools.get_friendly_name(tool_name)
# 检查是否是MCP工具
if tool_name.startswith("mcp__"):
# MCP工具处理mcp__server_name__tool_name