异步任务
异步任务用于在插件中执行耗时较长的后台流程,而不阻塞主 Agent 的对话能力。适用场景包括代码生成与部署、文件批处理、外部 API 长轮询以及需要暂停等待用户确认的多步骤流程。
与普通沙盒方法的区别
普通沙盒方法在调用时同步执行并阻塞当前会话,异步任务则在后台独立运行,主 Agent 可继续处理其他消息。任务完成后可通过 handle.notify_agent() 或终态回调主动通知。
前置导入
python
from typing import AsyncGenerator
from nekro_agent.api.plugin import (
NekroPlugin,
AsyncTaskHandle,
TaskCtl,
TaskSignal,
task,
SandboxMethodType,
)
from nekro_agent.api.schemas import AgentCtx定义异步任务
使用 plugin.mount_async_task(task_type) 装饰器注册异步任务函数:
python
@plugin.mount_async_task("my_task")
async def my_async_task(
handle: AsyncTaskHandle,
prompt: str,
task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
yield TaskCtl.report_progress("开始处理...", 10)
if handle.is_cancelled:
yield TaskCtl.cancel("任务已取消")
return
# 执行实际逻辑
result = await do_work(prompt)
yield TaskCtl.success("处理完成", data={"result": result})任务函数第一个参数固定为 AsyncTaskHandle,后续参数由调用方通过 task.start() 传入。函数通过 yield 上报状态控制信号,以终态信号(success / fail / cancel)结束。
启动任务
在沙盒方法中启动已注册的异步任务:
python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "启动处理任务")
async def start_task(_ctx: AgentCtx, prompt: str) -> str:
task_id = f"task_{int(time.time())}"
await task.start(
task_type="my_task",
task_id=task_id,
chat_key=_ctx.chat_key,
plugin=plugin,
prompt=prompt,
task_id=task_id,
)
return f"任务已在后台启动,ID:{task_id}"task.start() 参数:
| 参数 | 说明 |
|---|---|
task_type | 任务类型名,与 mount_async_task 的参数一致 |
task_id | 任务实例标识,同类型下唯一 |
chat_key | 关联的聊天频道标识 |
plugin | 当前插件实例 |
on_terminal | 可选,终态回调(同步函数) |
**kwargs | 传递给任务函数的其余参数 |
状态控制信号(TaskCtl)
任务通过 yield TaskCtl.xxx() 上报状态:
| 方法 | 终态 | 说明 |
|---|---|---|
TaskCtl.report_progress(msg, percent) | 否 | 上报进度,0-100 |
TaskCtl.success(msg, data=None) | 是 | 任务成功完成 |
TaskCtl.fail(msg, error=None) | 是 | 任务失败 |
TaskCtl.cancel(msg) | 是 | 任务取消 |
任务函数必须以终态信号结束,否则视为异常结束(等同于 fail)。
任务句柄(AsyncTaskHandle)
任务函数接收的 handle 提供以下能力:
| 属性/方法 | 说明 |
|---|---|
handle.task_id | 当前任务 ID |
handle.chat_key | 关联的聊天频道标识 |
handle.plugin | 插件实例 |
handle.is_cancelled | 是否已被外部请求取消 |
await handle.wait(key, timeout) | 暂停任务,等待外部通过 notify() 恢复 |
handle.notify(key, data) | 恢复等待中的指定 key |
await handle.notify_agent(message, trigger) | 向主 Agent 推送消息 |
handle.cancel_wait(key) | 取消特定等待点 |
handle.cancel_all() | 取消所有等待点 |
全局任务控制(task)
通过全局 task 对象管理任务生命周期:
| 方法 | 说明 |
|---|---|
await task.start(...) | 启动任务 |
task.is_running(task_type, task_id) | 检查任务是否在运行 |
await task.cancel(task_type, task_id) | 取消任务 |
await task.stop_all() | 停止所有任务 |
task.get_handle(task_type, task_id) | 获取任务句柄 |
task.get_state(task_type, task_id) | 获取任务最新状态 |
task.get_running_tasks() | 获取运行中的任务列表 |
终态回调
任务到达终态时可触发同步回调,适用于清理资源或发送通知:
python
def on_task_done(ctl: TaskCtl) -> None:
if ctl.signal == TaskSignal.SUCCESS:
import asyncio
asyncio.create_task(notify_user(ctl.data))
await task.start(
task_type="my_task",
task_id=task_id,
chat_key=chat_key,
plugin=plugin,
on_terminal=on_task_done,
prompt=prompt,
task_id=task_id,
)WARNING
on_terminal 回调是同步函数。如需在回调中执行异步操作,使用 asyncio.create_task() 包装,不能直接 await。
等待外部信号
任务可以主动暂停,等待外部通过 notify() 恢复执行:
python
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str, task_id: str):
yield TaskCtl.report_progress("等待审批确认...", 50)
approved = await handle.wait("approval", timeout=3600)
if approved:
yield TaskCtl.success("已批准,继续执行")
else:
yield TaskCtl.cancel("审批超时或未通过")由另一侧(通常是另一个命令或沙盒方法)调用:
python
h = task.get_handle("approval_task", task_id)
if h:
h.notify("approval", True)任务完成后通知主 Agent
python
@plugin.mount_async_task("build_task")
async def build_task(handle: AsyncTaskHandle, requirement: str, task_id: str):
yield TaskCtl.report_progress("构建中...", 30)
result_url = await run_build(requirement)
# 把结果推回主 Agent,让其继续处理
await handle.notify_agent(f"构建完成,访问地址:{result_url}")
yield TaskCtl.success("构建任务完成")插件卸载时的清理
在插件禁用时主动停止所有相关任务:
python
@plugin.on_disabled()
async def cleanup():
stopped = await task.stop_all()
if stopped > 0:
plugin.logger.info(f"已停止 {stopped} 个运行中的任务")