nonebot-plugin-llmchat/nonebot_plugin_llmchat/scheduler.py

785 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""定时任务管理模块"""
import asyncio
import json
import os
import uuid
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
import aiofiles
import httpx
from nonebot import get_bot, get_driver, logger, require
from nonebot.adapters.onebot.v11 import Bot, Message
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
require("nonebot_plugin_localstore")
import nonebot_plugin_localstore as store
require("nonebot_plugin_apscheduler")
from nonebot_plugin_apscheduler import scheduler
class ScheduleType(str, Enum):
"""定时任务类型"""
INTERVAL_MINUTES = "interval_minutes" # 每N分钟
DAILY = "daily" # 每天指定时间
WEEKLY = "weekly" # 每周指定天
YEARLY = "yearly" # 每年指定日期
ONCE = "once" # 一次性任务
class ScheduledTask(BaseModel):
"""定时任务模型"""
task_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
context_id: int # 群号或用户ID
is_group: bool # 是否群聊
schedule_type: ScheduleType # 任务类型
description: str # 任务描述用于AI生成提醒
creator_id: int # 创建者用户ID
created_at: datetime = Field(default_factory=datetime.now)
# 调度参数
interval_minutes: int | None = None # 间隔分钟数
hour: int | None = None # 小时 (0-23)
minute: int | None = None # 分钟 (0-59)
day_of_week: int | None = None # 周几 (0-6, 0=周一)
month: int | None = None # 月份 (1-12)
day: int | None = None # 日期 (1-31)
# 一次性任务
trigger_time: datetime | None = None # 触发时间
class SchedulerTools:
"""定时任务工具定义"""
def __init__(self):
self.tools = [
{
"type": "function",
"function": {
"name": "scheduler__create_task",
"description": """创建一个定时提醒任务。支持以下类型:
- interval_minutes: 每隔N分钟提醒需提供 interval_minutes (1-10080)
- daily: 每天指定时间提醒,需提供 hour (0-23) 和 minute (0-59)
- weekly: 每周指定天提醒,需提供 hour, minute, day_of_week (0=周一, 1=周二...6=周日)
- yearly: 每年指定日期提醒,需提供 month (1-12), day (1-31), hour, minute
- once: 一次性提醒,需提供 minutes_later 表示几分钟后触发 (1-525600)
重要提示:
- description 字段非常重要,它将在任务触发时用于生成提醒信息
- 如果任务需要获取实时信息如天气、新闻等请在描述中明确说明触发时AI会调用相应工具获取最新数据
- 如果用户没有提供完整信息(如查天气但没说城市),可以先创建任务,然后询问用户,得到答案后用 scheduler__update_task 更新描述""",
"parameters": {
"type": "object",
"properties": {
"schedule_type": {
"type": "string",
"description": "任务类型",
"enum": ["interval_minutes", "daily", "weekly", "yearly", "once"]
},
"description": {
"type": "string",
"description": "任务描述,将用于生成提醒信息"
},
"interval_minutes": {
"type": "integer",
"description": "间隔分钟数仅interval_minutes类型需要",
"minimum": 1,
"maximum": 10080
},
"hour": {
"type": "integer",
"description": "小时 (0-23)",
"minimum": 0,
"maximum": 23
},
"minute": {
"type": "integer",
"description": "分钟 (0-59)",
"minimum": 0,
"maximum": 59
},
"day_of_week": {
"type": "integer",
"description": "周几 (0=周一, 1=周二...6=周日)",
"minimum": 0,
"maximum": 6
},
"month": {
"type": "integer",
"description": "月份 (1-12)",
"minimum": 1,
"maximum": 12
},
"day": {
"type": "integer",
"description": "日期 (1-31)",
"minimum": 1,
"maximum": 31
},
"minutes_later": {
"type": "integer",
"description": "几分钟后触发仅once类型需要",
"minimum": 1,
"maximum": 525600
}
},
"required": ["schedule_type", "description"]
}
}
},
{
"type": "function",
"function": {
"name": "scheduler__list_tasks",
"description": "列出当前聊天的所有定时任务",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "scheduler__delete_task",
"description": "删除指定的定时任务",
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "要删除的任务ID"
}
},
"required": ["task_id"]
}
}
},
{
"type": "function",
"function": {
"name": "scheduler__update_task",
"description": """更新定时任务的描述。重要提示:
- 当用户在后续对话中补充了与已创建定时任务相关的信息时(如地点、具体要求、人名等),你应该主动调用此工具更新任务描述
- 例如:用户创建了"提醒我明天天气"的任务,后来告诉你他在"北京",你应该更新描述为"提醒我北京明天的天气"
- 任务描述应包含执行任务所需的所有关键信息因为触发时AI会根据描述来生成提醒或执行操作
- 如果不确定最近创建的任务ID可以先调用 scheduler__list_tasks 查看""",
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "要更新的任务ID"
},
"description": {
"type": "string",
"description": "新的任务描述,应包含执行任务所需的所有关键信息"
}
},
"required": ["task_id", "description"]
}
}
}
]
def get_available_tools(self) -> list[dict[str, Any]]:
"""获取可用的工具列表"""
return self.tools
def get_friendly_name(self, tool_name: str) -> str:
"""获取工具的友好名称"""
friendly_names = {
"scheduler__create_task": "定时任务 - 创建任务",
"scheduler__list_tasks": "定时任务 - 列出任务",
"scheduler__delete_task": "定时任务 - 删除任务",
"scheduler__update_task": "定时任务 - 更新任务",
}
return friendly_names.get(tool_name, tool_name)
class SchedulerManager:
"""定时任务管理器"""
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if self._initialized:
return
self.tasks: dict[str, ScheduledTask] = {}
self.tools = SchedulerTools()
self.data_file = store.get_plugin_data_file("llmchat_scheduler_tasks.json")
self._initialized = True
logger.info("SchedulerManager 初始化完成")
@classmethod
def get_instance(cls) -> "SchedulerManager":
"""获取单例实例"""
if cls._instance is None:
cls._instance = cls()
return cls._instance
async def load_tasks(self):
"""从文件加载任务"""
logger.info(f"从文件加载定时任务: {self.data_file}")
if not os.path.exists(self.data_file):
logger.debug("定时任务文件不存在,跳过加载")
return
try:
async with aiofiles.open(self.data_file, encoding="utf8") as f:
data = json.loads(await f.read())
for task_id, task_data in data.items():
# 转换字符串为datetime
if task_data.get("created_at"):
task_data["created_at"] = datetime.fromisoformat(task_data["created_at"])
if task_data.get("trigger_time"):
task_data["trigger_time"] = datetime.fromisoformat(task_data["trigger_time"])
self.tasks[task_id] = ScheduledTask(**task_data)
logger.info(f"成功加载 {len(self.tasks)} 个定时任务")
# 注册所有任务到APScheduler
await self.register_all_jobs()
except Exception as e:
logger.error(f"加载定时任务失败: {e}")
async def save_tasks(self):
"""保存任务到文件"""
logger.info(f"保存定时任务到文件: {self.data_file}")
try:
data = {}
for task_id, task in self.tasks.items():
task_dict = task.model_dump()
# 转换datetime为字符串
if task_dict.get("created_at"):
task_dict["created_at"] = task_dict["created_at"].isoformat()
if task_dict.get("trigger_time"):
task_dict["trigger_time"] = task_dict["trigger_time"].isoformat()
data[task_id] = task_dict
os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
async with aiofiles.open(self.data_file, "w", encoding="utf8") as f:
await f.write(json.dumps(data, ensure_ascii=False, indent=2))
logger.info(f"成功保存 {len(self.tasks)} 个定时任务")
except Exception as e:
logger.error(f"保存定时任务失败: {e}")
def _validate_task_params(self, schedule_type: ScheduleType, **kwargs) -> str | None:
"""校验任务参数返回错误信息或None"""
if schedule_type == ScheduleType.INTERVAL_MINUTES:
interval = kwargs.get("interval_minutes")
if interval is None:
return "interval_minutes类型需要提供 interval_minutes 参数"
if not 1 <= interval <= 10080:
return "interval_minutes 必须在 1-10080 之间"
elif schedule_type == ScheduleType.DAILY:
hour = kwargs.get("hour")
minute = kwargs.get("minute")
if hour is None or minute is None:
return "daily类型需要提供 hour 和 minute 参数"
if not 0 <= hour <= 23:
return "hour 必须在 0-23 之间"
if not 0 <= minute <= 59:
return "minute 必须在 0-59 之间"
elif schedule_type == ScheduleType.WEEKLY:
hour = kwargs.get("hour")
minute = kwargs.get("minute")
day_of_week = kwargs.get("day_of_week")
if hour is None or minute is None or day_of_week is None:
return "weekly类型需要提供 hour, minute 和 day_of_week 参数"
if not 0 <= hour <= 23:
return "hour 必须在 0-23 之间"
if not 0 <= minute <= 59:
return "minute 必须在 0-59 之间"
if not 0 <= day_of_week <= 6:
return "day_of_week 必须在 0-6 之间 (0=周一)"
elif schedule_type == ScheduleType.YEARLY:
hour = kwargs.get("hour")
minute = kwargs.get("minute")
month = kwargs.get("month")
day = kwargs.get("day")
if hour is None or minute is None or month is None or day is None:
return "yearly类型需要提供 hour, minute, month 和 day 参数"
if not 0 <= hour <= 23:
return "hour 必须在 0-23 之间"
if not 0 <= minute <= 59:
return "minute 必须在 0-59 之间"
if not 1 <= month <= 12:
return "month 必须在 1-12 之间"
if not 1 <= day <= 31:
return "day 必须在 1-31 之间"
elif schedule_type == ScheduleType.ONCE:
minutes_later = kwargs.get("minutes_later")
if minutes_later is None:
return "once类型需要提供 minutes_later 参数"
if not 1 <= minutes_later <= 525600:
return "minutes_later 必须在 1-525600 之间"
return None
async def create_task(
self,
context_id: int,
is_group: bool,
creator_id: int,
schedule_type: str,
description: str,
**kwargs
) -> tuple[bool, str]:
"""创建定时任务"""
try:
stype = ScheduleType(schedule_type)
except ValueError:
return False, f"无效的任务类型: {schedule_type}"
# 参数校验
error = self._validate_task_params(stype, **kwargs)
if error:
return False, error
# 计算一次性任务的触发时间
trigger_time = None
if stype == ScheduleType.ONCE:
minutes_later = kwargs.get("minutes_later", 0)
trigger_time = datetime.now() + timedelta(minutes=minutes_later)
# 创建任务
task = ScheduledTask(
context_id=context_id,
is_group=is_group,
schedule_type=stype,
description=description,
creator_id=creator_id,
interval_minutes=kwargs.get("interval_minutes"),
hour=kwargs.get("hour"),
minute=kwargs.get("minute"),
day_of_week=kwargs.get("day_of_week"),
month=kwargs.get("month"),
day=kwargs.get("day"),
trigger_time=trigger_time
)
self.tasks[task.task_id] = task
# 注册到APScheduler
self._register_job(task)
# 保存
await self.save_tasks()
logger.info(f"创建定时任务成功: {task.task_id} - {description}")
return True, f"创建成功任务ID: {task.task_id}"
async def delete_task(self, task_id: str, context_id: int, is_group: bool) -> tuple[bool, str]:
"""删除定时任务"""
if task_id not in self.tasks:
return False, f"任务不存在: {task_id}"
task = self.tasks[task_id]
# 检查权限:只能删除同一聊天的任务
if task.context_id != context_id or task.is_group != is_group:
return False, "无法删除其他聊天的任务"
# 从APScheduler移除
job_id = f"scheduler_{task_id}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
# 删除任务
del self.tasks[task_id]
await self.save_tasks()
logger.info(f"删除定时任务: {task_id}")
return True, f"任务 {task_id} 已删除"
async def update_task(
self,
task_id: str,
context_id: int,
is_group: bool,
description: str
) -> tuple[bool, str]:
"""更新定时任务"""
if task_id not in self.tasks:
return False, f"任务不存在: {task_id}"
task = self.tasks[task_id]
# 检查权限
if task.context_id != context_id or task.is_group != is_group:
return False, "无法更新其他聊天的任务"
task.description = description
await self.save_tasks()
logger.info(f"更新定时任务: {task_id}")
return True, f"任务 {task_id} 已更新"
def list_tasks(self, context_id: int, is_group: bool) -> list[dict]:
"""列出指定聊天的所有任务"""
result = []
for task in self.tasks.values():
if task.context_id == context_id and task.is_group == is_group:
task_info = {
"task_id": task.task_id,
"description": task.description,
"schedule_type": task.schedule_type.value,
"created_at": task.created_at.strftime("%Y-%m-%d %H:%M:%S")
}
# 添加具体时间信息
if task.schedule_type == ScheduleType.INTERVAL_MINUTES:
task_info["schedule"] = f"{task.interval_minutes} 分钟"
elif task.schedule_type == ScheduleType.DAILY:
task_info["schedule"] = f"每天 {task.hour:02d}:{task.minute:02d}"
elif task.schedule_type == ScheduleType.WEEKLY:
weekdays = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
task_info["schedule"] = f"{weekdays[task.day_of_week]} {task.hour:02d}:{task.minute:02d}"
elif task.schedule_type == ScheduleType.YEARLY:
task_info["schedule"] = f"每年 {task.month}{task.day}{task.hour:02d}:{task.minute:02d}"
elif task.schedule_type == ScheduleType.ONCE:
if task.trigger_time:
task_info["schedule"] = f"一次性: {task.trigger_time.strftime('%Y-%m-%d %H:%M:%S')}"
result.append(task_info)
return result
def _register_job(self, task: ScheduledTask):
"""注册单个任务到APScheduler"""
job_id = f"scheduler_{task.task_id}"
# 移除已存在的同ID任务
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
if task.schedule_type == ScheduleType.INTERVAL_MINUTES:
scheduler.add_job(
self._trigger_task,
"interval",
minutes=task.interval_minutes,
id=job_id,
args=[task.task_id]
)
elif task.schedule_type == ScheduleType.DAILY:
scheduler.add_job(
self._trigger_task,
"cron",
hour=task.hour,
minute=task.minute,
id=job_id,
args=[task.task_id]
)
elif task.schedule_type == ScheduleType.WEEKLY:
# APScheduler的day_of_week: 0=周一...6=周日
scheduler.add_job(
self._trigger_task,
"cron",
day_of_week=task.day_of_week,
hour=task.hour,
minute=task.minute,
id=job_id,
args=[task.task_id]
)
elif task.schedule_type == ScheduleType.YEARLY:
scheduler.add_job(
self._trigger_task,
"cron",
month=task.month,
day=task.day,
hour=task.hour,
minute=task.minute,
id=job_id,
args=[task.task_id]
)
elif task.schedule_type == ScheduleType.ONCE:
if task.trigger_time and task.trigger_time > datetime.now():
scheduler.add_job(
self._trigger_task,
"date",
run_date=task.trigger_time,
id=job_id,
args=[task.task_id]
)
logger.debug(f"注册定时任务到APScheduler: {job_id}")
async def register_all_jobs(self):
"""注册所有任务到APScheduler"""
logger.info(f"注册 {len(self.tasks)} 个任务到APScheduler")
for task in self.tasks.values():
self._register_job(task)
async def _trigger_task(self, task_id: str):
"""任务触发处理"""
if task_id not in self.tasks:
logger.warning(f"触发的任务不存在: {task_id}")
return
task = self.tasks[task_id]
logger.info(f"定时任务触发: {task_id} - {task.description}")
# 导入配置(避免循环导入)
from .config import ScopedConfig
from nonebot import get_plugin_config
from .config import Config
plugin_config = get_plugin_config(Config).llmchat
# 获取Bot
try:
bots = list(get_driver().bots.values())
if not bots:
logger.error("没有可用的Bot")
return
bot: Bot = bots[0] # type: ignore
except Exception as e:
logger.error(f"获取Bot失败: {e}")
return
# 尝试调用AI生成提醒信息
reminder_message = await self._generate_ai_reminder(task, plugin_config)
# 发送消息
try:
if task.is_group:
await bot.send_group_msg(group_id=task.context_id, message=Message(reminder_message))
else:
await bot.send_private_msg(user_id=task.context_id, message=Message(reminder_message))
logger.info(f"定时任务提醒发送成功: {task_id}")
except Exception as e:
logger.error(f"发送提醒消息失败: {e}")
# 一次性任务触发后删除
if task.schedule_type == ScheduleType.ONCE:
logger.info(f"删除一次性任务: {task_id}")
del self.tasks[task_id]
await self.save_tasks()
async def _generate_ai_reminder(self, task: ScheduledTask, plugin_config) -> str:
"""调用AI生成提醒信息支持调用MCP工具获取实时信息"""
max_retry = plugin_config.scheduler_max_retry
default_reminder = plugin_config.scheduler_default_reminder.format(description=task.description)
# 获取预设配置
preset = None
if task.is_group:
from . import group_states
state = group_states.get(task.context_id)
if state and state.preset_name != "off":
for p in plugin_config.api_presets:
if p.name == state.preset_name:
preset = p
break
else:
from . import private_chat_states
state = private_chat_states.get(task.context_id)
if state and state.preset_name != "off":
for p in plugin_config.api_presets:
if p.name == state.preset_name:
preset = p
break
if not preset:
# 没有配置预设,使用默认提醒
logger.debug("没有可用的API预设使用默认提醒")
return default_reminder
# 构建AI请求
system_prompt = f"""你是一个友好的提醒助手。用户设置了一个定时提醒任务,现在任务触发了。
请根据任务描述生成一条简短、友好的提醒消息。
要求:
- 如果任务描述中涉及需要获取实时信息的内容(如天气、新闻、股票等),你应该先使用相应的工具获取最新信息,然后基于获取到的信息生成提醒
- 消息要简洁,不要太长
- 语气要友好、亲切
- 可以适当使用语气词或颜文字
- 不要有多余的解释,直接发送提醒内容
- 如果使用了工具获取信息,请将关键信息整合到提醒消息中"""
user_prompt = f"任务描述:{task.description}"
# 初始化OpenAI客户端
if preset.proxy:
client = AsyncOpenAI(
base_url=preset.api_base,
api_key=preset.api_key,
timeout=plugin_config.request_timeout,
http_client=httpx.AsyncClient(proxy=preset.proxy),
)
else:
client = AsyncOpenAI(
base_url=preset.api_base,
api_key=preset.api_key,
timeout=plugin_config.request_timeout,
)
# 获取可用工具如果预设支持MCP
available_tools = None
mcp_client = None
if preset.support_mcp:
try:
from .mcpclient import MCPClient
mcp_client = MCPClient.get_instance(plugin_config.mcp_servers)
# 获取MCP工具但不包含OneBot工具和定时任务工具避免在提醒时创建新任务
await mcp_client.init_tools_cache()
available_tools = mcp_client._tools_cache.copy() if mcp_client._tools_cache else []
logger.debug(f"定时任务触发时可用工具数: {len(available_tools)}")
except Exception as e:
logger.warning(f"获取MCP工具列表失败: {e}")
available_tools = None
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
for attempt in range(max_retry):
try:
# 构建请求参数
request_params = {
"model": preset.model_name,
"max_tokens": 512, # 增加token限制以支持工具调用
"temperature": 0.7,
"messages": messages
}
# 如果有可用工具,添加到请求中
if available_tools:
request_params["tools"] = available_tools
response = await client.chat.completions.create(**request_params)
message = response.choices[0].message
# 处理工具调用
while available_tools and mcp_client and message and message.tool_calls:
logger.info(f"定时任务触发时AI调用工具: {[tc.function.name for tc in message.tool_calls]}")
# 添加assistant消息
messages.append({
"role": "assistant",
"tool_calls": [tool_call.model_dump() for tool_call in message.tool_calls]
})
# 处理每个工具调用
for tool_call in message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
logger.debug(f"调用工具: {tool_name}, 参数: {tool_args}")
# 调用MCP工具使用简化的参数因为这里不需要群操作
try:
result = await mcp_client.call_tool(
tool_name,
tool_args,
group_id=task.context_id if task.is_group else None,
bot_id=None,
user_id=task.creator_id,
is_group=task.is_group
)
result_str = str(result) if result else "工具调用成功但无返回结果"
except Exception as e:
logger.error(f"工具调用失败: {e}")
result_str = f"工具调用失败: {e}"
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result_str
})
# 再次调用AI处理工具结果
response = await client.chat.completions.create(**request_params)
message = response.choices[0].message
content = message.content
if content:
logger.debug(f"AI生成提醒成功: {content[:50]}...")
return content.strip()
except Exception as e:
logger.warning(f"AI生成提醒失败 (尝试 {attempt + 1}/{max_retry}): {e}")
if attempt < max_retry - 1:
await asyncio.sleep(1)
# 重试失败,返回默认提醒
logger.warning(f"AI生成提醒全部失败使用默认提醒")
return default_reminder
async def call_tool(
self,
tool_name: str,
tool_args: dict[str, Any],
context_id: int,
is_group: bool,
creator_id: int
) -> str:
"""调用定时任务工具"""
if tool_name == "scheduler__create_task":
success, message = await self.create_task(
context_id=context_id,
is_group=is_group,
creator_id=creator_id,
schedule_type=tool_args.get("schedule_type", ""),
description=tool_args.get("description", ""),
interval_minutes=tool_args.get("interval_minutes"),
hour=tool_args.get("hour"),
minute=tool_args.get("minute"),
day_of_week=tool_args.get("day_of_week"),
month=tool_args.get("month"),
day=tool_args.get("day"),
minutes_later=tool_args.get("minutes_later")
)
return message
elif tool_name == "scheduler__list_tasks":
tasks = self.list_tasks(context_id, is_group)
if not tasks:
return "当前没有定时任务"
return json.dumps(tasks, ensure_ascii=False, indent=2)
elif tool_name == "scheduler__delete_task":
success, message = await self.delete_task(
task_id=tool_args.get("task_id", ""),
context_id=context_id,
is_group=is_group
)
return message
elif tool_name == "scheduler__update_task":
success, message = await self.update_task(
task_id=tool_args.get("task_id", ""),
context_id=context_id,
is_group=is_group,
description=tool_args.get("description", "")
)
return message
return f"未知的工具: {tool_name}"
def get_available_tools(self) -> list[dict[str, Any]]:
"""获取可用工具列表"""
return self.tools.get_available_tools()
def get_friendly_name(self, tool_name: str) -> str:
"""获取工具友好名称"""
return self.tools.get_friendly_name(tool_name)