Async Tasks

WARNING

Experimental Feature: This feature is only available in nekro-agent v2.2.0+ and the API may be adjusted in future versions.

Overview

Async tasks allow plugins to execute long-running background tasks without blocking the main AI's conversation capability.

Use Cases

  • Web Application Development: AI generates complete applications in the background while continuing to chat
  • Video/Image Generation: Time-consuming content generation without interrupting conversations
  • Data Processing: Large-scale data processing runs in the background
  • External API Calls: Waiting for long-running API responses

Core Features

  • Non-blocking: Main AI can continue conversing with users
  • Status Tracking: Real-time progress reporting
  • Interruptible: Support for canceling running tasks
  • Bidirectional Communication: Tasks can notify the main Agent
  • Terminal Callbacks: Run a callback when a task completes, fails, or is cancelled
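
The features above can be pictured with plain asyncio. The following is a minimal sketch, not nekro-agent's implementation: `drive`, `demo_task`, and the tuple-based signals are hypothetical stand-ins for the plugin runtime, an async task, and TaskCtl.

```python
import asyncio
from typing import AsyncGenerator, List, Tuple

Signal = Tuple[str, str]  # (kind, message); hypothetical stand-in for TaskCtl


async def demo_task() -> AsyncGenerator[Signal, None]:
    """A long-running job that reports progress between steps."""
    yield ("progress", "step 1")
    await asyncio.sleep(0.01)  # simulated slow work
    yield ("progress", "step 2")
    await asyncio.sleep(0.01)
    yield ("success", "done")


async def drive(gen: AsyncGenerator[Signal, None], log: List[Signal]) -> None:
    """Consume signals until a terminal one arrives (hypothetical runner)."""
    async for signal in gen:
        log.append(signal)
        if signal[0] != "progress":
            break


async def main() -> List[Signal]:
    log: List[Signal] = []
    # Schedule the task in the background; this call does not wait for it.
    background = asyncio.create_task(drive(demo_task(), log))
    # The caller is free to do other work while the task runs.
    log.append(("chat", "main flow keeps responding"))
    await background
    return log


if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```

The "chat" entry is logged before any progress entry because scheduling the background task returns immediately, which is the non-blocking behavior the feature list describes.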

Core API

1. Define Async Task

Use the mount_async_task decorator to register async task functions:

python
from nekro_agent.services.plugin.task import AsyncTaskHandle, TaskCtl, task
from nekro_agent.api.plugin import NekroPlugin
from typing import AsyncGenerator

plugin = NekroPlugin(
    name="My Async Plugin",
    module_name="my_async_plugin",
    description="Async task example plugin",
    version="1.0.0",
    author="Developer",
    url="https://github.com/xxx",
)

@plugin.mount_async_task("my_task")
async def my_async_task(
    handle: AsyncTaskHandle,
    prompt: str,
    task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
    """Async task example"""
    # Report progress
    yield TaskCtl.report_progress("🔍 Analyzing requirements...", 10)

    # Check if cancelled
    if handle.is_cancelled:
        yield TaskCtl.cancel("Task cancelled")
        return

    # Execute task steps
    yield TaskCtl.report_progress("⚙️ Processing...", 50)

    # Task complete
    yield TaskCtl.success("Processing complete", data={"result": "success"})

2. Task Control Signals (TaskCtl)

Tasks report status via yield TaskCtl.xxx():

| Method | Description | Terminal |
| --- | --- | --- |
| TaskCtl.report_progress(msg, percent) | Report progress | No |
| TaskCtl.success(msg, data) | Successful completion | Yes |
| TaskCtl.fail(msg, error) | Task failed | Yes |
| TaskCtl.cancel(msg) | Task cancelled | Yes |

TaskCtl Properties:

python
ctl = TaskCtl.success("Done", data={"url": "..."})

ctl.signal      # TaskSignal.SUCCESS
ctl.message     # "Done"
ctl.data        # {"url": "..."}
ctl.progress    # None (only set for report_progress)
ctl.is_terminal # True (whether it's a terminal state)
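
To make the relationship between these properties concrete, here is a simplified stand-in model. It is a sketch under assumptions, not the library's class: `SketchCtl` and `SketchSignal` are hypothetical names, and the rule that every signal except progress is terminal is inferred from the terminal-callback behavior described in this document.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class SketchSignal(Enum):
    """Hypothetical counterpart of TaskSignal."""
    PROGRESS = "progress"
    SUCCESS = "success"
    FAIL = "fail"
    CANCEL = "cancel"


@dataclass(frozen=True)
class SketchCtl:
    """Hypothetical counterpart of TaskCtl with the same property names."""
    signal: SketchSignal
    message: str = ""
    data: Any = None
    progress: Optional[int] = None

    @property
    def is_terminal(self) -> bool:
        # Assumption: only progress reports leave the task running.
        return self.signal is not SketchSignal.PROGRESS

    @classmethod
    def report_progress(cls, message: str, percent: int) -> "SketchCtl":
        return cls(SketchSignal.PROGRESS, message, progress=percent)

    @classmethod
    def success(cls, message: str, data: Any = None) -> "SketchCtl":
        return cls(SketchSignal.SUCCESS, message, data=data)


done = SketchCtl.success("Done", data={"url": "..."})
step = SketchCtl.report_progress("Working", 10)
```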

3. Task Handle (AsyncTaskHandle)

Task functions receive an AsyncTaskHandle parameter for task control:

| Property/Method | Description |
| --- | --- |
| handle.task_id | Task ID |
| handle.chat_key | Chat Key |
| handle.plugin | Plugin instance |
| handle.is_cancelled | Check if cancelled |
| await handle.wait(key, timeout) | Wait for external signal |
| handle.notify(key, data) | Resume wait point |
| await handle.notify_agent(message, trigger) | Notify main Agent |
| handle.cancel_wait(key) | Cancel specific wait point |
| handle.cancel_all() | Cancel all wait points |
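
The wait/notify pair can be pictured as named futures: `wait` parks the task on a key and `notify` resolves it from outside. The sketch below uses plain asyncio and is not the library's code. `WaitPoints` is a hypothetical class, and returning None on timeout is an assumption, so check how handle.wait behaves in your nekro-agent version.

```python
import asyncio
from typing import Any, Dict, Optional, Tuple


class WaitPoints:
    """Named wait points backed by asyncio futures (hypothetical stand-in)."""

    def __init__(self) -> None:
        self._futures: Dict[str, "asyncio.Future[Any]"] = {}

    async def wait(self, key: str, timeout: Optional[float] = None) -> Any:
        future = asyncio.get_running_loop().create_future()
        self._futures[key] = future
        try:
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            return None  # assumption: a timeout yields None
        finally:
            self._futures.pop(key, None)

    def notify(self, key: str, data: Any = None) -> bool:
        future = self._futures.get(key)
        if future is None or future.done():
            return False  # nobody is waiting on this key
        future.set_result(data)
        return True


async def demo() -> Tuple[bool, Any, Any]:
    points = WaitPoints()
    waiter = asyncio.create_task(points.wait("approval", timeout=1.0))
    await asyncio.sleep(0)  # let the waiter register its wait point
    delivered = points.notify("approval", True)
    approved = await waiter
    timed_out = await points.wait("never", timeout=0.01)
    return delivered, approved, timed_out
```

Running `asyncio.run(demo())` resolves the first wait with the notified value and lets the second one time out.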

4. Global Task API (task)

Control tasks via the global task object:

| Method | Description |
| --- | --- |
| await task.start(type, id, chat_key, plugin, ..., on_terminal=callback) | Start task |
| task.is_running(type, id) | Check if running |
| await task.cancel(type, id) | Cancel task |
| await task.stop_all() | Stop all tasks |
| task.get_handle(type, id) | Get task handle |
| task.get_state(type, id) | Get latest task state |
| task.get_running_tasks() | Get list of running tasks |

Terminal Callbacks (on_terminal)

New in v2.2.0: Run a callback when a task reaches a terminal state (success, failure, or cancellation), for example to clean up resources or send notifications.

Basic Usage

python
# TaskSignal is assumed to be exported by the same module as TaskCtl
from nekro_agent.services.plugin.task import TaskCtl, TaskSignal, task

def handle_task_complete(ctl: TaskCtl) -> None:
    """Task completion callback"""
    if ctl.signal == TaskSignal.SUCCESS:
        print(f"✅ Task succeeded: {ctl.message}")
        print(f"   Result data: {ctl.data}")
    elif ctl.signal == TaskSignal.FAIL:
        print(f"❌ Task failed: {ctl.message}")
    elif ctl.signal == TaskSignal.CANCEL:
        print(f"⚠️ Task cancelled: {ctl.message}")

# Pass callback when starting task
await task.start(
    task_type="my_task",
    task_id="T001",
    chat_key=_ctx.chat_key,
    plugin=plugin,
    prompt="Generate report",
    on_terminal=handle_task_complete,  # Terminal callback
)

Callback Trigger Conditions

Callbacks are triggered in these situations:

| Trigger Condition | TaskCtl Signal |
| --- | --- |
| Task yields TaskCtl.success() | TaskSignal.SUCCESS |
| Task yields TaskCtl.fail() | TaskSignal.FAIL |
| Task yields TaskCtl.cancel() | TaskSignal.CANCEL |
| External call to task.cancel() | TaskSignal.CANCEL |
| Task throws exception | TaskSignal.FAIL |
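
The trigger conditions above reduce to one rule: whatever ends the task, the callback runs once with the matching signal. The following is a minimal sketch of such a runner in plain asyncio, not nekro-agent's implementation. `run_with_terminal` is a hypothetical name, the string signals stand in for TaskSignal, and the fallback used when a task ends without any terminal signal is an assumption.

```python
import asyncio
from typing import AsyncGenerator, Callable, List, Tuple

Ctl = Tuple[str, str]  # (signal, message); hypothetical stand-in for TaskCtl


async def run_with_terminal(
    gen: AsyncGenerator[Ctl, None],
    on_terminal: Callable[[Ctl], None],
) -> None:
    """Drive a task and invoke on_terminal exactly once (hypothetical runner)."""
    final: Ctl = ("FAIL", "task ended without a terminal signal")  # assumption
    try:
        async for ctl in gen:
            if ctl[0] != "PROGRESS":
                final = ctl
                break
    except asyncio.CancelledError:
        final = ("CANCEL", "cancelled externally")
        raise
    except Exception as exc:
        final = ("FAIL", f"unhandled exception: {exc}")
    finally:
        on_terminal(final)


async def ok_task() -> AsyncGenerator[Ctl, None]:
    yield ("PROGRESS", "working")
    yield ("SUCCESS", "done")


async def broken_task() -> AsyncGenerator[Ctl, None]:
    yield ("PROGRESS", "working")
    raise RuntimeError("boom")


async def slow_task() -> AsyncGenerator[Ctl, None]:
    yield ("PROGRESS", "working")
    await asyncio.sleep(10)
    yield ("SUCCESS", "never reached")


async def demo() -> List[Ctl]:
    seen: List[Ctl] = []
    await run_with_terminal(ok_task(), seen.append)
    await run_with_terminal(broken_task(), seen.append)
    running = asyncio.create_task(run_with_terminal(slow_task(), seen.append))
    await asyncio.sleep(0.01)
    running.cancel()  # external cancellation
    await asyncio.gather(running, return_exceptions=True)
    return seen
```

In this sketch the three runs end with SUCCESS, FAIL, and CANCEL respectively, mirroring the yield, exception, and external-cancel conditions.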

Practical Example

python
import time  # used to build the task ID

# Assumes plugin, task, TaskCtl, TaskSignal, AgentCtx and SandboxMethodType are already imported

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "Create Dev Task")
async def create_dev_task(_ctx: AgentCtx, requirement: str) -> str:
    """Create development task"""
    task_id = f"DEV_{int(time.time())}"

    def _on_terminal(ctl: TaskCtl) -> None:
        """Auto-notify on task completion"""
        import asyncio
        from nekro_agent.services.message_service import message_service

        if ctl.signal == TaskSignal.SUCCESS:
            # ctl.data may be None, so fall back to an empty dict
            url = (ctl.data or {}).get("url", "")
            msg = f"✅ Dev task complete!\n🔗 {url}"
        else:
            msg = f"❌ Dev task failed: {ctl.message}"

        # The callback is synchronous, so schedule the async send instead of awaiting it
        asyncio.create_task(
            message_service.push_system_message(
                chat_key=_ctx.chat_key,
                agent_messages=msg,
                trigger_agent=True,
            )
        )

    await task.start(
        task_type="webapp_dev",
        task_id=task_id,  # passed once; repeating a keyword argument is a SyntaxError
        chat_key=_ctx.chat_key,
        plugin=plugin,
        requirement=requirement,
        on_terminal=_on_terminal,
    )

    return f"Task started: {task_id}"

Complete Example: WebApp Development Task

Define Async Task

python
@plugin.mount_async_task("webapp_dev")
async def webapp_development_task(
    handle: AsyncTaskHandle,
    requirement: str,
    task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
    """WebApp development async task"""
    yield TaskCtl.report_progress("🚀 Starting development...", 0)

    # Execute development (run_developer_loop stands for your own implementation)
    success, result = await run_developer_loop(requirement)

    if success:
        yield TaskCtl.report_progress("📦 Compiling...", 70)
        # Assumption: on success, result holds the deployed URL
        yield TaskCtl.success("Deployment successful", data={"url": result})
    else:
        yield TaskCtl.fail(f"Development failed: {result}")

Start Task

python
from nekro_agent.api.schemas import AgentCtx

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "Create WebApp Task")
async def create_webapp_task(
    _ctx: AgentCtx,
    requirement: str,
) -> str:
    """Create WebApp development task"""
    task_id = generate_task_id()

    # Start async task (non-blocking)
    await task.start(
        task_type="webapp_dev",
        task_id=task_id,  # passed once; repeating a keyword argument is a SyntaxError
        chat_key=_ctx.chat_key,
        plugin=plugin,
        requirement=requirement,
    )

    return f"Task started: {task_id}"
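
`generate_task_id()` is not defined in this example. One possible helper, offered as an assumption rather than as part of nekro-agent, combines a timestamp with a short random suffix so that two tasks started within the same second do not collide:

```python
import time
import uuid


def generate_task_id(prefix: str = "WEBAPP") -> str:
    """Build an ID of the form PREFIX_<unix seconds>_<8 hex chars> (hypothetical helper)."""
    return f"{prefix}_{int(time.time())}_{uuid.uuid4().hex[:8]}"


first_id = generate_task_id()
second_id = generate_task_id()
```

A purely time-based ID such as `f"DEV_{int(time.time())}"` is easier to read but repeats if two tasks start in the same second, which is why the random suffix is added here.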

Best Practices

1. Task Design

python
# ✅ Recommended: Clear task phases
yield TaskCtl.report_progress("📥 Downloading resources...", 20)
yield TaskCtl.report_progress("🔄 Processing...", 60)
yield TaskCtl.report_progress("💾 Saving...", 90)

# ❌ Not recommended: Vague progress
yield TaskCtl.report_progress("Processing...", 50)
yield TaskCtl.report_progress("Almost done...", 80)

2. Error Handling

python
try:
    result = await external_api()
    yield TaskCtl.success("Done", data=result)
except TimeoutError:
    yield TaskCtl.fail("API timeout")
except Exception as e:
    yield TaskCtl.fail(f"Error: {e}")
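
Note that the TimeoutError branch only fires if the awaited call actually enforces a time limit. A minimal sketch of one way to do that with asyncio.wait_for follows; `external_api`, `guarded_task`, and the tuple signals are hypothetical stand-ins for your own call, an async task, and TaskCtl.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List, Tuple

Ctl = Tuple[str, Any]  # (signal, payload); hypothetical stand-in for TaskCtl


async def external_api(delay: float) -> Dict[str, bool]:
    """Hypothetical slow call; sleeps to simulate network latency."""
    await asyncio.sleep(delay)
    return {"ok": True}


async def guarded_task(delay: float, limit: float) -> AsyncGenerator[Ctl, None]:
    """Turn a timeout or any other error into a fail signal."""
    try:
        # wait_for cancels the call and raises TimeoutError after `limit` seconds
        result = await asyncio.wait_for(external_api(delay), timeout=limit)
        yield ("success", result)
    except asyncio.TimeoutError:
        yield ("fail", "API timeout")
    except Exception as exc:
        yield ("fail", f"Error: {exc}")


async def collect(gen: AsyncGenerator[Ctl, None]) -> List[Ctl]:
    """Gather every signal a task yields."""
    return [ctl async for ctl in gen]


fast = asyncio.run(collect(guarded_task(delay=0.0, limit=1.0)))
slow = asyncio.run(collect(guarded_task(delay=1.0, limit=0.01)))
```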

3. Resource Cleanup

python
@plugin.on_disabled()
async def cleanup():
    count = await task.stop_all()
    if count > 0:
        logger.info(f"Stopped {count} tasks")

4. Concurrency Control

python
import time

MAX_CONCURRENT_TASKS = 3

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "start_task")
async def start_task(_ctx: AgentCtx, name: str) -> str:
    # Count this plugin's running tasks (assumes each entry contains the plugin name)
    active = len([t for t in task.get_running_tasks() if "my_plugin" in t])
    if active >= MAX_CONCURRENT_TASKS:
        raise ValueError(f"Max concurrent tasks reached ({MAX_CONCURRENT_TASKS})")

    task_id = f"TASK_{int(time.time())}"  # illustrative ID scheme
    await task.start("my_task", task_id, _ctx.chat_key, plugin, name)
    return f"Task started: {task_id}"

5. Terminal Callback Notes

python
# ✅ Recommended: Schedule async operations from the callback
def _on_terminal(ctl: TaskCtl) -> None:
    import asyncio
    asyncio.create_task(send_notification(ctl))

# ❌ Not recommended: Direct await in callback (callback is synchronous)
def _on_terminal(ctl: TaskCtl) -> None:
    await send_notification(ctl)  # SyntaxError: 'await' outside async function
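
One caveat when calling asyncio.create_task() from a synchronous callback: it needs a running event loop, and the loop keeps only a weak reference to the task, so a task nobody else references can be garbage-collected before it finishes. The sketch below keeps a strong reference until the task completes. `send_notification`, `on_terminal`, and `_background` are hypothetical names, and the callback takes a plain string instead of a TaskCtl to stay self-contained.

```python
import asyncio
from typing import List, Set

_background: Set["asyncio.Task[None]"] = set()
sent: List[str] = []


async def send_notification(message: str) -> None:
    """Hypothetical async side effect, e.g. pushing a chat message."""
    await asyncio.sleep(0)
    sent.append(message)


def on_terminal(message: str) -> None:
    """Synchronous callback that schedules async work without awaiting it."""
    job = asyncio.create_task(send_notification(message))
    _background.add(job)  # keep a strong reference while the job runs
    job.add_done_callback(_background.discard)  # drop it once finished


async def demo() -> List[str]:
    on_terminal("task finished")  # returns immediately
    await asyncio.gather(*_background)  # wait here only for demonstration
    return sent
```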

FAQ

Q: What's the difference between async tasks and regular sandbox methods?

| Comparison | Sandbox Method | Async Task |
| --- | --- | --- |
| Execution | Immediate on call | Background after call |
| Blocks main chat | Yes | No |
| Status tracking | None | Full progress reporting |
| Long-running | Not suitable | Suitable |
| Cancellable | No | Yes |
| Terminal callback | Not supported | Supported |

Q: How do I make a task wait for user confirmation?

python
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str):
    yield TaskCtl.report_progress("⏳ Waiting for approval...", 50)

    # Wait for external notify() call
    approved = await handle.wait("approval", timeout=3600)

    if approved:
        yield TaskCtl.success("Approved")
    else:
        yield TaskCtl.fail("Approval timeout or rejected")

Q: Can the callback function be async?

No. The on_terminal callback must be a synchronous function. If you need to perform async operations, wrap them with asyncio.create_task():

python
def _on_terminal(ctl: TaskCtl) -> None:
    asyncio.create_task(async_cleanup(ctl))

Version Information

  • Feature Version: nekro-agent v2.2.0+
  • Documentation Version: 1.1.0
  • Last Updated: 2026-01-31