非同期タスク
WARNING
実験的機能:この機能は nekro-agent v2.2.0+ 以降でのみ利用可能です。API は今後のバージョンで調整される可能性があります。
概要
非同期タスクツールにより、プラグインはメイン AI の会話能力をブロックすることなく、長時間実行されるバックグラウンドタスクを実行できます。
ユースケース
- Web アプリケーション開発: AI がバックグラウンドで完全なアプリケーションを生成し、フロントエンドは会話を継続
- 動画/画像生成: 時間のかかるコンテンツ生成が会話を中断しない
- データ処理: 大規模なデータ処理がバックグラウンドで実行
- 外部 API 呼び出し: 長時間の API レスポンスを待機
コア機能
- ✅ ノンブロッキング: メイン AI はユーザーとの会話を継続可能
- ✅ ステータス追跡: リアルタイムの進捗報告
- ✅ 中断可能: 実行中のタスクのキャンセルをサポート
- ✅ 双方向通信: タスクからメイン Agent への通知が可能
- ✅ 終了コールバック: タスク完了/失敗/キャンセル時にコールバックを実行
コア API
1. 非同期タスクの定義
mount_async_task デコレータを使用して非同期タスク関数を登録します:
python
from nekro_agent.services.plugin.task import AsyncTaskHandle, TaskCtl, task
from nekro_agent.api.plugin import NekroPlugin
from typing import AsyncGenerator
plugin = NekroPlugin(
name="非同期プラグイン",
module_name="my_async_plugin",
description="非同期タスクのサンプルプラグイン",
version="1.0.0",
author="開発者",
url="https://github.com/xxx",
)
@plugin.mount_async_task("my_task")
async def my_async_task(
handle: AsyncTaskHandle,
prompt: str,
task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
"""非同期タスクのサンプル"""
# 進捗を報告
yield TaskCtl.report_progress("🔍 要件を分析中...", 10)
# キャンセルされたかチェック
if handle.is_cancelled:
yield TaskCtl.cancel("タスクがキャンセルされました")
return
# タスクステップを実行
yield TaskCtl.report_progress("⚙️ 処理中...", 50)
# タスク完了
yield TaskCtl.success("処理が完了しました", data={"result": "success"})2. タスク制御シグナル (TaskCtl)
タスクは yield TaskCtl.xxx() でステータスを報告します:
| メソッド | 説明 | 終端状態 |
|---|---|---|
TaskCtl.report_progress(msg, percent) | 進捗を報告 | ❌ |
TaskCtl.success(msg, data) | 成功完了 | ✅ |
TaskCtl.fail(msg, error) | タスク失敗 | ✅ |
TaskCtl.cancel(msg) | タスクキャンセル | ✅ |
TaskCtl プロパティ:
python
ctl = TaskCtl.success("完了", data={"url": "..."})
ctl.signal # TaskSignal.SUCCESS
ctl.message # "完了"
ctl.data # {"url": "..."}
ctl.progress # None (report_progress 時のみ値あり)
ctl.is_terminal # True (終端状態かどうか)3. タスクハンドル (AsyncTaskHandle)
タスク関数はタスク制御用の AsyncTaskHandle パラメータを受け取ります:
| プロパティ/メソッド | 説明 |
|---|---|
handle.task_id | タスク ID |
handle.chat_key | チャット Key |
handle.plugin | プラグインインスタンス |
handle.is_cancelled | キャンセルされたかチェック |
await handle.wait(key, timeout) | 外部シグナルを待機 |
handle.notify(key, data) | 待機ポイントを再開 |
await handle.notify_agent(message, trigger) | メイン Agent に通知 |
handle.cancel_wait(key) | 特定の待機ポイントをキャンセル |
handle.cancel_all() | すべての待機ポイントをキャンセル |
4. グローバルタスク API (task)
グローバル task オブジェクトでタスクを制御:
| メソッド | 説明 |
|---|---|
await task.start(type, id, chat_key, plugin, ..., on_terminal=callback) | タスクを開始 |
task.is_running(type, id) | 実行中かチェック |
await task.cancel(type, id) | タスクをキャンセル |
await task.stop_all() | すべてのタスクを停止 |
task.get_handle(type, id) | タスクハンドルを取得 |
task.get_state(type, id) | 最新のタスク状態を取得 |
task.get_running_tasks() | 実行中のタスクリストを取得 |
終端コールバック (on_terminal)
v2.2.0 新機能: タスク完了時にコールバック関数を実行し、リソースのクリーンアップや通知などに使用できます。
基本的な使い方
python
from nekro_agent.services.plugin.task import TaskCtl, task
def handle_task_complete(ctl: TaskCtl) -> None:
"""タスク完了コールバック"""
if ctl.signal == TaskSignal.SUCCESS:
print(f"✅ タスク成功: {ctl.message}")
print(f" 結果データ: {ctl.data}")
elif ctl.signal == TaskSignal.FAIL:
print(f"❌ タスク失敗: {ctl.message}")
elif ctl.signal == TaskSignal.CANCEL:
print(f"⚠️ タスクキャンセル: {ctl.message}")
# タスク開始時にコールバックを渡す
await task.start(
task_type="my_task",
task_id="T001",
chat_key=_ctx.chat_key,
plugin=plugin,
prompt="レポートを生成",
on_terminal=handle_task_complete, # 終端コールバック
)コールバックのトリガー条件
コールバックは以下の状況でトリガーされます:
| トリガー条件 | TaskCtl シグナル |
|---|---|
タスクが TaskCtl.success() を yield | TaskSignal.SUCCESS |
タスクが TaskCtl.fail() を yield | TaskSignal.FAIL |
タスクが TaskCtl.cancel() を yield | TaskSignal.CANCEL |
外部から task.cancel() を呼び出し | TaskSignal.CANCEL |
| タスクが例外をスロー | TaskSignal.FAIL |
実践的な例
python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "開発タスクを作成")
async def create_dev_task(_ctx: AgentCtx, requirement: str) -> str:
"""開発タスクを作成"""
task_id = f"DEV_{int(time.time())}"
def _on_terminal(ctl: TaskCtl) -> None:
"""タスク完了時に自動通知"""
import asyncio
from nekro_agent.services.message_service import message_service
if ctl.signal == TaskSignal.SUCCESS:
url = ctl.data.get("url", "")
msg = f"✅ 開発タスク完了!\n🔗 {url}"
else:
msg = f"❌ 開発タスク失敗: {ctl.message}"
# コールバック内で非同期メッセージを送信
asyncio.create_task(
message_service.push_system_message(
chat_key=_ctx.chat_key,
agent_messages=msg,
trigger_agent=True,
)
)
await task.start(
task_type="webapp_dev",
task_id=task_id,
chat_key=_ctx.chat_key,
plugin=plugin,
requirement=requirement,
task_id=task_id,
on_terminal=_on_terminal,
)
return f"タスクを開始しました: {task_id}"完全な例: WebApp 開発タスク
非同期タスクの定義
python
@plugin.mount_async_task("webapp_dev")
async def webapp_development_task(
handle: AsyncTaskHandle,
requirement: str,
task_id: str,
) -> AsyncGenerator[TaskCtl, None]:
"""WebApp 開発非同期タスク"""
yield TaskCtl.report_progress("🚀 開発を開始...", 0)
# 開発を実行
success, result = await run_developer_loop(requirement)
if success:
yield TaskCtl.report_progress("📦 コンパイル中...", 70)
yield TaskCtl.success("デプロイ成功", data={"url": url})
else:
yield TaskCtl.fail(f"開発失敗: {result}")タスクの開始
python
from nekro_agent.api.schemas import AgentCtx
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "WebApp タスクを作成")
async def create_webapp_task(
_ctx: AgentCtx,
requirement: str,
) -> str:
"""WebApp 開発タスクを作成"""
task_id = generate_task_id()
# 非同期タスクを開始(ノンブロッキング)
await task.start(
task_type="webapp_dev",
task_id=task_id,
chat_key=_ctx.chat_key,
plugin=plugin,
requirement=requirement,
task_id=task_id,
)
return f"タスクを開始しました: {task_id}"ベストプラクティス
1. タスク設計
python
# ✅ 推奨: 明確なタスクフェーズ
yield TaskCtl.report_progress("📥 リソースをダウンロード中...", 20)
yield TaskCtl.report_progress("🔄 処理中...", 60)
yield TaskCtl.report_progress("💾 保存中...", 90)
# ❌ 非推奨: 曖昧な進捗
yield TaskCtl.report_progress("処理中...", 50)
yield TaskCtl.report_progress("もうすぐ完了...", 80)2. エラー処理
python
try:
result = await external_api()
yield TaskCtl.success("完了", data=result)
except TimeoutError:
yield TaskCtl.fail("API タイムアウト")
except Exception as e:
yield TaskCtl.fail(f"エラー: {e}")3. リソースクリーンアップ
python
@plugin.on_disabled()
async def cleanup():
count = await task.stop_all()
if count > 0:
logger.info(f"{count} 個のタスクを停止しました")4. 同時実行制御
python
MAX_CONCURRENT_TASKS = 3
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "start_task")
async def start_task(_ctx: AgentCtx, name: str) -> str:
active = len([t for t in task.get_running_tasks() if "my_plugin" in t])
if active >= MAX_CONCURRENT_TASKS:
raise ValueError(f"最大同時実行数に達しました ({MAX_CONCURRENT_TASKS})")
await task.start("my_task", task_id, _ctx.chat_key, plugin, name)
return f"タスクを開始しました: {task_id}"5. 終端コールバックの注意点
python
# ✅ 推奨: コールバック内で非同期操作を処理
def _on_terminal(ctl: TaskCtl) -> None:
import asyncio
asyncio.create_task(send_notification(ctl))
# ❌ 非推奨: コールバック内で直接 await(コールバックは同期関数)
def _on_terminal(ctl: TaskCtl) -> None:
await send_notification(ctl) # エラー!よくある質問
Q: 非同期タスクと通常のサンドボックスメソッドの違いは?
| 比較項目 | サンドボックスメソッド | 非同期タスク |
|---|---|---|
| 実行タイミング | 呼び出し時に即座に実行 | 呼び出し後バックグラウンドで実行 |
| メインチャットをブロック | する | しない |
| ステータス追跡 | なし | 完全な進捗報告 |
| 長時間実行 | 不向き | 適している |
| キャンセル可能 | 不可 | 可能 |
| 終端コールバック | 非対応 | 対応 |
Q: タスクにユーザー確認を待たせるには?
python
@plugin.mount_async_task("approval_task")
async def task_with_approval(handle: AsyncTaskHandle, prompt: str):
yield TaskCtl.report_progress("⏳ 承認を待機中...", 50)
# 外部からの notify() 呼び出しを待機
approved = await handle.wait("approval", timeout=3600)
if approved:
yield TaskCtl.success("承認されました")
else:
yield TaskCtl.fail("承認タイムアウトまたは拒否されました")Q: コールバック関数は非同期にできますか?
いいえ。on_terminal コールバックは同期関数でなければなりません。非同期操作が必要な場合は、asyncio.create_task() でラップしてください:
python
def _on_terminal(ctl: TaskCtl) -> None:
asyncio.create_task(async_cleanup(ctl))バージョン情報
- 機能バージョン: nekro-agent v2.2.0+
- ドキュメントバージョン: 1.1.0
- 最終更新: 2026-01-31
