Skip to content

データストレージ

プラグイン開発では、ユーザー設定、セッション状態、キャッシュ情報、設定ファイル、モデルファイルなど、データを永続化する必要がよくあります。Nekro Agentは、異なるストレージニーズに対応するため、プラグイン向けの2つの補完的なデータストレージ方法を提供します。

ストレージ方法の概要

Nekro Agentプラグインシステムは2つのデータストレージ方法を提供します:

1. KVキー値ストレージ(plugin.store

  • ストレージタイプ: データベースバックエンドのキー値ペアストレージ
  • データ形式: 文字列型(複雑なデータはシリアル化が必要)
  • 使用例: 小さな構造化データ、設定項目、状態情報
  • アクセス方法: plugin.store APIを介した非同期アクセス
  • データスコープ: セッションレベル、ユーザーレベル、プラグイングローバルの3つのスコープをサポート

2. プラグイン永続ディレクトリ(plugin.get_plugin_path()

  • ストレージタイプ: ファイルシステムディレクトリ
  • データ形式: 任意のファイルとバイナリデータ
  • 使用例: 大きなファイル、バイナリデータ、モデルファイル、リソースファイル
  • アクセス方法: pathlib.Pathを介したファイルシステム操作
  • データスコープ: プラグイン専用ディレクトリ、サブディレクトリ構造の自己管理が必要

選択ガイド

ストレージニーズ推奨方法理由
ユーザー設定KVストレージ小さな構造化データ、スコープ分離をサポート
セッション状態KVストレージセッションレベルのデータ分離が必要
設定キャッシュKVストレージ高速な読み書き、クエリと更新が容易
画像、音声、動画永続ディレクトリ大きなバイナリファイル
機械学習モデル永続ディレクトリ大きなファイル、データベースストレージに不向き
ログファイル永続ディレクトリ継続的な追加、ファイル操作がより効率的
一時ファイル永続ディレクトリファイルシステム操作がより柔軟
データセットファイル永続ディレクトリ大量のデータ、ストリーム処理が必要な場合あり

方法1:KVキー値ストレージ

概要

plugin.storeは、データベースに保存されたキー値ペアデータを操作するための一連の非同期メソッドを提供します。主な特徴は以下の通りです:

  • キー値ストレージ: シンプルで直感的なKVストレージモデル
  • データ分離: 各プラグインは独立した名前空間を持ち、キー名の競合を回避します
  • スコープデータ: 3つのデータスコープをサポート
    • セッション固有データchat_key): データは特定のチャットセッションにバインドされます
    • ユーザー固有データuser_key): データは特定のユーザーにバインドされます(セッションを跨ぐ)
    • プラグイングローバルデータ(キーなし): データはプラグイン自体に属し、特定のセッションやユーザーに関連付けられません
  • 文字列ストレージ: 基底ストレージは文字列ベースで、複雑なデータはシリアル化が必要です

コアAPI

1. データの設定(set

ストレージにキー値ペアを追加または更新します。

python
async def set(
    self,
    chat_key: str = "",    # オプション、セッション識別子
    user_key: str = "",    # オプション、ユーザー識別子
    store_key: str = "",   # 必須、ストレージキー名
    value: str = ""        # 必須、保存する値(文字列)
) -> Literal[0, 1]:        # 成功は1、失敗は0を返す

:

python
from nekro_agent.api.schemas import AgentCtx
from nekro_agent.api import core
import json

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "save_preference", "ユーザー設定を保存")
async def save_preference(_ctx: AgentCtx, key: str, value: str) -> str:
    """ユーザー設定を保存する"""

    # セッション固有データを保存
    await plugin.store.set(
        chat_key=_ctx.from_chat_key,
        store_key="last_command",
        value="/weather London"
    )

    # ユーザー固有の設定を保存(セッションを跨ぐ)
    user_prefs = {"theme": "dark", "notifications": True}
    await plugin.store.set(
        user_key=_ctx.from_user_id,
        store_key="preferences",
        value=json.dumps(user_prefs)
    )

    # プラグイングローバル設定を保存
    await plugin.store.set(
        store_key="plugin_last_updated",
        value=str(time.time())
    )

    return "設定が保存されました"

2. データの取得(get

キー名に基づいてストレージからデータを取得します。

python
async def get(
    self,
    chat_key: str = "",    # オプション、セッション識別子
    user_key: str = "",    # オプション、ユーザー識別子
    store_key: str = ""    # 必須、ストレージキー名
) -> Optional[str]:        # 保存された文字列値を返す、存在しない場合はNone

:

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_preference", "ユーザー設定を取得")
async def get_preference(_ctx: AgentCtx, key: str) -> str:
    """ユーザー設定を取得する"""

    # セッション固有データを取得
    last_command = await plugin.store.get(
        chat_key=_ctx.from_chat_key,
        store_key="last_command"
    )

    # ユーザー設定を取得、存在しない場合はデフォルト値を使用
    prefs_str = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="preferences"
    )

    if prefs_str:
        user_preferences = json.loads(prefs_str)
    else:
        # デフォルト値
        user_preferences = {"theme": "light", "notifications": False}

    # プラグイングローバルデータを取得
    timestamp_str = await plugin.store.get(store_key="plugin_last_updated")
    last_updated = float(timestamp_str) if timestamp_str else None

    return f"設定: {user_preferences}"

3. データの削除(delete

キー名に基づいてストレージからキー値ペアを削除します。

python
async def delete(
    self,
    chat_key: str = "",    # オプション、セッション識別子
    user_key: str = "",    # オプション、ユーザー識別子
    store_key: str = ""    # 必須、ストレージキー名
) -> Literal[0, 1]:        # 成功は1、失敗は0を返す

:

python
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "clear_cache", "キャッシュをクリア")
async def clear_cache(_ctx: AgentCtx) -> str:
    """セッションキャッシュデータをクリアする"""

    # セッションの特定のキャッシュを削除
    await plugin.store.delete(
        chat_key=_ctx.from_chat_key,
        store_key="session_cache_data"
    )

    # ユーザーの設定を削除
    await plugin.store.delete(
        user_key=_ctx.from_user_id,
        store_key="old_setting"
    )

    return "キャッシュがクリアされました"

4. データの存在確認

getメソッドの戻り値でキーが存在するかを判断します:

python
value = await plugin.store.get(chat_key=_ctx.from_chat_key, store_key="my_key")
if value is not None:
    core.logger.info("'my_key'がストレージに存在します")
else:
    core.logger.info("'my_key'は存在しません")

構造化データの保存

KVストレージは文字列のみをサポートするため、複雑なデータ構造は保存時にシリアル化する必要があります。Pydanticモデルを使用することを推奨します:

python
from pydantic import BaseModel
from typing import List, Dict, Optional
import time

class Note(BaseModel):
    id: str
    title: str
    content: str
    created_at: float
    tags: List[str] = []

class UserNotes(BaseModel):
    notes: Dict[str, Note] = {}

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "add_note", "メモを追加")
async def add_note(_ctx: AgentCtx, note_id: str, title: str, content: str, tags_str: str = "") -> str:
    """現在のユーザーにメモを追加する"""

    # 1. 既存のメモデータを取得
    user_notes_json = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="all_notes"
    )

    if user_notes_json:
        user_notes_data = UserNotes.model_validate_json(user_notes_json)
    else:
        user_notes_data = UserNotes()

    # 2. 新しいメモを作成
    new_note = Note(
        id=note_id,
        title=title,
        content=content,
        created_at=time.time(),
        tags=tags_str.split(',') if tags_str else []
    )
    user_notes_data.notes[note_id] = new_note

    # 3. シリアル化して保存
    await plugin.store.set(
        user_key=_ctx.from_user_id,
        store_key="all_notes",
        value=user_notes_data.model_dump_json()
    )

    return f"メモ '{title}' が追加されました、ID: {note_id}"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_note", "メモを取得")
async def get_note(_ctx: AgentCtx, note_id: str) -> str:
    """指定されたIDのメモ内容を取得する"""

    user_notes_json = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="all_notes"
    )

    if not user_notes_json:
        return "ユーザーにメモがありません"

    user_notes_data = UserNotes.model_validate_json(user_notes_json)
    note = user_notes_data.notes.get(note_id)

    if note:
        return f"タイトル: {note.title}\n内容: {note.content}\nタグ: {', '.join(note.tags)}"

    return f"ID '{note_id}' のメモが見つかりません"

方法2:プラグイン永続ディレクトリ

概要

plugin.get_plugin_path()は、プラグイン専用のファイルシステムディレクトリを指すpathlib.Pathオブジェクトを返します。各プラグインは独立したディレクトリを持ち、通常は以下の場所にあります:

data/plugins/<plugin_author>.<plugin_module_name>/

この方法は、大きなファイル、バイナリデータ、複雑なファイル構造を必要とするシナリオに適しています。

プラグインディレクトリの取得

python
from pathlib import Path
from nekro_agent.api import core

# プラグインデータディレクトリを取得
plugin_dir: Path = plugin.get_plugin_path()
core.logger.info(f"プラグインデータディレクトリ: {plugin_dir}")

# 出力例: data/plugins/my_author.my_plugin/

基本的なファイル操作

例1:テキストファイルの保存

python
import aiofiles

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_log", "ログを保存")
async def save_log(_ctx: AgentCtx, log_content: str) -> str:
    """ログをファイルに保存する"""

    # プラグインディレクトリを取得
    plugin_dir = plugin.get_plugin_path()

    # ログサブディレクトリを作成
    logs_dir = plugin_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)

    # ログファイルを保存
    log_file = logs_dir / f"log_{int(time.time())}.txt"
    async with aiofiles.open(log_file, "w", encoding="utf-8") as f:
        await f.write(log_content)

    core.logger.info(f"ログが保存されました: {log_file}")
    return f"ログが {log_file.name} に保存されました"

例2:バイナリファイルの保存

python
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_image", "画像を保存")
async def save_image(_ctx: AgentCtx, image_data: bytes, filename: str) -> str:
    """画像ファイルを保存する"""

    plugin_dir = plugin.get_plugin_path()
    images_dir = plugin_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    image_path = images_dir / filename
    async with aiofiles.open(image_path, "wb") as f:
        await f.write(image_data)

    return f"画像が保存されました: {image_path}"

例3:ファイルの読み取り

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "read_config", "設定ファイルを読み取り")
async def read_config(_ctx: AgentCtx) -> str:
    """プラグイン設定ファイルを読み取る"""

    plugin_dir = plugin.get_plugin_path()
    config_file = plugin_dir / "config.json"

    # ファイルが存在するか確認
    if not config_file.exists():
        return "設定ファイルが存在しません"

    # ファイルを読み取り
    async with aiofiles.open(config_file, "r", encoding="utf-8") as f:
        config_content = await f.read()

    return f"設定内容: {config_content}"

高度なファイル操作の例

例4:リソースファイルの管理

python
from typing import List

@plugin.mount_init_method()
async def init_plugin_resources():
    """プラグイン初期化時にリソースディレクトリ構造を準備する"""

    plugin_dir = plugin.get_plugin_path()

    # 複数のサブディレクトリを作成
    directories = [
        plugin_dir / "cache",
        plugin_dir / "models",
        plugin_dir / "exports",
        plugin_dir / "temp",
        plugin_dir / "user_uploads"
    ]

    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)
        core.logger.info(f"ディレクトリを作成しました: {directory}")

    # デフォルト設定ファイルを作成(存在しない場合)
    default_config = plugin_dir / "config.json"
    if not default_config.exists():
        default_settings = {
            "version": "1.0.0",
            "enabled": True,
            "cache_ttl": 3600
        }
        async with aiofiles.open(default_config, "w") as f:
            await f.write(json.dumps(default_settings, indent=2))
        core.logger.success(f"デフォルト設定ファイルを作成しました: {default_config}")

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "list_exports", "エクスポートファイルを一覧表示")
async def list_exports(_ctx: AgentCtx) -> str:
    """すべてのエクスポートファイルを一覧表示する"""

    plugin_dir = plugin.get_plugin_path()
    exports_dir = plugin_dir / "exports"

    if not exports_dir.exists():
        return "エクスポートディレクトリが存在しません"

    # ディレクトリ内のファイルを一覧表示
    files = [f.name for f in exports_dir.iterdir() if f.is_file()]

    if not files:
        return "利用可能なエクスポートファイルがありません"

    return f"エクスポートファイルリスト:\n" + "\n".join(f"- {f}" for f in files)

例5:外部リソースのダウンロードとキャッシュ

python
import hashlib

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "cache_resource", "外部リソースをキャッシュ")
async def cache_external_resource(_ctx: AgentCtx, url: str) -> str:
    """外部リソースをダウンロードしてキャッシュする"""

    # requestsを動的にインポート
    from nekro_agent.api.plugin import dynamic_import_pkg
    requests = dynamic_import_pkg("requests>=2.25.0")

    plugin_dir = plugin.get_plugin_path()
    cache_dir = plugin_dir / "cache"
    cache_dir.mkdir(parents=True, exist_ok=True)

    # URLハッシュをファイル名として使用
    url_hash = hashlib.md5(url.encode()).hexdigest()
    cache_file = cache_dir / f"{url_hash}.cache"

    # キャッシュが存在するか確認
    if cache_file.exists():
        core.logger.info(f"キャッシュファイルを使用します: {cache_file}")
        async with aiofiles.open(cache_file, "rb") as f:
            cached_data = await f.read()
        return f"キャッシュから読み込み、サイズ: {len(cached_data)} バイト"

    # リソースをダウンロード
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        content = response.content

        # キャッシュに保存
        async with aiofiles.open(cache_file, "wb") as f:
            await f.write(content)

        core.logger.success(f"リソースがダウンロードされキャッシュされました: {cache_file}")
        return f"リソースがダウンロードされました、サイズ: {len(content)} バイト"

    except Exception as e:
        core.logger.error(f"リソースのダウンロードに失敗しました: {e}")
        return f"ダウンロードに失敗しました: {e}"

例6:一時ファイルのクリーンアップ

python
import time
import os

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "cleanup_temp", "一時ファイルをクリーンアップ")
async def cleanup_temp_files(_ctx: AgentCtx, max_age_hours: int = 24) -> str:
    """指定された時間より古い一時ファイルをクリーンアップする"""

    plugin_dir = plugin.get_plugin_path()
    temp_dir = plugin_dir / "temp"

    if not temp_dir.exists():
        return "一時ディレクトリが存在しません"

    now = time.time()
    max_age_seconds = max_age_hours * 3600
    deleted_count = 0

    # 一時ディレクトリを反復処理
    for file_path in temp_dir.iterdir():
        if file_path.is_file():
            # ファイル変更時刻を確認
            file_age = now - file_path.stat().st_mtime

            if file_age > max_age_seconds:
                try:
                    os.remove(file_path)
                    deleted_count += 1
                    core.logger.info(f"期限切れの一時ファイルを削除しました: {file_path.name}")
                except Exception as e:
                    core.logger.error(f"ファイルの削除に失敗しました: {e}")

    return f"クリーンアップが完了し、{deleted_count}個のファイルを削除しました"

ファイルとAIサンドボックスの相互作用

プラグインディレクトリからAIにファイルを渡したり、AI生成のファイルを受け取ったりする必要がある場合は、ファイルシステムAPIを使用する必要があります:

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "process_user_file", "ユーザーアップロードファイルを処理")
async def process_user_file(_ctx: AgentCtx, file_path: str) -> str:
    """ユーザーアップロードファイルを処理してプラグインディレクトリに保存する"""

    # 1. AIから渡されたファイルの実際のパスを取得
    host_path = _ctx.fs.get_file(file_path)

    # 2. ファイル内容を読み取り
    async with aiofiles.open(host_path, "rb") as f:
        file_content = await f.read()

    # 3. プラグインディレクトリに保存
    plugin_dir = plugin.get_plugin_path()
    uploads_dir = plugin_dir / "user_uploads"
    uploads_dir.mkdir(parents=True, exist_ok=True)

    # タイムスタンプを一意のファイル名として使用
    saved_file = uploads_dir / f"{int(time.time())}_{host_path.name}"
    async with aiofiles.open(saved_file, "wb") as f:
        await f.write(file_content)

    core.logger.success(f"ファイルがプラグインディレクトリに保存されました: {saved_file}")
    return f"ファイルが処理され保存されました、サイズ: {len(file_content)} バイト"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "export_data", "データファイルをエクスポート")
async def export_data(_ctx: AgentCtx, data_content: str, filename: str) -> str:
    """データファイルをエクスポートしてAIに返す"""

    # 1. プラグインディレクトリに保存
    plugin_dir = plugin.get_plugin_path()
    exports_dir = plugin_dir / "exports"
    exports_dir.mkdir(parents=True, exist_ok=True)

    export_file = exports_dir / filename
    async with aiofiles.open(export_file, "w", encoding="utf-8") as f:
        await f.write(data_content)

    # 2. AIがアクセス可能なサンドボックスパスに変換
    sandbox_path = await _ctx.fs.mixed_forward_file(export_file, file_name=filename)

    return f"データがエクスポートされました: {sandbox_path}"

詳細なファイル転送メカニズムについては、ファイルインタラクションの章を参照してください。

両方のストレージ方法の組み合わせ

実際のアプリケーションでは、通常、両方のストレージ方法を組み合わせて使用します:

python
from pydantic import BaseModel
from typing import Optional

class ModelInfo(BaseModel):
    """モデル情報(KVに保存)"""
    name: str
    version: str
    file_path: str  # 実際のモデルファイルへのパス(プラグインディレクトリからの相対パス)
    size_bytes: int
    created_at: float

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "download_model", "モデルをダウンロード")
async def download_model(_ctx: AgentCtx, model_url: str, model_name: str) -> str:
    """機械学習モデルをダウンロードしてメタデータを保存する"""

    from nekro_agent.api.plugin import dynamic_import_pkg
    requests = dynamic_import_pkg("requests")

    # 1. モデルファイルをプラグインディレクトリにダウンロード
    plugin_dir = plugin.get_plugin_path()
    models_dir = plugin_dir / "models"
    models_dir.mkdir(parents=True, exist_ok=True)

    model_file = models_dir / f"{model_name}.model"

    try:
        response = requests.get(model_url, timeout=300)
        response.raise_for_status()
        model_data = response.content

        # モデルファイルを保存
        async with aiofiles.open(model_file, "wb") as f:
            await f.write(model_data)

        # 2. モデルメタデータをKVストレージに保存
        model_info = ModelInfo(
            name=model_name,
            version="1.0.0",
            file_path=f"models/{model_name}.model",  # 相対パス
            size_bytes=len(model_data),
            created_at=time.time()
        )

        await plugin.store.set(
            store_key=f"model_info:{model_name}",
            value=model_info.model_dump_json()
        )

        return f"モデル '{model_name}' が正常にダウンロードされました、サイズ: {len(model_data)} バイト"

    except Exception as e:
        return f"ダウンロードに失敗しました: {e}"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "load_model", "モデルをロード")
async def load_model(_ctx: AgentCtx, model_name: str) -> str:
    """ダウンロードしたモデルをロードする"""

    # 1. KVストレージからモデルメタデータを取得
    model_info_json = await plugin.store.get(store_key=f"model_info:{model_name}")

    if not model_info_json:
        return f"モデル '{model_name}' は存在しません"

    model_info = ModelInfo.model_validate_json(model_info_json)

    # 2. プラグインディレクトリからモデルファイルをロード
    plugin_dir = plugin.get_plugin_path()
    model_file_path = plugin_dir / model_info.file_path

    if not model_file_path.exists():
        return f"モデルファイルが見つかりません: {model_file_path}"

    # ここでモデルをロードして使用する処理を実装
    # 例: model = load_my_model(model_file_path)
    
    return f"モデル '{model_name}' が正常にロードされました、パス: {model_file_path}"

ベストプラクティス

  1. 適切なストレージ方法の選択:

    • 小さな構造化データ(設定、状態情報など)にはKVストレージを使用します
    • 大きなファイル、バイナリデータ、複雑なファイル構造には永続ディレクトリを使用します
    • 両方を組み合わせて使用することも一般的です(例:メタデータをKVに、実際のファイルをディレクトリに保存)
  2. データのシリアル化:

    • KVストレージに複雑なデータを保存する場合は、JSONやPydanticモデルを使用してシリアル化します
    • 一貫性のあるシリアル化形式を使用し、エラーハンドリングを適切に行います
  3. ファイルパスの管理:

    • プラグインディレクトリ内では相対パスを使用し、絶対パスのハードコーディングを避けます
    • サブディレクトリを適切に整理し、ファイルの種類ごとに分類します
  4. エラーハンドリング:

    • ファイル操作やデータベース操作では適切な例外処理を行います
    • ファイルが存在しない場合やデータが破損している場合に備えて、デフォルト値やフォールバック処理を用意します
  5. クリーンアップとメンテナンス:

    • 一時ファイルやキャッシュファイルの定期的なクリーンアップメカニズムを実装します
    • 不要になったデータの削除方法を提供します
  6. パフォーマンスの考慮:

    • 大きなファイルの操作では非同期I/Oを使用します
    • 頻繁にアクセスするデータはKVストレージに、大きなファイルはディレクトリに保存します

これらのストレージ方法を適切に使用することで、プラグインは効率的にデータを管理し、ユーザーに優れた体験を提供できます。