异步任务工具
WARNING
实验性功能:此功能仅在 nekro-agent v2.2.0+ 版本中可用,API 可能在后续版本中有所调整。
概述
异步任务工具允许插件执行长时间运行的后台任务,而不会阻塞主 AI 的对话能力。
使用场景
- Web 应用开发: AI 在后台生成完整应用,前端继续对话
- 视频/图片生成: 耗时内容生成不中断对话
- 数据处理: 大规模数据处理在后台运行
- 外部 API 调用: 等待长时间 API 响应
核心特性
- ✅ 非阻塞: 主 AI 可继续与用户对话
- ✅ 状态追踪: 实时报告任务进度
- ✅ 可中断: 支持取消正在运行的任务
- ✅ 双向通信: 任务可通知主 Agent
- ✅ 终态回调: 任务完成/失败/取消时执行回调
核心 API
1. 定义异步任务
使用 mount_async_task 装饰器注册异步任务函数:
python
from nekro_agent.services.plugin.task import AsyncTaskHandle, TaskCtl, task
from nekro_agent.api.plugin import NekroPlugin
from typing import AsyncGenerator
plugin = NekroPlugin(
name="我的异步插件",
module_name="my_async_plugin",
description="异步任务示例插件",
version="1.0.0",
author="开发者",
url="https://github.com/xxx",
)
@plugin.mount_async_task("my_task")
async def my_async_task(
handle: AsyncTaskHandle,
prompt: str,
task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
"""异步任务示例"""
# 报告进度
yield TaskCtl.report_progress("🔍 分析需求...", 10)
# 检查是否被取消
if handle.is_cancelled:
yield TaskCtl.cancel("任务已取消")
return
# 执行任务步骤
yield TaskCtl.report_progress("⚙️ 处理中...", 50)
# 任务完成
yield TaskCtl.success("处理完成", data={"result": "success"})2. 任务控制信号 (TaskCtl)
任务通过 yield TaskCtl.xxx() 报告状态:
| 方法 | 说明 | 终态 |
|---|---|---|
TaskCtl.report_progress(msg, percent) | 报告进度 | ❌ |
TaskCtl.success(msg, data) | 成功完成 | ✅ |
TaskCtl.fail(msg, error) | 任务失败 | ✅ |
TaskCtl.cancel(msg) | 任务取消 | ✅ |
TaskCtl 属性:
python
ctl = TaskCtl.success("完成", data={"url": "..."})
ctl.signal # TaskSignal.SUCCESS
ctl.message # "完成"
ctl.data # {"url": "..."}
ctl.progress # None (仅 report_progress 时有值)
ctl.is_terminal # True (是否为终态)3. 任务句柄 (AsyncTaskHandle)
任务函数接收 AsyncTaskHandle 参数,提供任务控制能力:
| 属性/方法 | 说明 |
|---|---|
handle.task_id | 任务 ID |
handle.chat_key | 会话 Key |
handle.plugin | 插件实例 |
handle.is_cancelled | 检查是否已取消 |
await handle.wait(key, timeout) | 等待外部信号 |
handle.notify(key, data) | 通知等待点恢复 |
await handle.notify_agent(message, trigger) | 通知主 Agent |
handle.cancel_wait(key) | 取消特定等待点 |
handle.cancel_all() | 取消所有等待点 |
4. 全局任务 API (task)
通过 task 全局对象控制任务:
| 方法 | 说明 |
|---|---|
await task.start(type, id, chat_key, plugin, ..., on_terminal=callback) | 启动任务 |
task.is_running(type, id) | 检查是否运行 |
await task.cancel(type, id) | 取消任务 |
await task.stop_all() | 停止所有任务 |
task.get_handle(type, id) | 获取任务句柄 |
task.get_state(type, id) | 获取任务最新状态 |
task.get_running_tasks() | 获取运行中的任务列表 |
终态回调 (on_terminal)
v2.2.0 新增:任务完成时可执行回调函数,用于清理资源、发送通知等。
基本用法
python
from nekro_agent.services.plugin.task import TaskCtl, task
def handle_task_complete(ctl: TaskCtl) -> None:
"""任务完成回调"""
if ctl.signal == TaskSignal.SUCCESS:
print(f"✅ 任务成功: {ctl.message}")
print(f" 结果数据: {ctl.data}")
elif ctl.signal == TaskSignal.FAIL:
print(f"❌ 任务失败: {ctl.message}")
elif ctl.signal == TaskSignal.CANCEL:
print(f"⚠️ 任务取消: {ctl.message}")
# 启动任务时传入回调
await task.start(
task_type="my_task",
task_id="T001",
chat_key=_ctx.chat_key,
plugin=plugin,
prompt="生成报告",
on_terminal=handle_task_complete, # 终态回调
)回调触发时机
回调在以下情况触发:
| 触发条件 | TaskCtl 信号 |
|---|---|
任务 yield TaskCtl.success() | TaskSignal.SUCCESS |
任务 yield TaskCtl.fail() | TaskSignal.FAIL |
任务 yield TaskCtl.cancel() | TaskSignal.CANCEL |
外部调用 task.cancel() | TaskSignal.CANCEL |
| 任务抛出异常 | TaskSignal.FAIL |
实际应用示例
python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "创建开发任务")
async def create_dev_task(_ctx: AgentCtx, requirement: str) -> str:
"""创建开发任务"""
task_id = f"DEV_{int(time.time())}"
def _on_terminal(ctl: TaskCtl) -> None:
"""任务完成后自动通知"""
import asyncio
from nekro_agent.services.message_service import message_service
if ctl.signal == TaskSignal.SUCCESS:
url = ctl.data.get("url", "")
msg = f"✅ 开发任务完成!\n🔗 {url}"
else:
msg = f"❌ 开发任务失败: {ctl.message}"
# 在回调中发送异步消息
asyncio.create_task(
message_service.push_system_message(
chat_key=_ctx.chat_key,
agent_messages=msg,
trigger_agent=True,
)
)
await task.start(
task_type="webapp_dev",
task_id=task_id,
chat_key=_ctx.chat_key,
plugin=plugin,
requirement=requirement,
task_id=task_id,
on_terminal=_on_terminal,
)
return f"任务已启动: {task_id}"完整示例:WebApp 开发任务
定义异步任务
python
@plugin.mount_async_task("webapp_dev")
async def webapp_development_task(
handle: AsyncTaskHandle,
requirement: str,
task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
"""WebApp 开发异步任务"""
yield TaskCtl.report_progress("🚀 开始开发...", 0)
# 执行开发
success, result = await run_developer_loop(requirement)
if success:
yield TaskCtl.report_progress("📦 编译中...", 70)
yield TaskCtl.success("部署成功", data={"url": url})
else:
yield TaskCtl.fail(f"开发失败: {result}")启动任务
python
from nekro_agent.api.schemas import AgentCtx
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "创建WebApp任务")
async def create_webapp_task(
_ctx: AgentCtx,
requirement: str,
) -> str:
"""创建 WebApp 开发任务"""
task_id = generate_task_id()
# 启动异步任务(不阻塞)
await task.start(
task_type="webapp_dev",
task_id=task_id,
chat_key=_ctx.chat_key,
plugin=plugin,
requirement=requirement,
task_id=task_id,
)
return f"任务已启动: {task_id}"与主 Agent 交互
python
# 任务完成后通知主 Agent
await handle.notify_agent(
f"✅ WebApp 部署成功!\n🔗 {url}"
)
# 等待用户确认
approved = await handle.wait("user_confirm", timeout=300)
if not approved:
yield TaskCtl.cancel("用户未确认")
return最佳实践
1. 任务设计
python
# ✅ 推荐:清晰的任务阶段
yield TaskCtl.report_progress("📥 下载资源...", 20)
yield TaskCtl.report_progress("🔄 处理中...", 60)
yield TaskCtl.report_progress("💾 保存中...", 90)
# ❌ 不推荐:模糊的进度
yield TaskCtl.report_progress("处理中...", 50)
yield TaskCtl.report_progress("快完成了...", 80)2. 错误处理
python
try:
result = await external_api()
yield TaskCtl.success("完成", data=result)
except TimeoutError:
yield TaskCtl.fail("API 超时")
except Exception as e:
yield TaskCtl.fail(f"错误: {e}")3. 资源清理
python
@plugin.on_disabled()
async def cleanup():
count = await task.stop_all()
if count > 0:
logger.info(f"已停止 {count} 个任务")4. 并发控制
python
MAX_CONCURRENT_TASKS = 3
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "start_task")
async def start_task(_ctx: AgentCtx, name: str) -> str:
active = len([t for t in task.get_running_tasks() if "my_plugin" in t])
if active >= MAX_CONCURRENT_TASKS:
raise ValueError(f"已达最大并发数 ({MAX_CONCURRENT_TASKS})")
await task.start("my_task", task_id, _ctx.chat_key, plugin, name)
return f"任务已启动: {task_id}"5. 终态回调注意事项
python
# ✅ 推荐:回调中处理异步操作
def _on_terminal(ctl: TaskCtl) -> None:
import asyncio
asyncio.create_task(send_notification(ctl))
# ❌ 不推荐:回调中直接 await(回调是同步函数)
def _on_terminal(ctl: TaskCtl) -> None:
await send_notification(ctl) # 错误!常见问题
Q: 异步任务和普通沙盒方法有什么区别?
| 对比项 | 沙盒方法 | 异步任务 |
|---|---|---|
| 执行时机 | 调用时立即执行 | 调用后后台运行 |
| 阻塞主对话 | 阻塞 | 不阻塞 |
| 状态追踪 | 无 | 完整进度报告 |
| 长时间运行 | 不适合 | 适合 |
| 可取消 | 不可 | 可取消 |
| 终态回调 | 不支持 | 支持 |
Q: 如何让任务等待用户确认?
python
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str):
yield TaskCtl.report_progress("⏳ 等待审批...", 50)
# 等待外部调用 notify()
approved = await handle.wait("approval", timeout=3600)
if approved:
yield TaskCtl.success("已批准")
else:
yield TaskCtl.fail("审批超时或被拒绝")Q: 任务完成后如何通知用户?
方式一:在任务内部通知
python
await handle.notify_agent(
f"✅ 任务完成!\n结果: {result}"
)方式二:使用终态回调
python
def _on_terminal(ctl: TaskCtl) -> None:
if ctl.signal == TaskSignal.SUCCESS:
asyncio.create_task(
message_service.push_system_message(...)
)
await task.start(..., on_terminal=_on_terminal)Q: 回调函数可以是异步的吗?
不可以。on_terminal 回调必须是同步函数。如果需要执行异步操作,请使用 asyncio.create_task() 包装:
python
def _on_terminal(ctl: TaskCtl) -> None:
asyncio.create_task(async_cleanup(ctl))版本信息
- 功能版本: nekro-agent v2.2.0+
- 文档版本: 1.1.0
- 最后更新: 2026-01-31
