Skip to content

非同期タスク

WARNING

実験的機能:この機能は nekro-agent v2.2.0+ 以降でのみ利用可能です。API は今後のバージョンで調整される可能性があります。

概要

非同期タスクツールにより、プラグインはメイン AI の会話能力をブロックすることなく、長時間実行されるバックグラウンドタスクを実行できます。

ユースケース

  • Web アプリケーション開発: AI がバックグラウンドで完全なアプリケーションを生成し、フロントエンドは会話を継続
  • 動画/画像生成: 時間のかかるコンテンツ生成が会話を中断しない
  • データ処理: 大規模なデータ処理がバックグラウンドで実行
  • 外部 API 呼び出し: 長時間の API レスポンスを待機

コア機能

  • ノンブロッキング: メイン AI はユーザーとの会話を継続可能
  • ステータス追跡: リアルタイムの進捗報告
  • 中断可能: 実行中のタスクのキャンセルをサポート
  • 双方向通信: タスクからメイン Agent への通知が可能
  • 終了コールバック: タスク完了/失敗/キャンセル時にコールバックを実行

コア API

1. 非同期タスクの定義

mount_async_task デコレータを使用して非同期タスク関数を登録します:

python
from nekro_agent.services.plugin.task import AsyncTaskHandle, TaskCtl, task
from nekro_agent.api.plugin import NekroPlugin
from typing import AsyncGenerator

plugin = NekroPlugin(
    name="非同期プラグイン",
    module_name="my_async_plugin",
    description="非同期タスクのサンプルプラグイン",
    version="1.0.0",
    author="開発者",
    url="https://github.com/xxx",
)

@plugin.mount_async_task("my_task")
async def my_async_task(
    handle: AsyncTaskHandle,
    prompt: str,
    task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
    """非同期タスクのサンプル"""
    # 進捗を報告
    yield TaskCtl.report_progress("🔍 要件を分析中...", 10)

    # キャンセルされたかチェック
    if handle.is_cancelled:
        yield TaskCtl.cancel("タスクがキャンセルされました")
        return

    # タスクステップを実行
    yield TaskCtl.report_progress("⚙️ 処理中...", 50)

    # タスク完了
    yield TaskCtl.success("処理が完了しました", data={"result": "success"})

2. タスク制御シグナル (TaskCtl)

タスクは yield TaskCtl.xxx() でステータスを報告します:

メソッド説明終端状態
TaskCtl.report_progress(msg, percent)進捗を報告
TaskCtl.success(msg, data)成功完了
TaskCtl.fail(msg, error)タスク失敗
TaskCtl.cancel(msg)タスクキャンセル

TaskCtl プロパティ:

python
ctl = TaskCtl.success("完了", data={"url": "..."})

ctl.signal      # TaskSignal.SUCCESS
ctl.message     # "完了"
ctl.data        # {"url": "..."}
ctl.progress    # None (report_progress 時のみ値あり)
ctl.is_terminal # True (終端状態かどうか)

3. タスクハンドル (AsyncTaskHandle)

タスク関数はタスク制御用の AsyncTaskHandle パラメータを受け取ります:

プロパティ/メソッド説明
handle.task_idタスク ID
handle.chat_keyチャット Key
handle.pluginプラグインインスタンス
handle.is_cancelledキャンセルされたかチェック
await handle.wait(key, timeout)外部シグナルを待機
handle.notify(key, data)待機ポイントを再開
await handle.notify_agent(message, trigger)メイン Agent に通知
handle.cancel_wait(key)特定の待機ポイントをキャンセル
handle.cancel_all()すべての待機ポイントをキャンセル

4. グローバルタスク API (task)

グローバル task オブジェクトでタスクを制御:

メソッド説明
await task.start(type, id, chat_key, plugin, ..., on_terminal=callback)タスクを開始
task.is_running(type, id)実行中かチェック
await task.cancel(type, id)タスクをキャンセル
await task.stop_all()すべてのタスクを停止
task.get_handle(type, id)タスクハンドルを取得
task.get_state(type, id)最新のタスク状態を取得
task.get_running_tasks()実行中のタスクリストを取得

終端コールバック (on_terminal)

v2.2.0 新機能: タスク完了時にコールバック関数を実行し、リソースのクリーンアップや通知などに使用できます。

基本的な使い方

python
from nekro_agent.services.plugin.task import TaskCtl, task

def handle_task_complete(ctl: TaskCtl) -> None:
    """タスク完了コールバック"""
    if ctl.signal == TaskSignal.SUCCESS:
        print(f"✅ タスク成功: {ctl.message}")
        print(f"   結果データ: {ctl.data}")
    elif ctl.signal == TaskSignal.FAIL:
        print(f"❌ タスク失敗: {ctl.message}")
    elif ctl.signal == TaskSignal.CANCEL:
        print(f"⚠️ タスクキャンセル: {ctl.message}")

# タスク開始時にコールバックを渡す
await task.start(
    task_type="my_task",
    task_id="T001",
    chat_key=_ctx.chat_key,
    plugin=plugin,
    prompt="レポートを生成",
    on_terminal=handle_task_complete,  # 終端コールバック
)

コールバックのトリガー条件

コールバックは以下の状況でトリガーされます:

トリガー条件TaskCtl シグナル
タスクが TaskCtl.success() を yieldTaskSignal.SUCCESS
タスクが TaskCtl.fail() を yieldTaskSignal.FAIL
タスクが TaskCtl.cancel() を yieldTaskSignal.CANCEL
外部から task.cancel() を呼び出しTaskSignal.CANCEL
タスクが例外をスローTaskSignal.FAIL

実践的な例

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "開発タスクを作成")
async def create_dev_task(_ctx: AgentCtx, requirement: str) -> str:
    """開発タスクを作成"""
    task_id = f"DEV_{int(time.time())}"
    
    def _on_terminal(ctl: TaskCtl) -> None:
        """タスク完了時に自動通知"""
        import asyncio
        from nekro_agent.services.message_service import message_service
        
        if ctl.signal == TaskSignal.SUCCESS:
            url = ctl.data.get("url", "")
            msg = f"✅ 開発タスク完了!\n🔗 {url}"
        else:
            msg = f"❌ 開発タスク失敗: {ctl.message}"
        
        # コールバック内で非同期メッセージを送信
        asyncio.create_task(
            message_service.push_system_message(
                chat_key=_ctx.chat_key,
                agent_messages=msg,
                trigger_agent=True,
            )
        )
    
    await task.start(
        task_type="webapp_dev",
        task_id=task_id,
        chat_key=_ctx.chat_key,
        plugin=plugin,
        requirement=requirement,
        task_id=task_id,
        on_terminal=_on_terminal,
    )
    
    return f"タスクを開始しました: {task_id}"

完全な例: WebApp 開発タスク

非同期タスクの定義

python
@plugin.mount_async_task("webapp_dev")
async def webapp_development_task(
    handle: AsyncTaskHandle,
    requirement: str,
    task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
    """WebApp 開発非同期タスク"""
    yield TaskCtl.report_progress("🚀 開発を開始...", 0)

    # 開発を実行
    success, result = await run_developer_loop(requirement)

    if success:
        yield TaskCtl.report_progress("📦 コンパイル中...", 70)
        yield TaskCtl.success("デプロイ成功", data={"url": url})
    else:
        yield TaskCtl.fail(f"開発失敗: {result}")

タスクの開始

python
from nekro_agent.api.schemas import AgentCtx

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "WebApp タスクを作成")
async def create_webapp_task(
    _ctx: AgentCtx,
    requirement: str,
) -> str:
    """WebApp 開発タスクを作成"""
    task_id = generate_task_id()

    # 非同期タスクを開始(ノンブロッキング)
    await task.start(
        task_type="webapp_dev",
        task_id=task_id,
        chat_key=_ctx.chat_key,
        plugin=plugin,
        requirement=requirement,
        task_id=task_id,
    )

    return f"タスクを開始しました: {task_id}"

ベストプラクティス

1. タスク設計

python
# ✅ 推奨: 明確なタスクフェーズ
yield TaskCtl.report_progress("📥 リソースをダウンロード中...", 20)
yield TaskCtl.report_progress("🔄 処理中...", 60)
yield TaskCtl.report_progress("💾 保存中...", 90)

# ❌ 非推奨: 曖昧な進捗
yield TaskCtl.report_progress("処理中...", 50)
yield TaskCtl.report_progress("もうすぐ完了...", 80)

2. エラー処理

python
try:
    result = await external_api()
    yield TaskCtl.success("完了", data=result)
except TimeoutError:
    yield TaskCtl.fail("API タイムアウト")
except Exception as e:
    yield TaskCtl.fail(f"エラー: {e}")

3. リソースクリーンアップ

python
@plugin.on_disabled()
async def cleanup():
    count = await task.stop_all()
    if count > 0:
        logger.info(f"{count} 個のタスクを停止しました")

4. 同時実行制御

python
MAX_CONCURRENT_TASKS = 3

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "start_task")
async def start_task(_ctx: AgentCtx, name: str) -> str:
    active = len([t for t in task.get_running_tasks() if "my_plugin" in t])
    if active >= MAX_CONCURRENT_TASKS:
        raise ValueError(f"最大同時実行数に達しました ({MAX_CONCURRENT_TASKS})")

    await task.start("my_task", task_id, _ctx.chat_key, plugin, name)
    return f"タスクを開始しました: {task_id}"

5. 終端コールバックの注意点

python
# ✅ 推奨: コールバック内で非同期操作を処理
def _on_terminal(ctl: TaskCtl) -> None:
    import asyncio
    asyncio.create_task(send_notification(ctl))

# ❌ 非推奨: コールバック内で直接 await(コールバックは同期関数)
def _on_terminal(ctl: TaskCtl) -> None:
    await send_notification(ctl)  # エラー!

よくある質問

Q: 非同期タスクと通常のサンドボックスメソッドの違いは?

比較項目サンドボックスメソッド非同期タスク
実行タイミング呼び出し時に即座に実行呼び出し後バックグラウンドで実行
メインチャットをブロックするしない
ステータス追跡なし完全な進捗報告
長時間実行不向き適している
キャンセル可能不可可能
終端コールバック非対応対応

Q: タスクにユーザー確認を待たせるには?

python
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str):
    yield TaskCtl.report_progress("⏳ 承認を待機中...", 50)

    # 外部からの notify() 呼び出しを待機
    approved = await handle.wait("approval", timeout=3600)

    if approved:
        yield TaskCtl.success("承認されました")
    else:
        yield TaskCtl.fail("承認タイムアウトまたは拒否されました")

Q: コールバック関数は非同期にできますか?

いいえ。on_terminal コールバックは同期関数でなければなりません。非同期操作が必要な場合は、asyncio.create_task() でラップしてください:

python
def _on_terminal(ctl: TaskCtl) -> None:
    asyncio.create_task(async_cleanup(ctl))

バージョン情報

  • 機能バージョン: nekro-agent v2.2.0+
  • ドキュメントバージョン: 1.1.0
  • 最終更新: 2026-01-31