Skip to content

异步任务

异步任务用于在插件中执行耗时较长的后台流程,而不阻塞主 Agent 的对话能力。适用场景包括代码生成与部署、文件批处理、外部 API 长轮询以及需要暂停等待用户确认的多步骤流程。

与普通沙盒方法的区别

普通沙盒方法在调用时同步执行并阻塞当前会话,异步任务则在后台独立运行,主 Agent 可继续处理其他消息。任务完成后可通过 handle.notify_agent() 或终态回调主动通知。

前置导入

python
from typing import AsyncGenerator
from nekro_agent.api.plugin import (
    NekroPlugin,
    AsyncTaskHandle,
    TaskCtl,
    TaskSignal,
    task,
    SandboxMethodType,
)
from nekro_agent.api.schemas import AgentCtx

定义异步任务

使用 plugin.mount_async_task(task_type) 装饰器注册异步任务函数:

python
@plugin.mount_async_task("my_task")
async def my_async_task(
    handle: AsyncTaskHandle,
    prompt: str,
    task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
    yield TaskCtl.report_progress("开始处理...", 10)

    if handle.is_cancelled:
        yield TaskCtl.cancel("任务已取消")
        return

    # 执行实际逻辑
    result = await do_work(prompt)

    yield TaskCtl.success("处理完成", data={"result": result})

任务函数第一个参数固定为 AsyncTaskHandle,后续参数由调用方通过 task.start() 传入。函数通过 yield 上报状态控制信号,以终态信号(success / fail / cancel)结束。

启动任务

在沙盒方法中启动已注册的异步任务:

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "启动处理任务")
async def start_task(_ctx: AgentCtx, prompt: str) -> str:
    task_id = f"task_{int(time.time())}"

    await task.start(
        task_type="my_task",
        task_id=task_id,
        chat_key=_ctx.chat_key,
        plugin=plugin,
        prompt=prompt,
        task_id=task_id,
    )

    return f"任务已在后台启动,ID:{task_id}"

task.start() 参数:

参数说明
task_type任务类型名,与 mount_async_task 的参数一致
task_id任务实例标识,同类型下唯一
chat_key关联的聊天频道标识
plugin当前插件实例
on_terminal可选,终态回调(同步函数)
**kwargs传递给任务函数的其余参数

状态控制信号(TaskCtl)

任务通过 yield TaskCtl.xxx() 上报状态:

方法终态说明
TaskCtl.report_progress(msg, percent)上报进度,0-100
TaskCtl.success(msg, data=None)任务成功完成
TaskCtl.fail(msg, error=None)任务失败
TaskCtl.cancel(msg)任务取消

任务函数必须以终态信号结束,否则视为异常结束(等同于 fail)。

任务句柄(AsyncTaskHandle)

任务函数接收的 handle 提供以下能力:

属性/方法说明
handle.task_id当前任务 ID
handle.chat_key关联的聊天频道标识
handle.plugin插件实例
handle.is_cancelled是否已被外部请求取消
await handle.wait(key, timeout)暂停任务,等待外部通过 notify() 恢复
handle.notify(key, data)恢复等待中的指定 key
await handle.notify_agent(message, trigger)向主 Agent 推送消息
handle.cancel_wait(key)取消特定等待点
handle.cancel_all()取消所有等待点

全局任务控制(task)

通过全局 task 对象管理任务生命周期:

方法说明
await task.start(...)启动任务
task.is_running(task_type, task_id)检查任务是否在运行
await task.cancel(task_type, task_id)取消任务
await task.stop_all()停止所有任务
task.get_handle(task_type, task_id)获取任务句柄
task.get_state(task_type, task_id)获取任务最新状态
task.get_running_tasks()获取运行中的任务列表

终态回调

任务到达终态时可触发同步回调,适用于清理资源或发送通知:

python
def on_task_done(ctl: TaskCtl) -> None:
    if ctl.signal == TaskSignal.SUCCESS:
        import asyncio
        asyncio.create_task(notify_user(ctl.data))

await task.start(
    task_type="my_task",
    task_id=task_id,
    chat_key=chat_key,
    plugin=plugin,
    on_terminal=on_task_done,
    prompt=prompt,
    task_id=task_id,
)

WARNING

on_terminal 回调是同步函数。如需在回调中执行异步操作,使用 asyncio.create_task() 包装,不能直接 await

等待外部信号

任务可以主动暂停,等待外部通过 notify() 恢复执行:

python
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str, task_id: str):
    yield TaskCtl.report_progress("等待审批确认...", 50)

    approved = await handle.wait("approval", timeout=3600)

    if approved:
        yield TaskCtl.success("已批准,继续执行")
    else:
        yield TaskCtl.cancel("审批超时或未通过")

由另一侧(通常是另一个命令或沙盒方法)调用:

python
h = task.get_handle("approval_task", task_id)
if h:
    h.notify("approval", True)

任务完成后通知主 Agent

python
@plugin.mount_async_task("build_task")
async def build_task(handle: AsyncTaskHandle, requirement: str, task_id: str):
    yield TaskCtl.report_progress("构建中...", 30)

    result_url = await run_build(requirement)

    # 把结果推回主 Agent,让其继续处理
    await handle.notify_agent(f"构建完成,访问地址:{result_url}")

    yield TaskCtl.success("构建任务完成")

插件卸载时的清理

在插件禁用时主动停止所有相关任务:

python
@plugin.on_disabled()
async def cleanup():
    stopped = await task.stop_all()
    if stopped > 0:
        plugin.logger.info(f"已停止 {stopped} 个运行中的任务")

相关文档