Skip to content

数据存储

插件开发中经常需要持久化数据,例如用户偏好、会话状态、缓存信息、配置文件、模型文件等。Nekro Agent 为插件提供了两种互补的数据存储方式,满足不同的存储需求。

存储方式概览

Nekro Agent 插件系统提供两种数据存储方式:

1. KV 键值存储 (plugin.store)

  • 存储类型:数据库支持的键值对存储
  • 数据形式:字符串类型(需序列化复杂数据)
  • 适用场景:小型结构化数据、配置项、状态信息
  • 访问方式:通过 plugin.store API 异步访问
  • 数据作用域:支持会话级、用户级、插件全局三种作用域

2. 插件持久化目录 (plugin.get_plugin_path())

  • 存储类型:文件系统目录
  • 数据形式:任意文件和二进制数据
  • 适用场景:大文件、二进制数据、模型文件、资源文件
  • 访问方式:通过 pathlib.Path 进行文件系统操作
  • 数据作用域:插件独占目录,需自行管理子目录结构

选择指南

存储需求推荐方式原因
用户偏好设置KV 存储小型结构化数据,支持作用域隔离
会话状态KV 存储需要会话级别的数据隔离
配置缓存KV 存储快速读写,易于查询和更新
图片、音频、视频持久化目录大型二进制文件
机器学习模型持久化目录大文件,不适合数据库存储
日志文件持久化目录持续追加,文件操作更高效
临时文件持久化目录文件系统操作更灵活
数据集文件持久化目录大量数据,可能需要流式处理

方式一:KV 键值存储

概述

plugin.store 提供了一组异步方法来操作存储在数据库中的键值对数据。其主要特点包括:

  • 键值存储:简单直观的 KV 存储模型
  • 数据隔离:每个插件拥有独立的命名空间,避免键名冲突
  • 作用域数据:支持三种数据作用域
    • 会话特定数据 (chat_key): 数据与特定的聊天会话绑定
    • 用户特定数据 (user_key): 数据与特定的用户绑定(跨会话)
    • 插件全局数据 (无 key): 数据属于插件本身,不与任何特定会话或用户关联
  • 字符串存储:底层存储字符串,复杂数据需要序列化

核心 API

1. 设置数据 (set)

向存储中添加或更新键值对。

python
async def set(
    self,
    chat_key: str = "",    # 可选,会话标识
    user_key: str = "",    # 可选,用户标识
    store_key: str = "",   # 必需,存储的键名
    value: str = ""        # 必需,要存储的值 (字符串)
) -> Literal[0, 1]:        # 返回 1 表示成功,0 表示失败

示例

python
from nekro_agent.api.schemas import AgentCtx
from nekro_agent.api import core
import json

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "save_preference", "保存用户偏好")
async def save_preference(_ctx: AgentCtx, key: str, value: str) -> str:
    """保存用户偏好设置"""

    # 存储会话特定数据
    await plugin.store.set(
        chat_key=_ctx.from_chat_key,
        store_key="last_command",
        value="/weather London"
    )

    # 存储用户特定偏好(跨会话)
    user_prefs = {"theme": "dark", "notifications": True}
    await plugin.store.set(
        user_key=_ctx.from_user_id,
        store_key="preferences",
        value=json.dumps(user_prefs)
    )

    # 存储插件全局配置
    await plugin.store.set(
        store_key="plugin_last_updated",
        value=str(time.time())
    )

    return "偏好设置已保存"

2. 获取数据 (get)

根据键名从存储中检索数据。

python
async def get(
    self,
    chat_key: str = "",    # 可选,会话标识
    user_key: str = "",    # 可选,用户标识
    store_key: str = ""    # 必需,存储的键名
) -> Optional[str]:        # 返回存储的字符串值,不存在则返回 None

示例

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_preference", "获取用户偏好")
async def get_preference(_ctx: AgentCtx, key: str) -> str:
    """获取用户偏好设置"""

    # 获取会话特定数据
    last_command = await plugin.store.get(
        chat_key=_ctx.from_chat_key,
        store_key="last_command"
    )

    # 获取用户偏好,如果不存在则使用默认值
    prefs_str = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="preferences"
    )

    if prefs_str:
        user_preferences = json.loads(prefs_str)
    else:
        # 默认值
        user_preferences = {"theme": "light", "notifications": False}

    # 获取插件全局数据
    timestamp_str = await plugin.store.get(store_key="plugin_last_updated")
    last_updated = float(timestamp_str) if timestamp_str else None

    return f"偏好: {user_preferences}"

3. 删除数据 (delete)

根据键名从存储中移除键值对。

python
async def delete(
    self,
    chat_key: str = "",    # 可选,会话标识
    user_key: str = "",    # 可选,用户标识
    store_key: str = ""    # 必需,存储的键名
) -> Literal[0, 1]:        # 返回 1 表示成功,0 表示失败

示例

python
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "clear_cache", "清除缓存")
async def clear_cache(_ctx: AgentCtx) -> str:
    """清除会话缓存数据"""

    # 删除会话的特定缓存
    await plugin.store.delete(
        chat_key=_ctx.from_chat_key,
        store_key="session_cache_data"
    )

    # 删除用户的某个设置
    await plugin.store.delete(
        user_key=_ctx.from_user_id,
        store_key="old_setting"
    )

    return "缓存已清除"

4. 检查数据是否存在

通过 get 方法返回值判断键是否存在:

python
value = await plugin.store.get(chat_key=_ctx.from_chat_key, store_key="my_key")
if value is not None:
    core.logger.info("'my_key' 存在于存储中")
else:
    core.logger.info("'my_key' 不存在")

存储结构化数据

由于 KV 存储只支持字符串,存储复杂数据结构时需要序列化。推荐使用 Pydantic 模型:

python
from pydantic import BaseModel
from typing import List, Dict, Optional
import time

class Note(BaseModel):
    id: str
    title: str
    content: str
    created_at: float
    tags: List[str] = []

class UserNotes(BaseModel):
    notes: Dict[str, Note] = {}

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "add_note", "添加笔记")
async def add_note(_ctx: AgentCtx, note_id: str, title: str, content: str, tags_str: str = "") -> str:
    """为当前用户添加笔记"""

    # 1. 获取现有笔记数据
    user_notes_json = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="all_notes"
    )

    if user_notes_json:
        user_notes_data = UserNotes.model_validate_json(user_notes_json)
    else:
        user_notes_data = UserNotes()

    # 2. 创建新笔记
    new_note = Note(
        id=note_id,
        title=title,
        content=content,
        created_at=time.time(),
        tags=tags_str.split(',') if tags_str else []
    )
    user_notes_data.notes[note_id] = new_note

    # 3. 序列化并存储
    await plugin.store.set(
        user_key=_ctx.from_user_id,
        store_key="all_notes",
        value=user_notes_data.model_dump_json()
    )

    return f"笔记 '{title}' 已添加,ID: {note_id}"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_note", "获取笔记")
async def get_note(_ctx: AgentCtx, note_id: str) -> str:
    """获取指定 ID 的笔记内容"""

    user_notes_json = await plugin.store.get(
        user_key=_ctx.from_user_id,
        store_key="all_notes"
    )

    if not user_notes_json:
        return "用户没有任何笔记"

    user_notes_data = UserNotes.model_validate_json(user_notes_json)
    note = user_notes_data.notes.get(note_id)

    if note:
        return f"标题: {note.title}\n内容: {note.content}\n标签: {', '.join(note.tags)}"

    return f"未找到 ID 为 '{note_id}' 的笔记"

方式二:插件持久化目录

概述

plugin.get_plugin_path() 返回一个 pathlib.Path 对象,指向插件专属的文件系统目录。每个插件拥有独立的目录,通常位于:

data/plugins/<plugin_author>.<plugin_module_name>/

这种方式适合存储大文件、二进制数据、需要复杂文件结构的场景。

获取插件目录

python
from pathlib import Path
from nekro_agent.api import core

# 获取插件数据目录
plugin_dir: Path = plugin.get_plugin_path()
core.logger.info(f"插件数据目录: {plugin_dir}")

# 示例输出:data/plugins/my_author.my_plugin/

基础文件操作

示例 1:保存文本文件

python
import aiofiles

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_log", "保存日志")
async def save_log(_ctx: AgentCtx, log_content: str) -> str:
    """保存日志到文件"""

    # 获取插件目录
    plugin_dir = plugin.get_plugin_path()

    # 创建日志子目录
    logs_dir = plugin_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)

    # 保存日志文件
    log_file = logs_dir / f"log_{int(time.time())}.txt"
    async with aiofiles.open(log_file, "w", encoding="utf-8") as f:
        await f.write(log_content)

    core.logger.info(f"日志已保存: {log_file}")
    return f"日志已保存到 {log_file.name}"

示例 2:保存二进制文件

python
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_image", "保存图片")
async def save_image(_ctx: AgentCtx, image_data: bytes, filename: str) -> str:
    """保存图片文件"""

    plugin_dir = plugin.get_plugin_path()
    images_dir = plugin_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    image_path = images_dir / filename
    async with aiofiles.open(image_path, "wb") as f:
        await f.write(image_data)

    return f"图片已保存: {image_path}"

示例 3:读取文件

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "read_config", "读取配置文件")
async def read_config(_ctx: AgentCtx) -> str:
    """读取插件配置文件"""

    plugin_dir = plugin.get_plugin_path()
    config_file = plugin_dir / "config.json"

    # 检查文件是否存在
    if not config_file.exists():
        return "配置文件不存在"

    # 读取文件
    async with aiofiles.open(config_file, "r", encoding="utf-8") as f:
        config_content = await f.read()

    return f"配置内容: {config_content}"

高级文件操作示例

示例 4:管理资源文件

python
from typing import List

@plugin.mount_init_method()
async def init_plugin_resources():
    """插件初始化时准备资源目录结构"""

    plugin_dir = plugin.get_plugin_path()

    # 创建多个子目录
    directories = [
        plugin_dir / "cache",
        plugin_dir / "models",
        plugin_dir / "exports",
        plugin_dir / "temp",
        plugin_dir / "user_uploads"
    ]

    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)
        core.logger.info(f"已创建目录: {directory}")

    # 创建默认配置文件(如果不存在)
    default_config = plugin_dir / "config.json"
    if not default_config.exists():
        default_settings = {
            "version": "1.0.0",
            "enabled": True,
            "cache_ttl": 3600
        }
        async with aiofiles.open(default_config, "w") as f:
            await f.write(json.dumps(default_settings, indent=2))
        core.logger.success(f"已创建默认配置文件: {default_config}")

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "list_exports", "列出导出文件")
async def list_exports(_ctx: AgentCtx) -> str:
    """列出所有导出的文件"""

    plugin_dir = plugin.get_plugin_path()
    exports_dir = plugin_dir / "exports"

    if not exports_dir.exists():
        return "导出目录不存在"

    # 列出目录中的文件
    files = [f.name for f in exports_dir.iterdir() if f.is_file()]

    if not files:
        return "暂无导出文件"

    return f"导出文件列表:\n" + "\n".join(f"- {f}" for f in files)

示例 5:下载和缓存外部资源

python
import hashlib

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "cache_resource", "缓存外部资源")
async def cache_external_resource(_ctx: AgentCtx, url: str) -> str:
    """下载并缓存外部资源"""

    # 动态导入 requests
    from nekro_agent.api.plugin import dynamic_import_pkg
    requests = dynamic_import_pkg("requests>=2.25.0")

    plugin_dir = plugin.get_plugin_path()
    cache_dir = plugin_dir / "cache"
    cache_dir.mkdir(parents=True, exist_ok=True)

    # 使用 URL 的哈希作为文件名
    url_hash = hashlib.md5(url.encode()).hexdigest()
    cache_file = cache_dir / f"{url_hash}.cache"

    # 检查缓存是否存在
    if cache_file.exists():
        core.logger.info(f"使用缓存文件: {cache_file}")
        async with aiofiles.open(cache_file, "rb") as f:
            cached_data = await f.read()
        return f"已从缓存加载,大小: {len(cached_data)} 字节"

    # 下载资源
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        content = response.content

        # 保存到缓存
        async with aiofiles.open(cache_file, "wb") as f:
            await f.write(content)

        core.logger.success(f"资源已下载并缓存: {cache_file}")
        return f"资源已下载,大小: {len(content)} 字节"

    except Exception as e:
        core.logger.error(f"下载资源失败: {e}")
        return f"下载失败: {e}"

示例 6:清理临时文件

python
import time
import os

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "cleanup_temp", "清理临时文件")
async def cleanup_temp_files(_ctx: AgentCtx, max_age_hours: int = 24) -> str:
    """清理超过指定时间的临时文件"""

    plugin_dir = plugin.get_plugin_path()
    temp_dir = plugin_dir / "temp"

    if not temp_dir.exists():
        return "临时目录不存在"

    now = time.time()
    max_age_seconds = max_age_hours * 3600
    deleted_count = 0

    # 遍历临时目录
    for file_path in temp_dir.iterdir():
        if file_path.is_file():
            # 检查文件修改时间
            file_age = now - file_path.stat().st_mtime

            if file_age > max_age_seconds:
                try:
                    os.remove(file_path)
                    deleted_count += 1
                    core.logger.info(f"已删除过期临时文件: {file_path.name}")
                except Exception as e:
                    core.logger.error(f"删除文件失败: {e}")

    return f"清理完成,删除了 {deleted_count} 个文件"

文件与 AI 沙盒交互

当需要将插件目录中的文件传递给 AI 或接收 AI 生成的文件时,需要使用文件系统 API:

python
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "process_user_file", "处理用户上传的文件")
async def process_user_file(_ctx: AgentCtx, file_path: str) -> str:
    """处理用户上传的文件并保存到插件目录"""

    # 1. 获取 AI 传递的文件的真实路径
    host_path = _ctx.fs.get_file(file_path)

    # 2. 读取文件内容
    async with aiofiles.open(host_path, "rb") as f:
        file_content = await f.read()

    # 3. 保存到插件目录
    plugin_dir = plugin.get_plugin_path()
    uploads_dir = plugin_dir / "user_uploads"
    uploads_dir.mkdir(parents=True, exist_ok=True)

    # 使用时间戳作为唯一文件名
    saved_file = uploads_dir / f"{int(time.time())}_{host_path.name}"
    async with aiofiles.open(saved_file, "wb") as f:
        await f.write(file_content)

    core.logger.success(f"文件已保存到插件目录: {saved_file}")
    return f"文件已处理并保存,大小: {len(file_content)} 字节"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "export_data", "导出数据文件")
async def export_data(_ctx: AgentCtx, data_content: str, filename: str) -> str:
    """导出数据文件并返回给 AI"""

    # 1. 保存到插件目录
    plugin_dir = plugin.get_plugin_path()
    exports_dir = plugin_dir / "exports"
    exports_dir.mkdir(parents=True, exist_ok=True)

    export_file = exports_dir / filename
    async with aiofiles.open(export_file, "w", encoding="utf-8") as f:
        await f.write(data_content)

    # 2. 转换为 AI 可访问的沙盒路径
    sandbox_path = await _ctx.fs.mixed_forward_file(export_file, file_name=filename)

    return f"数据已导出: {sandbox_path}"

详细的文件传递机制请参阅 文件交互 章节。

两种存储方式的结合使用

在实际应用中,通常会结合使用两种存储方式:

python
from pydantic import BaseModel
from typing import Optional

class ModelInfo(BaseModel):
    """模型信息(存储在 KV)"""
    name: str
    version: str
    file_path: str  # 实际模型文件的路径(相对于插件目录)
    size_bytes: int
    created_at: float

@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "download_model", "下载模型")
async def download_model(_ctx: AgentCtx, model_url: str, model_name: str) -> str:
    """下载机器学习模型并保存元数据"""

    from nekro_agent.api.plugin import dynamic_import_pkg
    requests = dynamic_import_pkg("requests")

    # 1. 下载模型文件到插件目录
    plugin_dir = plugin.get_plugin_path()
    models_dir = plugin_dir / "models"
    models_dir.mkdir(parents=True, exist_ok=True)

    model_file = models_dir / f"{model_name}.model"

    try:
        response = requests.get(model_url, timeout=300)
        response.raise_for_status()
        model_data = response.content

        # 保存模型文件
        async with aiofiles.open(model_file, "wb") as f:
            await f.write(model_data)

        # 2. 保存模型元数据到 KV 存储
        model_info = ModelInfo(
            name=model_name,
            version="1.0.0",
            file_path=f"models/{model_name}.model",  # 相对路径
            size_bytes=len(model_data),
            created_at=time.time()
        )

        await plugin.store.set(
            store_key=f"model_info:{model_name}",
            value=model_info.model_dump_json()
        )

        return f"模型 '{model_name}' 下载成功,大小: {len(model_data)} 字节"

    except Exception as e:
        return f"下载失败: {e}"

@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "load_model", "加载模型")
async def load_model(_ctx: AgentCtx, model_name: str) -> str:
    """加载已下载的模型"""

    # 1. 从 KV 存储获取模型元数据
    model_info_json = await plugin.store.get(store_key=f"model_info:{model_name}")

    if not model_info_json:
        return f"模型 '{model_name}' 不存在"

    model_info = ModelInfo.model_validate_json(model_info_json)

    # 2. 从插件目录加载模型文件
    plugin_dir = plugin.get_plugin_path()
    model_file = plugin_dir / model_info.file_path

    if not model_file.exists():
        return f"模型文件不存在: {model_file}"

    # 读取模型文件(示例)
    async with aiofiles.open(model_file, "rb") as f:
        model_data = await f.read()

    return f"模型已加载: {model_info.name} (版本 {model_info.version},大小 {model_info.size_bytes} 字节)"

最佳实践

KV 存储最佳实践

  1. 明确键名策略:使用清晰、有结构的 store_key

    python
    # ✅ 推荐
    "user_prefs:theme"
    "chat_state:topic"
    "cache:api:last_fetch"
    
    # ❌ 不推荐
    "data"
    "temp"
    "x"
  2. 数据序列化:复杂数据使用 Pydantic 或 JSON 序列化

    python
    # ✅ 推荐:使用 Pydantic
    await plugin.store.set(
        user_key=user_id,
        store_key="settings",
        value=UserSettings(...).model_dump_json()
    )
    
    # ✅ 可接受:使用 JSON
    await plugin.store.set(
        store_key="config",
        value=json.dumps(config_dict)
    )
    
    # ❌ 不推荐:存储未序列化的对象
    await plugin.store.set(store_key="obj", value=str(my_object))
  3. 错误处理:处理数据不存在的情况

    python
    # ✅ 推荐
    value = await plugin.store.get(store_key="key")
    if value is not None:
        # 处理数据
        pass
    else:
        # 使用默认值或报错
        value = "default"
  4. 作用域选择:根据数据特性选择合适的作用域

    • 会话临时状态 → chat_key
    • 用户个人设置 → user_key
    • 插件全局配置 → 无 key
  5. 数据清理:定期清理不需要的数据

    python
    @plugin.mount_on_channel_reset()
    async def on_reset(_ctx: AgentCtx):
        """会话重置时清理数据"""
        await plugin.store.delete(
            chat_key=_ctx.from_chat_key,
            store_key="temp_cache"
        )
  6. 大小限制:KV 存储适合小型数据(< 1MB)

    • 大于 1MB 的数据应使用持久化目录
    • 在 KV 中存储文件路径引用

持久化目录最佳实践

  1. 目录结构规划:使用清晰的子目录结构

    python
    plugin_dir/
    ├── cache/          # 缓存文件
    ├── models/         # 模型文件
    ├── exports/        # 导出文件
    ├── temp/           # 临时文件
    ├── user_uploads/   # 用户上传
    └── config.json     # 配置文件
  2. 确保目录存在:操作前创建必要的目录

    python
    # ✅ 推荐
    target_dir = plugin_dir / "subdir"
    target_dir.mkdir(parents=True, exist_ok=True)
    
    # ❌ 不推荐:不检查直接写入
    file_path = plugin_dir / "subdir" / "file.txt"
    # 如果 subdir 不存在会报错
  3. 异步文件操作:使用 aiofiles 进行异步 I/O

    python
    # ✅ 推荐:异步操作
    async with aiofiles.open(file_path, "w") as f:
        await f.write(content)
    
    # ❌ 不推荐:同步操作(阻塞事件循环)
    with open(file_path, "w") as f:
        f.write(content)
  4. 文件命名规范:使用时间戳或 UUID 避免冲突

    python
    # ✅ 推荐
    filename = f"{int(time.time())}_{original_name}"
    filename = f"{uuid.uuid4()}_{original_name}"
    
    # ❌ 不推荐:可能冲突
    filename = "output.txt"
  5. 定期清理:清理临时文件和过期缓存

    python
    @plugin.mount_cleanup_method()
    async def cleanup():
        """插件卸载时清理临时文件"""
        plugin_dir = plugin.get_plugin_path()
        temp_dir = plugin_dir / "temp"
    
        if temp_dir.exists():
            for file in temp_dir.iterdir():
                if file.is_file():
                    os.remove(file)
  6. 错误处理:妥善处理文件操作错误

    python
    try:
        async with aiofiles.open(file_path, "r") as f:
            content = await f.read()
    except FileNotFoundError:
        core.logger.error(f"文件不存在: {file_path}")
        return "文件不存在"
    except Exception as e:
        core.logger.error(f"读取文件失败: {e}")
        return f"读取失败: {e}"
  7. 文件大小控制:避免单个文件过大

    python
    # 检查文件大小
    MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB
    
    if file_path.stat().st_size > MAX_FILE_SIZE:
        return "文件过大,无法处理"

版本兼容性

插件升级时注意数据结构的兼容性:

python
@plugin.mount_init_method()
async def migrate_data():
    """数据迁移示例"""

    # 检查版本
    version = await plugin.store.get(store_key="data_version")

    if version is None or version < "2.0.0":
        # 执行数据迁移
        core.logger.info("检测到旧版本数据,开始迁移...")

        # 迁移逻辑...

        # 更新版本号
        await plugin.store.set(store_key="data_version", value="2.0.0")
        core.logger.success("数据迁移完成")

通过合理使用 KV 键值存储和插件持久化目录,你的插件可以高效、可靠地管理各种类型的数据。