Async Tasks
WARNING
Experimental Feature: This feature is only available in nekro-agent v2.2.0+ and the API may be adjusted in future versions.
Overview
Async tasks allow plugins to execute long-running background tasks without blocking the main AI's conversation capability.
Use Cases
- Web Application Development: AI generates complete applications in the background while continuing to chat
- Video/Image Generation: Time-consuming content generation without interrupting conversations
- Data Processing: Large-scale data processing runs in the background
- External API Calls: Waiting for long-running API responses
Core Features
- ✅ Non-blocking: Main AI can continue conversing with users
- ✅ Status Tracking: Real-time progress reporting
- ✅ Interruptible: Support for canceling running tasks
- ✅ Bidirectional Communication: Tasks can notify the main Agent
- ✅ Terminal Callbacks: Execute callbacks when task completes/fails/cancels
Core API
1. Define Async Task
Use the mount_async_task decorator to register async task functions:
from nekro_agent.services.plugin.task import AsyncTaskHandle, TaskCtl, task
from nekro_agent.api.plugin import NekroPlugin
from typing import AsyncGenerator
plugin = NekroPlugin(
name="My Async Plugin",
module_name="my_async_plugin",
description="Async task example plugin",
version="1.0.0",
author="Developer",
url="https://github.com/xxx",
)
@plugin.mount_async_task("my_task")
async def my_async_task(
handle: AsyncTaskHandle,
prompt: str,
task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
"""Async task example"""
# Report progress
yield TaskCtl.report_progress("🔍 Analyzing requirements...", 10)
# Check if cancelled
if handle.is_cancelled:
yield TaskCtl.cancel("Task cancelled")
return
# Execute task steps
yield TaskCtl.report_progress("⚙️ Processing...", 50)
# Task complete
yield TaskCtl.success("Processing complete", data={"result": "success"})2. Task Control Signals (TaskCtl)
Tasks report status via yield TaskCtl.xxx():
| Method | Description | Terminal |
|---|---|---|
TaskCtl.report_progress(msg, percent) | Report progress | ❌ |
TaskCtl.success(msg, data) | Success completion | ✅ |
TaskCtl.fail(msg, error) | Task failed | ✅ |
TaskCtl.cancel(msg) | Task cancelled | ✅ |
TaskCtl Properties:
ctl = TaskCtl.success("Done", data={"url": "..."})
ctl.signal # TaskSignal.SUCCESS
ctl.message # "Done"
ctl.data # {"url": "..."}
ctl.progress # None (only set for report_progress)
ctl.is_terminal # True (whether it's a terminal state)3. Task Handle (AsyncTaskHandle)
Task functions receive an AsyncTaskHandle parameter for task control:
| Property/Method | Description |
|---|---|
handle.task_id | Task ID |
handle.chat_key | Chat Key |
handle.plugin | Plugin instance |
handle.is_cancelled | Check if cancelled |
await handle.wait(key, timeout) | Wait for external signal |
handle.notify(key, data) | Resume wait point |
await handle.notify_agent(message, trigger) | Notify main Agent |
handle.cancel_wait(key) | Cancel specific wait point |
handle.cancel_all() | Cancel all wait points |
4. Global Task API (task)
Control tasks via the global task object:
| Method | Description |
|---|---|
await task.start(type, id, chat_key, plugin, ..., on_terminal=callback) | Start task |
task.is_running(type, id) | Check if running |
await task.cancel(type, id) | Cancel task |
await task.stop_all() | Stop all tasks |
task.get_handle(type, id) | Get task handle |
task.get_state(type, id) | Get latest task state |
task.get_running_tasks() | Get list of running tasks |
Terminal Callbacks (on_terminal)
New in v2.2.0: Execute callback functions when task completes, for resource cleanup, notifications, etc.
Basic Usage
from nekro_agent.services.plugin.task import TaskCtl, task
def handle_task_complete(ctl: TaskCtl) -> None:
"""Task completion callback"""
if ctl.signal == TaskSignal.SUCCESS:
print(f"✅ Task succeeded: {ctl.message}")
print(f" Result data: {ctl.data}")
elif ctl.signal == TaskSignal.FAIL:
print(f"❌ Task failed: {ctl.message}")
elif ctl.signal == TaskSignal.CANCEL:
print(f"⚠️ Task cancelled: {ctl.message}")
# Pass callback when starting task
await task.start(
task_type="my_task",
task_id="T001",
chat_key=_ctx.chat_key,
plugin=plugin,
prompt="Generate report",
on_terminal=handle_task_complete, # Terminal callback
)Callback Trigger Conditions
Callbacks are triggered in these situations:
| Trigger Condition | TaskCtl Signal |
|---|---|
Task yields TaskCtl.success() | TaskSignal.SUCCESS |
Task yields TaskCtl.fail() | TaskSignal.FAIL |
Task yields TaskCtl.cancel() | TaskSignal.CANCEL |
External call to task.cancel() | TaskSignal.CANCEL |
| Task throws exception | TaskSignal.FAIL |
Practical Example
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "Create Dev Task")
async def create_dev_task(_ctx: AgentCtx, requirement: str) -> str:
"""Create development task"""
task_id = f"DEV_{int(time.time())}"
def _on_terminal(ctl: TaskCtl) -> None:
"""Auto-notify on task completion"""
import asyncio
from nekro_agent.services.message_service import message_service
if ctl.signal == TaskSignal.SUCCESS:
url = ctl.data.get("url", "")
msg = f"✅ Dev task complete!\n🔗 {url}"
else:
msg = f"❌ Dev task failed: {ctl.message}"
# Send async message in callback
asyncio.create_task(
message_service.push_system_message(
chat_key=_ctx.chat_key,
agent_messages=msg,
trigger_agent=True,
)
)
await task.start(
task_type="webapp_dev",
task_id=task_id,
chat_key=_ctx.chat_key,
plugin=plugin,
requirement=requirement,
task_id=task_id,
on_terminal=_on_terminal,
)
return f"Task started: {task_id}"Complete Example: WebApp Development Task
Define Async Task
@plugin.mount_async_task("webapp_dev")
async def webapp_development_task(
handle: AsyncTaskHandle,
requirement: str,
task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
"""WebApp development async task"""
yield TaskCtl.report_progress("🚀 Starting development...", 0)
# Execute development
success, result = await run_developer_loop(requirement)
if success:
yield TaskCtl.report_progress("📦 Compiling...", 70)
yield TaskCtl.success("Deployment successful", data={"url": url})
else:
yield TaskCtl.fail(f"Development failed: {result}")Start Task
from nekro_agent.api.schemas import AgentCtx
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "Create WebApp Task")
async def create_webapp_task(
_ctx: AgentCtx,
requirement: str,
) -> str:
"""Create WebApp development task"""
task_id = generate_task_id()
# Start async task (non-blocking)
await task.start(
task_type="webapp_dev",
task_id=task_id,
chat_key=_ctx.chat_key,
plugin=plugin,
requirement=requirement,
task_id=task_id,
)
return f"Task started: {task_id}"Best Practices
1. Task Design
# ✅ Recommended: Clear task phases
yield TaskCtl.report_progress("📥 Downloading resources...", 20)
yield TaskCtl.report_progress("🔄 Processing...", 60)
yield TaskCtl.report_progress("💾 Saving...", 90)
# ❌ Not recommended: Vague progress
yield TaskCtl.report_progress("Processing...", 50)
yield TaskCtl.report_progress("Almost done...", 80)2. Error Handling
try:
result = await external_api()
yield TaskCtl.success("Done", data=result)
except TimeoutError:
yield TaskCtl.fail("API timeout")
except Exception as e:
yield TaskCtl.fail(f"Error: {e}")3. Resource Cleanup
@plugin.on_disabled()
async def cleanup():
count = await task.stop_all()
if count > 0:
logger.info(f"Stopped {count} tasks")4. Concurrency Control
MAX_CONCURRENT_TASKS = 3
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "start_task")
async def start_task(_ctx: AgentCtx, name: str) -> str:
active = len([t for t in task.get_running_tasks() if "my_plugin" in t])
if active >= MAX_CONCURRENT_TASKS:
raise ValueError(f"Max concurrent tasks reached ({MAX_CONCURRENT_TASKS})")
await task.start("my_task", task_id, _ctx.chat_key, plugin, name)
return f"Task started: {task_id}"5. Terminal Callback Notes
# ✅ Recommended: Handle async operations in callback
def _on_terminal(ctl: TaskCtl) -> None:
import asyncio
asyncio.create_task(send_notification(ctl))
# ❌ Not recommended: Direct await in callback (callback is synchronous)
def _on_terminal(ctl: TaskCtl) -> None:
await send_notification(ctl) # Error!FAQ
Q: What's the difference between async tasks and regular sandbox methods?
| Comparison | Sandbox Method | Async Task |
|---|---|---|
| Execution | Immediate on call | Background after call |
| Blocks main chat | Yes | No |
| Status tracking | None | Full progress reporting |
| Long-running | Not suitable | Suitable |
| Cancellable | No | Yes |
| Terminal callback | Not supported | Supported |
Q: How to make a task wait for user confirmation?
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str):
yield TaskCtl.report_progress("⏳ Waiting for approval...", 50)
# Wait for external notify() call
approved = await handle.wait("approval", timeout=3600)
if approved:
yield TaskCtl.success("Approved")
else:
yield TaskCtl.fail("Approval timeout or rejected")Q: Can the callback function be async?
No. The on_terminal callback must be a synchronous function. If you need to perform async operations, wrap them with asyncio.create_task():
def _on_terminal(ctl: TaskCtl) -> None:
asyncio.create_task(async_cleanup(ctl))Version Information
- Feature Version: nekro-agent v2.2.0+
- Documentation Version: 1.1.0
- Last Updated: 2026-01-31
