データストレージ
プラグイン開発では、ユーザー設定、セッション状態、キャッシュ情報、設定ファイル、モデルファイルなど、データを永続化する必要がよくあります。Nekro Agentは、異なるストレージニーズに対応するため、プラグイン向けの2つの補完的なデータストレージ方法を提供します。
ストレージ方法の概要
Nekro Agentプラグインシステムは2つのデータストレージ方法を提供します:
1. KVキー値ストレージ(plugin.store)
- ストレージタイプ: データベースバックエンドのキー値ペアストレージ
- データ形式: 文字列型(複雑なデータはシリアル化が必要)
- 使用例: 小さな構造化データ、設定項目、状態情報
- アクセス方法:
plugin.storeAPIを介した非同期アクセス - データスコープ: セッションレベル、ユーザーレベル、プラグイングローバルの3つのスコープをサポート
2. プラグイン永続ディレクトリ(plugin.get_plugin_path())
- ストレージタイプ: ファイルシステムディレクトリ
- データ形式: 任意のファイルとバイナリデータ
- 使用例: 大きなファイル、バイナリデータ、モデルファイル、リソースファイル
- アクセス方法:
pathlib.Pathを介したファイルシステム操作 - データスコープ: プラグイン専用ディレクトリ、サブディレクトリ構造の自己管理が必要
選択ガイド
| ストレージニーズ | 推奨方法 | 理由 |
|---|---|---|
| ユーザー設定 | KVストレージ | 小さな構造化データ、スコープ分離をサポート |
| セッション状態 | KVストレージ | セッションレベルのデータ分離が必要 |
| 設定キャッシュ | KVストレージ | 高速な読み書き、クエリと更新が容易 |
| 画像、音声、動画 | 永続ディレクトリ | 大きなバイナリファイル |
| 機械学習モデル | 永続ディレクトリ | 大きなファイル、データベースストレージに不向き |
| ログファイル | 永続ディレクトリ | 継続的な追加、ファイル操作がより効率的 |
| 一時ファイル | 永続ディレクトリ | ファイルシステム操作がより柔軟 |
| データセットファイル | 永続ディレクトリ | 大量のデータ、ストリーム処理が必要な場合あり |
方法1:KVキー値ストレージ
概要
plugin.storeは、データベースに保存されたキー値ペアデータを操作するための一連の非同期メソッドを提供します。主な特徴は以下の通りです:
- キー値ストレージ: シンプルで直感的なKVストレージモデル
- データ分離: 各プラグインは独立した名前空間を持ち、キー名の競合を回避します
- スコープデータ: 3つのデータスコープをサポート
- セッション固有データ(
chat_key): データは特定のチャットセッションにバインドされます - ユーザー固有データ(
user_key): データは特定のユーザーにバインドされます(セッションを跨ぐ) - プラグイングローバルデータ(キーなし): データはプラグイン自体に属し、特定のセッションやユーザーに関連付けられません
- セッション固有データ(
- 文字列ストレージ: 基底ストレージは文字列ベースで、複雑なデータはシリアル化が必要です
コアAPI
1. データの設定(set)
ストレージにキー値ペアを追加または更新します。
async def set(
self,
chat_key: str = "", # オプション、セッション識別子
user_key: str = "", # オプション、ユーザー識別子
store_key: str = "", # 必須、ストレージキー名
value: str = "" # 必須、保存する値(文字列)
) -> Literal[0, 1]: # 成功は1、失敗は0を返す例:
from nekro_agent.api.schemas import AgentCtx
from nekro_agent.api import core
import json
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "save_preference", "ユーザー設定を保存")
async def save_preference(_ctx: AgentCtx, key: str, value: str) -> str:
"""ユーザー設定を保存する"""
# セッション固有データを保存
await plugin.store.set(
chat_key=_ctx.from_chat_key,
store_key="last_command",
value="/weather London"
)
# ユーザー固有の設定を保存(セッションを跨ぐ)
user_prefs = {"theme": "dark", "notifications": True}
await plugin.store.set(
user_key=_ctx.from_user_id,
store_key="preferences",
value=json.dumps(user_prefs)
)
# プラグイングローバル設定を保存
await plugin.store.set(
store_key="plugin_last_updated",
value=str(time.time())
)
return "設定が保存されました"2. データの取得(get)
キー名に基づいてストレージからデータを取得します。
async def get(
self,
chat_key: str = "", # オプション、セッション識別子
user_key: str = "", # オプション、ユーザー識別子
store_key: str = "" # 必須、ストレージキー名
) -> Optional[str]: # 保存された文字列値を返す、存在しない場合はNone例:
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_preference", "ユーザー設定を取得")
async def get_preference(_ctx: AgentCtx, key: str) -> str:
"""ユーザー設定を取得する"""
# セッション固有データを取得
last_command = await plugin.store.get(
chat_key=_ctx.from_chat_key,
store_key="last_command"
)
# ユーザー設定を取得、存在しない場合はデフォルト値を使用
prefs_str = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="preferences"
)
if prefs_str:
user_preferences = json.loads(prefs_str)
else:
# デフォルト値
user_preferences = {"theme": "light", "notifications": False}
# プラグイングローバルデータを取得
timestamp_str = await plugin.store.get(store_key="plugin_last_updated")
last_updated = float(timestamp_str) if timestamp_str else None
return f"設定: {user_preferences}"3. データの削除(delete)
キー名に基づいてストレージからキー値ペアを削除します。
async def delete(
self,
chat_key: str = "", # オプション、セッション識別子
user_key: str = "", # オプション、ユーザー識別子
store_key: str = "" # 必須、ストレージキー名
) -> Literal[0, 1]: # 成功は1、失敗は0を返す例:
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "clear_cache", "キャッシュをクリア")
async def clear_cache(_ctx: AgentCtx) -> str:
"""セッションキャッシュデータをクリアする"""
# セッションの特定のキャッシュを削除
await plugin.store.delete(
chat_key=_ctx.from_chat_key,
store_key="session_cache_data"
)
# ユーザーの設定を削除
await plugin.store.delete(
user_key=_ctx.from_user_id,
store_key="old_setting"
)
return "キャッシュがクリアされました"4. データの存在確認
getメソッドの戻り値でキーが存在するかを判断します:
value = await plugin.store.get(chat_key=_ctx.from_chat_key, store_key="my_key")
if value is not None:
core.logger.info("'my_key'がストレージに存在します")
else:
core.logger.info("'my_key'は存在しません")構造化データの保存
KVストレージは文字列のみをサポートするため、複雑なデータ構造は保存時にシリアル化する必要があります。Pydanticモデルを使用することを推奨します:
from pydantic import BaseModel
from typing import List, Dict, Optional
import time
class Note(BaseModel):
id: str
title: str
content: str
created_at: float
tags: List[str] = []
class UserNotes(BaseModel):
notes: Dict[str, Note] = {}
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "add_note", "メモを追加")
async def add_note(_ctx: AgentCtx, note_id: str, title: str, content: str, tags_str: str = "") -> str:
"""現在のユーザーにメモを追加する"""
# 1. 既存のメモデータを取得
user_notes_json = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="all_notes"
)
if user_notes_json:
user_notes_data = UserNotes.model_validate_json(user_notes_json)
else:
user_notes_data = UserNotes()
# 2. 新しいメモを作成
new_note = Note(
id=note_id,
title=title,
content=content,
created_at=time.time(),
tags=tags_str.split(',') if tags_str else []
)
user_notes_data.notes[note_id] = new_note
# 3. シリアル化して保存
await plugin.store.set(
user_key=_ctx.from_user_id,
store_key="all_notes",
value=user_notes_data.model_dump_json()
)
return f"メモ '{title}' が追加されました、ID: {note_id}"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_note", "メモを取得")
async def get_note(_ctx: AgentCtx, note_id: str) -> str:
"""指定されたIDのメモ内容を取得する"""
user_notes_json = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="all_notes"
)
if not user_notes_json:
return "ユーザーにメモがありません"
user_notes_data = UserNotes.model_validate_json(user_notes_json)
note = user_notes_data.notes.get(note_id)
if note:
return f"タイトル: {note.title}\n内容: {note.content}\nタグ: {', '.join(note.tags)}"
return f"ID '{note_id}' のメモが見つかりません"方法2:プラグイン永続ディレクトリ
概要
plugin.get_plugin_path()は、プラグイン専用のファイルシステムディレクトリを指すpathlib.Pathオブジェクトを返します。各プラグインは独立したディレクトリを持ち、通常は以下の場所にあります:
data/plugins/<plugin_author>.<plugin_module_name>/この方法は、大きなファイル、バイナリデータ、複雑なファイル構造を必要とするシナリオに適しています。
プラグインディレクトリの取得
from pathlib import Path
from nekro_agent.api import core
# プラグインデータディレクトリを取得
plugin_dir: Path = plugin.get_plugin_path()
core.logger.info(f"プラグインデータディレクトリ: {plugin_dir}")
# 出力例: data/plugins/my_author.my_plugin/基本的なファイル操作
例1:テキストファイルの保存
import aiofiles
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_log", "ログを保存")
async def save_log(_ctx: AgentCtx, log_content: str) -> str:
"""ログをファイルに保存する"""
# プラグインディレクトリを取得
plugin_dir = plugin.get_plugin_path()
# ログサブディレクトリを作成
logs_dir = plugin_dir / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
# ログファイルを保存
log_file = logs_dir / f"log_{int(time.time())}.txt"
async with aiofiles.open(log_file, "w", encoding="utf-8") as f:
await f.write(log_content)
core.logger.info(f"ログが保存されました: {log_file}")
return f"ログが {log_file.name} に保存されました"例2:バイナリファイルの保存
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_image", "画像を保存")
async def save_image(_ctx: AgentCtx, image_data: bytes, filename: str) -> str:
"""画像ファイルを保存する"""
plugin_dir = plugin.get_plugin_path()
images_dir = plugin_dir / "images"
images_dir.mkdir(parents=True, exist_ok=True)
image_path = images_dir / filename
async with aiofiles.open(image_path, "wb") as f:
await f.write(image_data)
return f"画像が保存されました: {image_path}"例3:ファイルの読み取り
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "read_config", "設定ファイルを読み取り")
async def read_config(_ctx: AgentCtx) -> str:
"""プラグイン設定ファイルを読み取る"""
plugin_dir = plugin.get_plugin_path()
config_file = plugin_dir / "config.json"
# ファイルが存在するか確認
if not config_file.exists():
return "設定ファイルが存在しません"
# ファイルを読み取り
async with aiofiles.open(config_file, "r", encoding="utf-8") as f:
config_content = await f.read()
return f"設定内容: {config_content}"高度なファイル操作の例
例4:リソースファイルの管理
from typing import List
@plugin.mount_init_method()
async def init_plugin_resources():
"""プラグイン初期化時にリソースディレクトリ構造を準備する"""
plugin_dir = plugin.get_plugin_path()
# 複数のサブディレクトリを作成
directories = [
plugin_dir / "cache",
plugin_dir / "models",
plugin_dir / "exports",
plugin_dir / "temp",
plugin_dir / "user_uploads"
]
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
core.logger.info(f"ディレクトリを作成しました: {directory}")
# デフォルト設定ファイルを作成(存在しない場合)
default_config = plugin_dir / "config.json"
if not default_config.exists():
default_settings = {
"version": "1.0.0",
"enabled": True,
"cache_ttl": 3600
}
async with aiofiles.open(default_config, "w") as f:
await f.write(json.dumps(default_settings, indent=2))
core.logger.success(f"デフォルト設定ファイルを作成しました: {default_config}")
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "list_exports", "エクスポートファイルを一覧表示")
async def list_exports(_ctx: AgentCtx) -> str:
"""すべてのエクスポートファイルを一覧表示する"""
plugin_dir = plugin.get_plugin_path()
exports_dir = plugin_dir / "exports"
if not exports_dir.exists():
return "エクスポートディレクトリが存在しません"
# ディレクトリ内のファイルを一覧表示
files = [f.name for f in exports_dir.iterdir() if f.is_file()]
if not files:
return "利用可能なエクスポートファイルがありません"
return f"エクスポートファイルリスト:\n" + "\n".join(f"- {f}" for f in files)例5:外部リソースのダウンロードとキャッシュ
import hashlib
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "cache_resource", "外部リソースをキャッシュ")
async def cache_external_resource(_ctx: AgentCtx, url: str) -> str:
"""外部リソースをダウンロードしてキャッシュする"""
# requestsを動的にインポート
from nekro_agent.api.plugin import dynamic_import_pkg
requests = dynamic_import_pkg("requests>=2.25.0")
plugin_dir = plugin.get_plugin_path()
cache_dir = plugin_dir / "cache"
cache_dir.mkdir(parents=True, exist_ok=True)
# URLハッシュをファイル名として使用
url_hash = hashlib.md5(url.encode()).hexdigest()
cache_file = cache_dir / f"{url_hash}.cache"
# キャッシュが存在するか確認
if cache_file.exists():
core.logger.info(f"キャッシュファイルを使用します: {cache_file}")
async with aiofiles.open(cache_file, "rb") as f:
cached_data = await f.read()
return f"キャッシュから読み込み、サイズ: {len(cached_data)} バイト"
# リソースをダウンロード
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
content = response.content
# キャッシュに保存
async with aiofiles.open(cache_file, "wb") as f:
await f.write(content)
core.logger.success(f"リソースがダウンロードされキャッシュされました: {cache_file}")
return f"リソースがダウンロードされました、サイズ: {len(content)} バイト"
except Exception as e:
core.logger.error(f"リソースのダウンロードに失敗しました: {e}")
return f"ダウンロードに失敗しました: {e}"例6:一時ファイルのクリーンアップ
import time
import os
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "cleanup_temp", "一時ファイルをクリーンアップ")
async def cleanup_temp_files(_ctx: AgentCtx, max_age_hours: int = 24) -> str:
"""指定された時間より古い一時ファイルをクリーンアップする"""
plugin_dir = plugin.get_plugin_path()
temp_dir = plugin_dir / "temp"
if not temp_dir.exists():
return "一時ディレクトリが存在しません"
now = time.time()
max_age_seconds = max_age_hours * 3600
deleted_count = 0
# 一時ディレクトリを反復処理
for file_path in temp_dir.iterdir():
if file_path.is_file():
# ファイル変更時刻を確認
file_age = now - file_path.stat().st_mtime
if file_age > max_age_seconds:
try:
os.remove(file_path)
deleted_count += 1
core.logger.info(f"期限切れの一時ファイルを削除しました: {file_path.name}")
except Exception as e:
core.logger.error(f"ファイルの削除に失敗しました: {e}")
return f"クリーンアップが完了し、{deleted_count}個のファイルを削除しました"ファイルとAIサンドボックスの相互作用
プラグインディレクトリからAIにファイルを渡したり、AI生成のファイルを受け取ったりする必要がある場合は、ファイルシステムAPIを使用する必要があります:
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "process_user_file", "ユーザーアップロードファイルを処理")
async def process_user_file(_ctx: AgentCtx, file_path: str) -> str:
"""ユーザーアップロードファイルを処理してプラグインディレクトリに保存する"""
# 1. AIから渡されたファイルの実際のパスを取得
host_path = _ctx.fs.get_file(file_path)
# 2. ファイル内容を読み取り
async with aiofiles.open(host_path, "rb") as f:
file_content = await f.read()
# 3. プラグインディレクトリに保存
plugin_dir = plugin.get_plugin_path()
uploads_dir = plugin_dir / "user_uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
# タイムスタンプを一意のファイル名として使用
saved_file = uploads_dir / f"{int(time.time())}_{host_path.name}"
async with aiofiles.open(saved_file, "wb") as f:
await f.write(file_content)
core.logger.success(f"ファイルがプラグインディレクトリに保存されました: {saved_file}")
return f"ファイルが処理され保存されました、サイズ: {len(file_content)} バイト"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "export_data", "データファイルをエクスポート")
async def export_data(_ctx: AgentCtx, data_content: str, filename: str) -> str:
"""データファイルをエクスポートしてAIに返す"""
# 1. プラグインディレクトリに保存
plugin_dir = plugin.get_plugin_path()
exports_dir = plugin_dir / "exports"
exports_dir.mkdir(parents=True, exist_ok=True)
export_file = exports_dir / filename
async with aiofiles.open(export_file, "w", encoding="utf-8") as f:
await f.write(data_content)
# 2. AIがアクセス可能なサンドボックスパスに変換
sandbox_path = await _ctx.fs.mixed_forward_file(export_file, file_name=filename)
return f"データがエクスポートされました: {sandbox_path}"詳細なファイル転送メカニズムについては、ファイルインタラクションの章を参照してください。
両方のストレージ方法の組み合わせ
実際のアプリケーションでは、通常、両方のストレージ方法を組み合わせて使用します:
from pydantic import BaseModel
from typing import Optional
class ModelInfo(BaseModel):
"""モデル情報(KVに保存)"""
name: str
version: str
file_path: str # 実際のモデルファイルへのパス(プラグインディレクトリからの相対パス)
size_bytes: int
created_at: float
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "download_model", "モデルをダウンロード")
async def download_model(_ctx: AgentCtx, model_url: str, model_name: str) -> str:
"""機械学習モデルをダウンロードしてメタデータを保存する"""
from nekro_agent.api.plugin import dynamic_import_pkg
requests = dynamic_import_pkg("requests")
# 1. モデルファイルをプラグインディレクトリにダウンロード
plugin_dir = plugin.get_plugin_path()
models_dir = plugin_dir / "models"
models_dir.mkdir(parents=True, exist_ok=True)
model_file = models_dir / f"{model_name}.model"
try:
response = requests.get(model_url, timeout=300)
response.raise_for_status()
model_data = response.content
# モデルファイルを保存
async with aiofiles.open(model_file, "wb") as f:
await f.write(model_data)
# 2. モデルメタデータをKVストレージに保存
model_info = ModelInfo(
name=model_name,
version="1.0.0",
file_path=f"models/{model_name}.model", # 相対パス
size_bytes=len(model_data),
created_at=time.time()
)
await plugin.store.set(
store_key=f"model_info:{model_name}",
value=model_info.model_dump_json()
)
return f"モデル '{model_name}' が正常にダウンロードされました、サイズ: {len(model_data)} バイト"
except Exception as e:
return f"ダウンロードに失敗しました: {e}"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "load_model", "モデルをロード")
async def load_model(_ctx: AgentCtx, model_name: str) -> str:
"""ダウンロードしたモデルをロードする"""
# 1. KVストレージからモデルメタデータを取得
model_info_json = await plugin.store.get(store_key=f"model_info:{model_name}")
if not model_info_json:
return f"モデル '{model_name}' は存在しません"
model_info = ModelInfo.model_validate_json(model_info_json)
# 2. プラグインディレクトリからモデルファイルをロード
plugin_dir = plugin.get_plugin_path()
model_file_path = plugin_dir / model_info.file_path
if not model_file_path.exists():
return f"モデルファイルが見つかりません: {model_file_path}"
# ここでモデルをロードして使用する処理を実装
# 例: model = load_my_model(model_file_path)
return f"モデル '{model_name}' が正常にロードされました、パス: {model_file_path}"ベストプラクティス
適切なストレージ方法の選択:
- 小さな構造化データ(設定、状態情報など)にはKVストレージを使用します
- 大きなファイル、バイナリデータ、複雑なファイル構造には永続ディレクトリを使用します
- 両方を組み合わせて使用することも一般的です(例:メタデータをKVに、実際のファイルをディレクトリに保存)
データのシリアル化:
- KVストレージに複雑なデータを保存する場合は、JSONやPydanticモデルを使用してシリアル化します
- 一貫性のあるシリアル化形式を使用し、エラーハンドリングを適切に行います
ファイルパスの管理:
- プラグインディレクトリ内では相対パスを使用し、絶対パスのハードコーディングを避けます
- サブディレクトリを適切に整理し、ファイルの種類ごとに分類します
エラーハンドリング:
- ファイル操作やデータベース操作では適切な例外処理を行います
- ファイルが存在しない場合やデータが破損している場合に備えて、デフォルト値やフォールバック処理を用意します
クリーンアップとメンテナンス:
- 一時ファイルやキャッシュファイルの定期的なクリーンアップメカニズムを実装します
- 不要になったデータの削除方法を提供します
パフォーマンスの考慮:
- 大きなファイルの操作では非同期I/Oを使用します
- 頻繁にアクセスするデータはKVストレージに、大きなファイルはディレクトリに保存します
これらのストレージ方法を適切に使用することで、プラグインは効率的にデータを管理し、ユーザーに優れた体験を提供できます。
