Skip to content

异步任务工具

WARNING

实验性功能:此功能仅在 nekro-agent v2.2.0+ 版本中可用,API 可能在后续版本中有所调整。

概述

异步任务工具允许插件执行长时间运行的后台任务,而不会阻塞主 AI 的对话能力。

使用场景

  • Web 应用开发: AI 在后台生成完整应用,前端继续对话
  • 视频/图片生成: 耗时内容生成不中断对话
  • 数据处理: 大规模数据处理在后台运行
  • 外部 API 调用: 等待长时间 API 响应

核心特性

  • 非阻塞: 主 AI 可继续与用户对话
  • 状态追踪: 实时报告任务进度
  • 可中断: 支持取消正在运行的任务
  • 双向通信: 任务可通知主 Agent
  • 终态回调: 任务完成/失败/取消时执行回调

核心 API

1. 定义异步任务

使用 mount_async_task 装饰器注册异步任务函数:

python
from nekro_agent.services.plugin.task import AsyncTaskHandle, TaskCtl, task
from nekro_agent.api.plugin import NekroPlugin
from typing import AsyncGenerator

plugin = NekroPlugin(
    name="我的异步插件",
    module_name="my_async_plugin",
    description="异步任务示例插件",
    version="1.0.0",
    author="开发者",
    url="https://github.com/xxx",
)

@plugin.mount_async_task("my_task")
async def my_async_task(
    handle: AsyncTaskHandle,
    prompt: str,
    task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
    """异步任务示例"""
    # 报告进度
    yield TaskCtl.report_progress("🔍 分析需求...", 10)

    # 检查是否被取消
    if handle.is_cancelled:
        yield TaskCtl.cancel("任务已取消")
        return

    # 执行任务步骤
    yield TaskCtl.report_progress("⚙️ 处理中...", 50)

    # 任务完成
    yield TaskCtl.success("处理完成", data={"result": "success"})

2. 任务控制信号 (TaskCtl)

任务通过 yield TaskCtl.xxx() 报告状态:

方法说明终态
TaskCtl.report_progress(msg, percent)报告进度
TaskCtl.success(msg, data)成功完成
TaskCtl.fail(msg, error)任务失败
TaskCtl.cancel(msg)任务取消

TaskCtl 属性

python
ctl = TaskCtl.success("完成", data={"url": "..."})

ctl.signal      # TaskSignal.SUCCESS
ctl.message     # "完成"
ctl.data        # {"url": "..."}
ctl.progress    # None (仅 report_progress 时有值)
ctl.is_terminal # True (是否为终态)

3. 任务句柄 (AsyncTaskHandle)

任务函数接收 AsyncTaskHandle 参数,提供任务控制能力:

属性/方法说明
handle.task_id任务 ID
handle.chat_key会话 Key
handle.plugin插件实例
handle.is_cancelled检查是否已取消
await handle.wait(key, timeout)等待外部信号
handle.notify(key, data)通知等待点恢复
await handle.notify_agent(message, trigger)通知主 Agent
handle.cancel_wait(key)取消特定等待点
handle.cancel_all()取消所有等待点

4. 全局任务 API (task)

通过 task 全局对象控制任务:

方法说明
await task.start(type, id, chat_key, plugin, ..., on_terminal=callback)启动任务
task.is_running(type, id)检查是否运行
await task.cancel(type, id)取消任务
await task.stop_all()停止所有任务
task.get_handle(type, id)获取任务句柄
task.get_state(type, id)获取任务最新状态
task.get_running_tasks()获取运行中的任务列表

终态回调 (on_terminal)

v2.2.0 新增:任务完成时可执行回调函数,用于清理资源、发送通知等。

基本用法

python
from nekro_agent.services.plugin.task import TaskCtl, task

def handle_task_complete(ctl: TaskCtl) -> None:
    """任务完成回调"""
    if ctl.signal == TaskSignal.SUCCESS:
        print(f"✅ 任务成功: {ctl.message}")
        print(f"   结果数据: {ctl.data}")
    elif ctl.signal == TaskSignal.FAIL:
        print(f"❌ 任务失败: {ctl.message}")
    elif ctl.signal == TaskSignal.CANCEL:
        print(f"⚠️ 任务取消: {ctl.message}")

# 启动任务时传入回调
await task.start(
    task_type="my_task",
    task_id="T001",
    chat_key=_ctx.chat_key,
    plugin=plugin,
    prompt="生成报告",
    on_terminal=handle_task_complete,  # 终态回调
)

回调触发时机

回调在以下情况触发:

触发条件TaskCtl 信号
任务 yield TaskCtl.success()TaskSignal.SUCCESS
任务 yield TaskCtl.fail()TaskSignal.FAIL
任务 yield TaskCtl.cancel()TaskSignal.CANCEL
外部调用 task.cancel()TaskSignal.CANCEL
任务抛出异常TaskSignal.FAIL

实际应用示例

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "创建开发任务")
async def create_dev_task(_ctx: AgentCtx, requirement: str) -> str:
    """创建开发任务"""
    task_id = f"DEV_{int(time.time())}"
    
    def _on_terminal(ctl: TaskCtl) -> None:
        """任务完成后自动通知"""
        import asyncio
        from nekro_agent.services.message_service import message_service
        
        if ctl.signal == TaskSignal.SUCCESS:
            url = ctl.data.get("url", "")
            msg = f"✅ 开发任务完成!\n🔗 {url}"
        else:
            msg = f"❌ 开发任务失败: {ctl.message}"
        
        # 在回调中发送异步消息
        asyncio.create_task(
            message_service.push_system_message(
                chat_key=_ctx.chat_key,
                agent_messages=msg,
                trigger_agent=True,
            )
        )
    
    await task.start(
        task_type="webapp_dev",
        task_id=task_id,
        chat_key=_ctx.chat_key,
        plugin=plugin,
        requirement=requirement,
        task_id=task_id,
        on_terminal=_on_terminal,
    )
    
    return f"任务已启动: {task_id}"

完整示例:WebApp 开发任务

定义异步任务

python
@plugin.mount_async_task("webapp_dev")
async def webapp_development_task(
    handle: AsyncTaskHandle,
    requirement: str,
    task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
    """WebApp 开发异步任务"""
    yield TaskCtl.report_progress("🚀 开始开发...", 0)

    # 执行开发
    success, result = await run_developer_loop(requirement)

    if success:
        yield TaskCtl.report_progress("📦 编译中...", 70)
        yield TaskCtl.success("部署成功", data={"url": url})
    else:
        yield TaskCtl.fail(f"开发失败: {result}")

启动任务

python
from nekro_agent.api.schemas import AgentCtx

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "创建WebApp任务")
async def create_webapp_task(
    _ctx: AgentCtx,
    requirement: str,
) -> str:
    """创建 WebApp 开发任务"""
    task_id = generate_task_id()

    # 启动异步任务(不阻塞)
    await task.start(
        task_type="webapp_dev",
        task_id=task_id,
        chat_key=_ctx.chat_key,
        plugin=plugin,
        requirement=requirement,
        task_id=task_id,
    )

    return f"任务已启动: {task_id}"

与主 Agent 交互

python
# 任务完成后通知主 Agent
await handle.notify_agent(
    f"✅ WebApp 部署成功!\n🔗 {url}"
)

# 等待用户确认
approved = await handle.wait("user_confirm", timeout=300)
if not approved:
    yield TaskCtl.cancel("用户未确认")
    return

最佳实践

1. 任务设计

python
# ✅ 推荐:清晰的任务阶段
yield TaskCtl.report_progress("📥 下载资源...", 20)
yield TaskCtl.report_progress("🔄 处理中...", 60)
yield TaskCtl.report_progress("💾 保存中...", 90)

# ❌ 不推荐:模糊的进度
yield TaskCtl.report_progress("处理中...", 50)
yield TaskCtl.report_progress("快完成了...", 80)

2. 错误处理

python
try:
    result = await external_api()
    yield TaskCtl.success("完成", data=result)
except TimeoutError:
    yield TaskCtl.fail("API 超时")
except Exception as e:
    yield TaskCtl.fail(f"错误: {e}")

3. 资源清理

python
@plugin.on_disabled()
async def cleanup():
    count = await task.stop_all()
    if count > 0:
        logger.info(f"已停止 {count} 个任务")

4. 并发控制

python
MAX_CONCURRENT_TASKS = 3

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "start_task")
async def start_task(_ctx: AgentCtx, name: str) -> str:
    active = len([t for t in task.get_running_tasks() if "my_plugin" in t])
    if active >= MAX_CONCURRENT_TASKS:
        raise ValueError(f"已达最大并发数 ({MAX_CONCURRENT_TASKS})")

    await task.start("my_task", task_id, _ctx.chat_key, plugin, name)
    return f"任务已启动: {task_id}"

5. 终态回调注意事项

python
# ✅ 推荐:回调中处理异步操作
def _on_terminal(ctl: TaskCtl) -> None:
    import asyncio
    asyncio.create_task(send_notification(ctl))

# ❌ 不推荐:回调中直接 await(回调是同步函数)
def _on_terminal(ctl: TaskCtl) -> None:
    await send_notification(ctl)  # 错误!

常见问题

Q: 异步任务和普通沙盒方法有什么区别?

对比项沙盒方法异步任务
执行时机调用时立即执行调用后后台运行
阻塞主对话阻塞不阻塞
状态追踪完整进度报告
长时间运行不适合适合
可取消不可可取消
终态回调不支持支持

Q: 如何让任务等待用户确认?

python
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str):
    yield TaskCtl.report_progress("⏳ 等待审批...", 50)

    # 等待外部调用 notify()
    approved = await handle.wait("approval", timeout=3600)

    if approved:
        yield TaskCtl.success("已批准")
    else:
        yield TaskCtl.fail("审批超时或被拒绝")

Q: 任务完成后如何通知用户?

方式一:在任务内部通知

python
await handle.notify_agent(
    f"✅ 任务完成!\n结果: {result}"
)

方式二:使用终态回调

python
def _on_terminal(ctl: TaskCtl) -> None:
    if ctl.signal == TaskSignal.SUCCESS:
        asyncio.create_task(
            message_service.push_system_message(...)
        )

await task.start(..., on_terminal=_on_terminal)

Q: 回调函数可以是异步的吗?

不可以。on_terminal 回调必须是同步函数。如果需要执行异步操作,请使用 asyncio.create_task() 包装:

python
def _on_terminal(ctl: TaskCtl) -> None:
    asyncio.create_task(async_cleanup(ctl))

版本信息

  • 功能版本: nekro-agent v2.2.0+
  • 文档版本: 1.1.0
  • 最后更新: 2026-01-31