From 4ab787a51948be7ea00558d1e12a4b380b0df889 Mon Sep 17 00:00:00 2001 From: slexce <2767145231@qq.com> Date: Tue, 10 Mar 2026 23:33:23 +0800 Subject: [PATCH] =?UTF-8?q?:sparkles:=20=E6=B7=BB=E5=8A=A0=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E8=B0=83=E7=94=A8=E5=AD=90=E6=A8=A1=E5=9E=8B=E7=9A=84?= =?UTF-8?q?=E8=83=BD=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nonebot_plugin_llmchat/__init__.py | 87 ++- nonebot_plugin_llmchat/config.py | 8 + nonebot_plugin_llmchat/mcpclient.py | 61 ++- nonebot_plugin_llmchat/submodel_caller.py | 612 ++++++++++++++++++++++ 4 files changed, 755 insertions(+), 13 deletions(-) create mode 100644 nonebot_plugin_llmchat/submodel_caller.py diff --git a/nonebot_plugin_llmchat/__init__.py b/nonebot_plugin_llmchat/__init__.py index 32eb4ed..6981e63 100755 --- a/nonebot_plugin_llmchat/__init__.py +++ b/nonebot_plugin_llmchat/__init__.py @@ -360,7 +360,7 @@ async def process_messages(context_id: int, is_group: bool = True): logger.debug(f"从队列获取消息 用户:{context_id} 消息ID:{event.message_id}") group_id = None past_events_snapshot = [] - mcp_client = MCPClient.get_instance(plugin_config.mcp_servers) + mcp_client = MCPClient.get_instance(plugin_config.mcp_servers, plugin_config) try: # 构建系统提示,分成多行以满足行长限制 chat_type = "群聊" if is_group else "私聊" @@ -447,9 +447,14 @@ async def process_messages(context_id: int, is_group: bool = True): } if preset.support_mcp: - available_tools = await mcp_client.get_available_tools(is_group) + available_tools = await mcp_client.get_available_tools(is_group, preset) client_config["tools"] = available_tools + # 用于存储子模型生成的多媒体内容 + submodel_images: list[str] = [] + submodel_voices: list[str] = [] + submodel_videos: list[str] = [] + response = await client.chat.completions.create( **client_config, messages=messages + new_messages, @@ -486,7 +491,8 @@ async def process_messages(context_id: int, is_group: bool = True): group_id=event.group_id, bot_id=str(event.self_id), user_id=event.user_id, - is_group=True + is_group=True, + current_preset=preset ) else: result = await mcp_client.call_tool( @@ -494,9 +500,37 @@ async def process_messages(context_id: int, is_group: bool = True): tool_args, bot_id=str(event.self_id), user_id=event.user_id, - is_group=False + is_group=False, + current_preset=preset ) + # 处理子模型返回的结构化结果 + if isinstance(result, dict) and tool_name.startswith("submodel__"): + if result.get("success"): + # 收集多媒体内容 + if result.get("images"): + submodel_images.extend(result["images"]) + logger.info(f"子模型生成了 {len(result['images'])} 张图片") + if result.get("audio"): + submodel_voices.append(result["audio"]) + logger.info("子模型生成了语音") + if result.get("video"): + submodel_videos.append(result["video"]) + logger.info("子模型生成了视频") + # 构建给主模型的结果消息 + result_msg = f"成功使用模型 {result.get('model_used', '未知')} 生成内容。" + if result.get("content"): + result_msg += f"\n子模型回复:{result['content']}" + if result.get("images"): + result_msg += f"\n已生成 {len(result['images'])} 张图片,将在你回复后发送给用户。" + if result.get("audio"): + result_msg += "\n已生成语音,将在你回复后发送给用户。" + if result.get("video"): + result_msg += "\n已生成视频,将在你回复后发送给用户。" + result = result_msg + else: + result = f"生成失败:{result.get('error', '未知错误')}" + new_messages.append({ "role": "tool", "tool_call_id": tool_call.id, @@ -557,6 +591,7 @@ async def process_messages(context_id: int, is_group: bool = True): assert reply is not None await send_split_messages(handler, reply) + # 发送主模型直接生成的图片 if reply_images: logger.debug(f"API响应 图片数:{len(reply_images)}") for i, image in enumerate(reply_images, start=1): @@ -565,6 +600,50 @@ async def process_messages(context_id: int, is_group: bool = True): image_msg = MessageSegment.image(base64.b64decode(image_base64)) await handler.send(image_msg) + # 发送子模型生成的图片 + if submodel_images: + logger.info(f"发送子模型生成的 {len(submodel_images)} 张图片") + for i, img_base64 in enumerate(submodel_images, start=1): + try: + logger.debug(f"正在发送子模型图片 {i}/{len(submodel_images)}") + # 处理可能的 data URL 前缀 + if img_base64.startswith("data:"): + img_base64 = img_base64.split(",", 1)[-1] if "," in img_base64 else img_base64 + image_msg = MessageSegment.image(base64.b64decode(img_base64)) + await handler.send(image_msg) + except Exception as e: + logger.error(f"发送子模型图片失败: {e}") + + # 发送子模型生成的语音 + if submodel_voices: + logger.info(f"发送子模型生成的 {len(submodel_voices)} 条语音") + for i, voice_data in enumerate(submodel_voices, start=1): + try: + logger.debug(f"正在发送子模型语音 {i}/{len(submodel_voices)}") + if voice_data.startswith("data:"): + voice_data = voice_data.split(",", 1)[-1] if "," in voice_data else voice_data + voice_msg = MessageSegment.record(base64.b64decode(voice_data)) + await handler.send(voice_msg) + except Exception as e: + logger.error(f"发送子模型语音失败: {e}") + + # 发送子模型生成的视频 + if submodel_videos: + logger.info(f"发送子模型生成的 {len(submodel_videos)} 个视频") + for i, video_data in enumerate(submodel_videos, start=1): + try: + logger.debug(f"正在发送子模型视频 {i}/{len(submodel_videos)}") + # 视频可能是 URL 或 base64 + if video_data.startswith("http"): + video_msg = MessageSegment.video(video_data) + else: + if video_data.startswith("data:"): + video_data = video_data.split(",", 1)[-1] if "," in video_data else video_data + video_msg = MessageSegment.video(base64.b64decode(video_data)) + await handler.send(video_msg) + except Exception as e: + logger.error(f"发送子模型视频失败: {e}") + except Exception as e: logger.opt(exception=e).error(f"API请求失败 {'群号' if is_group else '用户'}:{context_id}") # 如果在处理过程中出现异常,恢复未处理的消息到state中 diff --git a/nonebot_plugin_llmchat/config.py b/nonebot_plugin_llmchat/config.py index 5a47ab7..b2e4bf4 100755 --- a/nonebot_plugin_llmchat/config.py +++ b/nonebot_plugin_llmchat/config.py @@ -16,6 +16,14 @@ class PresetConfig(BaseModel): support_mcp: bool = Field(False, description="是否支持MCP") support_image: bool = Field(False, description="是否支持图片输入") + # 子模型能力标记 + support_to_image: bool = Field(False, description="是否支持生成图片") + support_to_voice: bool = Field(False, description="是否支持生成语音") + support_to_video: bool = Field(False, description="是否支持生成视频") + + # 可调用的子模型列表 + call_model_list: list[str] | None = Field(None, description="可调用的子模型名称列表") + class MCPServerConfig(BaseModel): """MCP服务器配置""" command: str | None = Field(None, description="stdio模式下MCP命令") diff --git a/nonebot_plugin_llmchat/mcpclient.py b/nonebot_plugin_llmchat/mcpclient.py index ab5a219..7c846b5 100644 --- a/nonebot_plugin_llmchat/mcpclient.py +++ b/nonebot_plugin_llmchat/mcpclient.py @@ -10,21 +10,22 @@ except: from mcp.client.streamable_http import streamablehttp_client from nonebot import logger -from .config import MCPServerConfig, transportType +from .config import MCPServerConfig, PresetConfig, ScopedConfig, transportType from .onebottools import OneBotTools from .scheduler import SchedulerManager +from .submodel_caller import SubModelCaller class MCPClient: _instance = None _initialized = False - def __new__(cls, server_config: dict[str, MCPServerConfig] | None = None): + def __new__(cls, server_config: dict[str, MCPServerConfig] | None = None, plugin_config: ScopedConfig | None = None): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance - def __init__(self, server_config: dict[str, MCPServerConfig] | None = None): + def __init__(self, server_config: dict[str, MCPServerConfig] | None = None, plugin_config: ScopedConfig | None = None): if self._initialized: return @@ -33,6 +34,7 @@ class MCPClient: logger.info(f"正在初始化MCPClient单例,共有{len(server_config)}个服务器配置") self.server_config = server_config + self.plugin_config = plugin_config self.sessions = {} self.exit_stack = AsyncExitStack() # 添加工具列表缓存 @@ -42,16 +44,18 @@ class MCPClient: self.onebot_tools = OneBotTools() # 初始化定时任务管理器 self.scheduler_manager = SchedulerManager.get_instance() + # 初始化子模型调用器(如果有 plugin_config) + self.submodel_caller = SubModelCaller.get_instance(plugin_config) if plugin_config else None self._initialized = True logger.debug("MCPClient单例初始化成功") @classmethod - def get_instance(cls, server_config: dict[str, MCPServerConfig] | None = None): + def get_instance(cls, server_config: dict[str, MCPServerConfig] | None = None, plugin_config: ScopedConfig | None = None): """获取MCPClient实例""" if cls._instance is None: if server_config is None: raise ValueError("server_config must be provided for first initialization") - cls._instance = cls(server_config) + cls._instance = cls(server_config, plugin_config) return cls._instance @classmethod @@ -160,8 +164,13 @@ class MCPClient: - async def get_available_tools(self, is_group: bool): - """获取可用工具列表,使用缓存机制""" + async def get_available_tools(self, is_group: bool, current_preset: PresetConfig | None = None): + """获取可用工具列表,使用缓存机制 + + Args: + is_group: 是否群聊场景 + current_preset: 当前使用的预设配置(用于获取子模型工具) + """ await self.init_tools_cache() available_tools = self._tools_cache.copy() if self._tools_cache else [] if is_group: @@ -169,6 +178,12 @@ class MCPClient: available_tools.extend(self.onebot_tools.get_available_tools()) # 添加定时任务工具(群聊和私聊都可用) available_tools.extend(self.scheduler_manager.get_available_tools()) + # 添加子模型调用工具(根据当前预设的 call_model_list 动态生成) + if self.submodel_caller and current_preset: + submodel_tools = self.submodel_caller.get_available_tools(current_preset) + available_tools.extend(submodel_tools) + if submodel_tools: + logger.debug(f"添加了 {len(submodel_tools)} 个子模型调用工具") logger.debug(f"获取可用工具列表,共{len(available_tools)}个工具") return available_tools @@ -179,9 +194,20 @@ class MCPClient: group_id: int | None = None, bot_id: str | None = None, user_id: int | None = None, - is_group: bool = True + is_group: bool = True, + current_preset: PresetConfig | None = None ): - """按需连接调用工具,调用后立即断开""" + """按需连接调用工具,调用后立即断开 + + Args: + tool_name: 工具名称 + tool_args: 工具参数 + group_id: 群号(群聊时必需) + bot_id: 机器人ID + user_id: 用户ID + is_group: 是否群聊 + current_preset: 当前使用的预设配置(子模型调用时必需) + """ # 检查是否是OneBot内置工具 if tool_name.startswith("ob__"): if group_id is None or bot_id is None: @@ -199,6 +225,17 @@ class MCPClient: tool_name, tool_args, context_id, is_group, user_id ) + # 检查是否是子模型调用工具 + if tool_name.startswith("submodel__"): + if not self.submodel_caller: + return "子模型调用器未初始化" + if not current_preset: + return "子模型调用需要提供 current_preset 参数" + logger.info(f"调用子模型工具[{tool_name}]") + result = await self.submodel_caller.call_tool(tool_name, tool_args, current_preset) + # 返回结构化结果,让上层处理 + return result + # 检查是否是MCP工具 if tool_name.startswith("mcp__"): # MCP工具处理:mcp__server_name__tool_name @@ -232,6 +269,12 @@ class MCPClient: if tool_name.startswith("scheduler__"): return self.scheduler_manager.get_friendly_name(tool_name) + # 检查是否是子模型调用工具 + if tool_name.startswith("submodel__"): + if self.submodel_caller: + return self.submodel_caller.get_friendly_name(tool_name) + return tool_name + # 检查是否是MCP工具 if tool_name.startswith("mcp__"): # MCP工具处理:mcp__server_name__tool_name diff --git a/nonebot_plugin_llmchat/submodel_caller.py b/nonebot_plugin_llmchat/submodel_caller.py new file mode 100644 index 0000000..df4d793 --- /dev/null +++ b/nonebot_plugin_llmchat/submodel_caller.py @@ -0,0 +1,612 @@ +"""子模型调用模块 + +允许主模型通过 function tool 调用其他模型来完成特定任务(如生成图片、语音、视频)。 +""" + +import asyncio +import base64 +import json +from typing import Any + +import httpx +from nonebot import logger +from openai import AsyncOpenAI + +from .config import PresetConfig, ScopedConfig + + +class SubModelCaller: + """子模型调用管理器""" + + _instance = None + _initialized = False + + def __new__(cls, plugin_config: ScopedConfig | None = None): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self, plugin_config: ScopedConfig | None = None): + if self._initialized: + return + + if plugin_config is None: + raise ValueError("plugin_config must be provided for first initialization") + + self.plugin_config = plugin_config + self._preset_map: dict[str, PresetConfig] = { + p.name: p for p in plugin_config.api_presets + } + self._initialized = True + logger.info("SubModelCaller 初始化完成") + + @classmethod + def get_instance(cls, plugin_config: ScopedConfig | None = None) -> "SubModelCaller": + """获取单例实例""" + if cls._instance is None: + if plugin_config is None: + raise ValueError("plugin_config must be provided for first initialization") + cls._instance = cls(plugin_config) + return cls._instance + + def _get_callable_presets(self, current_preset: PresetConfig) -> list[PresetConfig]: + """获取当前预设可调用的子模型预设列表""" + if not current_preset.call_model_list: + return [] + + callable_presets = [] + for name in current_preset.call_model_list: + if name in self._preset_map: + callable_presets.append(self._preset_map[name]) + else: + logger.warning(f"call_model_list 中的模型 '{name}' 不存在于 api_presets 中") + + return callable_presets + + def _get_presets_with_capability( + self, + current_preset: PresetConfig, + capability: str + ) -> list[PresetConfig]: + """获取具有特定能力的可调用子模型列表 + + Args: + current_preset: 当前主模型预设 + capability: 能力名称,如 'support_to_image' + + Returns: + 具有该能力的子模型预设列表(按 call_model_list 顺序) + """ + callable_presets = self._get_callable_presets(current_preset) + return [p for p in callable_presets if getattr(p, capability, False)] + + def get_available_tools(self, current_preset: PresetConfig) -> list[dict[str, Any]]: + """根据当前预设的 call_model_list 动态生成可用的子模型调用工具 + + 只有当 call_model_list 中存在具有相应能力的模型时,才会生成对应的工具。 + """ + tools = [] + + # 检查是否有可调用的图片生成模型 + image_models = self._get_presets_with_capability(current_preset, "support_to_image") + if image_models: + model_names = [m.name for m in image_models] + tools.append({ + "type": "function", + "function": { + "name": "submodel__generate_image", + "description": f"""调用子模型生成图片。可用的图片生成模型:{', '.join(model_names)}。 +使用说明: +- 当用户要求生成图片时使用此工具 +- prompt 应该是详细的图片描述,用英文效果更好 +- 系统会自动选择最优的模型,如果失败会自动切换备选模型 +- 返回结果包含 base64 编码的图片数据""", + "parameters": { + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "图片生成提示词,描述要生成的图片内容,建议使用英文" + }, + "preferred_model": { + "type": "string", + "description": f"可选:指定使用的模型名称,可选值:{', '.join(model_names)}", + "enum": model_names + } + }, + "required": ["prompt"] + } + } + }) + + # 检查是否有可调用的语音生成模型 + voice_models = self._get_presets_with_capability(current_preset, "support_to_voice") + if voice_models: + model_names = [m.name for m in voice_models] + tools.append({ + "type": "function", + "function": { + "name": "submodel__generate_voice", + "description": f"""调用子模型生成语音。可用的语音生成模型:{', '.join(model_names)}。 +使用说明: +- 当用户要求生成语音或朗读文本时使用此工具 +- text 是要转换为语音的文本内容 +- 返回结果包含 base64 编码的音频数据""", + "parameters": { + "type": "object", + "properties": { + "text": { + "type": "string", + "description": "要转换为语音的文本内容" + }, + "preferred_model": { + "type": "string", + "description": f"可选:指定使用的模型名称,可选值:{', '.join(model_names)}", + "enum": model_names + } + }, + "required": ["text"] + } + } + }) + + # 检查是否有可调用的视频生成模型 + video_models = self._get_presets_with_capability(current_preset, "support_to_video") + if video_models: + model_names = [m.name for m in video_models] + tools.append({ + "type": "function", + "function": { + "name": "submodel__generate_video", + "description": f"""调用子模型生成视频。可用的视频生成模型:{', '.join(model_names)}。 +使用说明: +- 当用户要求生成视频时使用此工具 +- prompt 是视频内容描述 +- 返回结果包含视频数据或URL""", + "parameters": { + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "视频生成提示词,描述要生成的视频内容" + }, + "preferred_model": { + "type": "string", + "description": f"可选:指定使用的模型名称,可选值:{', '.join(model_names)}", + "enum": model_names + } + }, + "required": ["prompt"] + } + } + }) + + return tools + + async def _call_model_api( + self, + preset: PresetConfig, + messages: list[dict], + tools: list[dict] | None = None + ) -> dict[str, Any]: + """调用模型 API + + Args: + preset: 模型预设配置 + messages: 消息列表 + tools: 可选的工具列表(如果模型支持 MCP) + + Returns: + 包含响应内容的字典 + """ + # 初始化 OpenAI 客户端 + if preset.proxy: + client = AsyncOpenAI( + base_url=preset.api_base, + api_key=preset.api_key, + timeout=self.plugin_config.request_timeout, + http_client=httpx.AsyncClient(proxy=preset.proxy), + ) + else: + client = AsyncOpenAI( + base_url=preset.api_base, + api_key=preset.api_key, + timeout=self.plugin_config.request_timeout, + ) + + # 构建请求参数 + request_params = { + "model": preset.model_name, + "max_tokens": preset.max_tokens, + "temperature": preset.temperature, + "messages": messages + } + + # 如果模型支持 MCP 并且提供了工具,添加到请求中 + if preset.support_mcp and tools: + request_params["tools"] = tools + + response = await client.chat.completions.create(**request_params) + message = response.choices[0].message + + result = { + "content": message.content, + "tool_calls": message.tool_calls, + "images": getattr(message, "images", None), + "audio": getattr(message, "audio", None), + "video": getattr(message, "video", None), + } + + return result + + async def _call_with_mcp_support( + self, + preset: PresetConfig, + initial_messages: list[dict], + mcp_tools: list[dict] | None = None + ) -> dict[str, Any]: + """调用模型并处理可能的 MCP 工具调用 + + 如果模型支持 MCP,会处理工具调用循环直到得到最终响应。 + """ + messages = initial_messages.copy() + tools = mcp_tools if preset.support_mcp else None + + # 最多进行 5 轮工具调用 + max_tool_rounds = 5 + + for _ in range(max_tool_rounds): + result = await self._call_model_api(preset, messages, tools) + + # 如果没有工具调用,直接返回结果 + if not result["tool_calls"]: + return result + + # 处理工具调用 + logger.info(f"子模型 {preset.name} 请求调用工具: {[tc.function.name for tc in result['tool_calls']]}") + + # 添加 assistant 消息 + messages.append({ + "role": "assistant", + "tool_calls": [tc.model_dump() for tc in result["tool_calls"]] + }) + + # 处理每个工具调用 + for tool_call in result["tool_calls"]: + tool_name = tool_call.function.name + tool_args = json.loads(tool_call.function.arguments) + + # 调用 MCP 工具 + try: + from .mcpclient import MCPClient + mcp_client = MCPClient.get_instance(self.plugin_config.mcp_servers) + tool_result = await mcp_client.call_tool( + tool_name, + tool_args, + group_id=None, + bot_id=None, + user_id=None, + is_group=False + ) + result_str = str(tool_result) if tool_result else "工具调用成功" + except Exception as e: + logger.error(f"子模型 MCP 工具调用失败: {e}") + result_str = f"工具调用失败: {e}" + + messages.append({ + "role": "tool", + "tool_call_id": tool_call.id, + "content": result_str + }) + + # 超过最大轮数,返回最后的结果 + logger.warning(f"子模型 {preset.name} 工具调用超过 {max_tool_rounds} 轮") + return await self._call_model_api(preset, messages, None) + + async def generate_image( + self, + current_preset: PresetConfig, + prompt: str, + preferred_model: str | None = None + ) -> dict[str, Any]: + """生成图片 + + Args: + current_preset: 当前主模型预设 + prompt: 图片生成提示词 + preferred_model: 可选的指定模型名称 + + Returns: + 包含生成结果的字典: + - success: bool + - images: list[str] (base64 编码的图片) + - content: str (模型的文本回复) + - error: str (如果失败) + - model_used: str (实际使用的模型名称) + """ + image_models = self._get_presets_with_capability(current_preset, "support_to_image") + + if not image_models: + return { + "success": False, + "error": "没有可用的图片生成模型", + "images": [], + "content": "" + } + + # 如果指定了模型,调整顺序 + if preferred_model: + image_models = sorted( + image_models, + key=lambda p: 0 if p.name == preferred_model else 1 + ) + + # 获取 MCP 工具(如果需要) + mcp_tools = None + try: + from .mcpclient import MCPClient + mcp_client = MCPClient.get_instance(self.plugin_config.mcp_servers) + await mcp_client.init_tools_cache() + mcp_tools = mcp_client._tools_cache.copy() if mcp_client._tools_cache else None + except Exception as e: + logger.debug(f"获取 MCP 工具失败: {e}") + + # 构建消息 + messages = [ + { + "role": "system", + "content": "你是一个图片生成助手。请根据用户的描述生成图片。直接生成图片,不需要额外解释。" + }, + { + "role": "user", + "content": prompt + } + ] + + errors = [] + for preset in image_models: + logger.info(f"尝试使用模型 {preset.name} 生成图片") + try: + result = await self._call_with_mcp_support(preset, messages, mcp_tools) + + # 检查是否有图片返回 + images = result.get("images") + if images: + # 提取 base64 图片数据 + image_list = [] + for img in images: + if isinstance(img, dict) and "image_url" in img: + url = img["image_url"].get("url", "") + # 移除 data URL 前缀 + if url.startswith("data:"): + # 格式: data:image/png;base64,xxxxx + base64_data = url.split(",", 1)[-1] if "," in url else url + else: + base64_data = url + image_list.append(base64_data) + elif isinstance(img, str): + image_list.append(img) + + if image_list: + logger.info(f"模型 {preset.name} 成功生成 {len(image_list)} 张图片") + return { + "success": True, + "images": image_list, + "content": result.get("content", ""), + "model_used": preset.name + } + + # 没有图片但有内容,可能是模型回复了文本 + if result.get("content"): + logger.warning(f"模型 {preset.name} 返回了文本但没有图片") + errors.append(f"{preset.name}: 模型未生成图片") + else: + errors.append(f"{preset.name}: 模型无响应") + + except Exception as e: + logger.error(f"模型 {preset.name} 调用失败: {e}") + errors.append(f"{preset.name}: {str(e)}") + continue + + # 所有模型都失败了 + return { + "success": False, + "error": f"所有模型都无法生成图片。详情:{'; '.join(errors)}", + "images": [], + "content": "" + } + + async def generate_voice( + self, + current_preset: PresetConfig, + text: str, + preferred_model: str | None = None + ) -> dict[str, Any]: + """生成语音 + + Args: + current_preset: 当前主模型预设 + text: 要转换为语音的文本 + preferred_model: 可选的指定模型名称 + + Returns: + 包含生成结果的字典 + """ + voice_models = self._get_presets_with_capability(current_preset, "support_to_voice") + + if not voice_models: + return { + "success": False, + "error": "没有可用的语音生成模型", + "audio": None, + "content": "" + } + + if preferred_model: + voice_models = sorted( + voice_models, + key=lambda p: 0 if p.name == preferred_model else 1 + ) + + messages = [ + { + "role": "system", + "content": "你是一个语音生成助手。请将用户提供的文本转换为语音。" + }, + { + "role": "user", + "content": f"请将以下文本转换为语音:\n{text}" + } + ] + + errors = [] + for preset in voice_models: + logger.info(f"尝试使用模型 {preset.name} 生成语音") + try: + result = await self._call_with_mcp_support(preset, messages, None) + + audio = result.get("audio") + if audio: + logger.info(f"模型 {preset.name} 成功生成语音") + return { + "success": True, + "audio": audio, + "content": result.get("content", ""), + "model_used": preset.name + } + + errors.append(f"{preset.name}: 模型未生成语音") + + except Exception as e: + logger.error(f"模型 {preset.name} 调用失败: {e}") + errors.append(f"{preset.name}: {str(e)}") + continue + + return { + "success": False, + "error": f"所有模型都无法生成语音。详情:{'; '.join(errors)}", + "audio": None, + "content": "" + } + + async def generate_video( + self, + current_preset: PresetConfig, + prompt: str, + preferred_model: str | None = None + ) -> dict[str, Any]: + """生成视频 + + Args: + current_preset: 当前主模型预设 + prompt: 视频生成提示词 + preferred_model: 可选的指定模型名称 + + Returns: + 包含生成结果的字典 + """ + video_models = self._get_presets_with_capability(current_preset, "support_to_video") + + if not video_models: + return { + "success": False, + "error": "没有可用的视频生成模型", + "video": None, + "content": "" + } + + if preferred_model: + video_models = sorted( + video_models, + key=lambda p: 0 if p.name == preferred_model else 1 + ) + + messages = [ + { + "role": "system", + "content": "你是一个视频生成助手。请根据用户的描述生成视频。" + }, + { + "role": "user", + "content": prompt + } + ] + + errors = [] + for preset in video_models: + logger.info(f"尝试使用模型 {preset.name} 生成视频") + try: + result = await self._call_with_mcp_support(preset, messages, None) + + video = result.get("video") + if video: + logger.info(f"模型 {preset.name} 成功生成视频") + return { + "success": True, + "video": video, + "content": result.get("content", ""), + "model_used": preset.name + } + + errors.append(f"{preset.name}: 模型未生成视频") + + except Exception as e: + logger.error(f"模型 {preset.name} 调用失败: {e}") + errors.append(f"{preset.name}: {str(e)}") + continue + + return { + "success": False, + "error": f"所有模型都无法生成视频。详情:{'; '.join(errors)}", + "video": None, + "content": "" + } + + async def call_tool( + self, + tool_name: str, + tool_args: dict[str, Any], + current_preset: PresetConfig + ) -> dict[str, Any]: + """工具调用入口 + + Args: + tool_name: 工具名称 + tool_args: 工具参数 + current_preset: 当前主模型预设 + + Returns: + 工具调用结果 + """ + if tool_name == "submodel__generate_image": + return await self.generate_image( + current_preset=current_preset, + prompt=tool_args.get("prompt", ""), + preferred_model=tool_args.get("preferred_model") + ) + elif tool_name == "submodel__generate_voice": + return await self.generate_voice( + current_preset=current_preset, + text=tool_args.get("text", ""), + preferred_model=tool_args.get("preferred_model") + ) + elif tool_name == "submodel__generate_video": + return await self.generate_video( + current_preset=current_preset, + prompt=tool_args.get("prompt", ""), + preferred_model=tool_args.get("preferred_model") + ) + else: + return { + "success": False, + "error": f"未知的子模型工具: {tool_name}" + } + + def get_friendly_name(self, tool_name: str) -> str: + """获取工具的友好名称""" + friendly_names = { + "submodel__generate_image": "子模型 - 生成图片", + "submodel__generate_voice": "子模型 - 生成语音", + "submodel__generate_video": "子模型 - 生成视频", + } + return friendly_names.get(tool_name, tool_name)