数据存储
插件开发中经常需要持久化数据,例如用户偏好、会话状态、缓存信息、配置文件、模型文件等。Nekro Agent 为插件提供了两种互补的数据存储方式,满足不同的存储需求。
存储方式概览
Nekro Agent 插件系统提供两种数据存储方式:
1. KV 键值存储 (plugin.store)
- 存储类型:数据库支持的键值对存储
- 数据形式:字符串类型(需序列化复杂数据)
- 适用场景:小型结构化数据、配置项、状态信息
- 访问方式:通过
plugin.storeAPI 异步访问 - 数据作用域:支持会话级、用户级、插件全局三种作用域
2. 插件持久化目录 (plugin.get_plugin_path())
- 存储类型:文件系统目录
- 数据形式:任意文件和二进制数据
- 适用场景:大文件、二进制数据、模型文件、资源文件
- 访问方式:通过
pathlib.Path进行文件系统操作 - 数据作用域:插件独占目录,需自行管理子目录结构
选择指南
| 存储需求 | 推荐方式 | 原因 |
|---|---|---|
| 用户偏好设置 | KV 存储 | 小型结构化数据,支持作用域隔离 |
| 会话状态 | KV 存储 | 需要会话级别的数据隔离 |
| 配置缓存 | KV 存储 | 快速读写,易于查询和更新 |
| 图片、音频、视频 | 持久化目录 | 大型二进制文件 |
| 机器学习模型 | 持久化目录 | 大文件,不适合数据库存储 |
| 日志文件 | 持久化目录 | 持续追加,文件操作更高效 |
| 临时文件 | 持久化目录 | 文件系统操作更灵活 |
| 数据集文件 | 持久化目录 | 大量数据,可能需要流式处理 |
方式一:KV 键值存储
概述
plugin.store 提供了一组异步方法来操作存储在数据库中的键值对数据。其主要特点包括:
- 键值存储:简单直观的 KV 存储模型
- 数据隔离:每个插件拥有独立的命名空间,避免键名冲突
- 作用域数据:支持三种数据作用域
- 会话特定数据 (
chat_key): 数据与特定的聊天会话绑定 - 用户特定数据 (
user_key): 数据与特定的用户绑定(跨会话) - 插件全局数据 (无 key): 数据属于插件本身,不与任何特定会话或用户关联
- 会话特定数据 (
- 字符串存储:底层存储字符串,复杂数据需要序列化
核心 API
1. 设置数据 (set)
向存储中添加或更新键值对。
async def set(
self,
chat_key: str = "", # 可选,会话标识
user_key: str = "", # 可选,用户标识
store_key: str = "", # 必需,存储的键名
value: str = "" # 必需,要存储的值 (字符串)
) -> Literal[0, 1]: # 返回 1 表示成功,0 表示失败示例:
from nekro_agent.api.schemas import AgentCtx
from nekro_agent.api import core
import json
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "save_preference", "保存用户偏好")
async def save_preference(_ctx: AgentCtx, key: str, value: str) -> str:
"""保存用户偏好设置"""
# 存储会话特定数据
await plugin.store.set(
chat_key=_ctx.from_chat_key,
store_key="last_command",
value="/weather London"
)
# 存储用户特定偏好(跨会话)
user_prefs = {"theme": "dark", "notifications": True}
await plugin.store.set(
user_key=_ctx.from_user_id,
store_key="preferences",
value=json.dumps(user_prefs)
)
# 存储插件全局配置
await plugin.store.set(
store_key="plugin_last_updated",
value=str(time.time())
)
return "偏好设置已保存"2. 获取数据 (get)
根据键名从存储中检索数据。
async def get(
self,
chat_key: str = "", # 可选,会话标识
user_key: str = "", # 可选,用户标识
store_key: str = "" # 必需,存储的键名
) -> Optional[str]: # 返回存储的字符串值,不存在则返回 None示例:
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_preference", "获取用户偏好")
async def get_preference(_ctx: AgentCtx, key: str) -> str:
"""获取用户偏好设置"""
# 获取会话特定数据
last_command = await plugin.store.get(
chat_key=_ctx.from_chat_key,
store_key="last_command"
)
# 获取用户偏好,如果不存在则使用默认值
prefs_str = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="preferences"
)
if prefs_str:
user_preferences = json.loads(prefs_str)
else:
# 默认值
user_preferences = {"theme": "light", "notifications": False}
# 获取插件全局数据
timestamp_str = await plugin.store.get(store_key="plugin_last_updated")
last_updated = float(timestamp_str) if timestamp_str else None
return f"偏好: {user_preferences}"3. 删除数据 (delete)
根据键名从存储中移除键值对。
async def delete(
self,
chat_key: str = "", # 可选,会话标识
user_key: str = "", # 可选,用户标识
store_key: str = "" # 必需,存储的键名
) -> Literal[0, 1]: # 返回 1 表示成功,0 表示失败示例:
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "clear_cache", "清除缓存")
async def clear_cache(_ctx: AgentCtx) -> str:
"""清除会话缓存数据"""
# 删除会话的特定缓存
await plugin.store.delete(
chat_key=_ctx.from_chat_key,
store_key="session_cache_data"
)
# 删除用户的某个设置
await plugin.store.delete(
user_key=_ctx.from_user_id,
store_key="old_setting"
)
return "缓存已清除"4. 检查数据是否存在
通过 get 方法返回值判断键是否存在:
value = await plugin.store.get(chat_key=_ctx.from_chat_key, store_key="my_key")
if value is not None:
core.logger.info("'my_key' 存在于存储中")
else:
core.logger.info("'my_key' 不存在")存储结构化数据
由于 KV 存储只支持字符串,存储复杂数据结构时需要序列化。推荐使用 Pydantic 模型:
from pydantic import BaseModel
from typing import List, Dict, Optional
import time
class Note(BaseModel):
id: str
title: str
content: str
created_at: float
tags: List[str] = []
class UserNotes(BaseModel):
notes: Dict[str, Note] = {}
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "add_note", "添加笔记")
async def add_note(_ctx: AgentCtx, note_id: str, title: str, content: str, tags_str: str = "") -> str:
"""为当前用户添加笔记"""
# 1. 获取现有笔记数据
user_notes_json = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="all_notes"
)
if user_notes_json:
user_notes_data = UserNotes.model_validate_json(user_notes_json)
else:
user_notes_data = UserNotes()
# 2. 创建新笔记
new_note = Note(
id=note_id,
title=title,
content=content,
created_at=time.time(),
tags=tags_str.split(',') if tags_str else []
)
user_notes_data.notes[note_id] = new_note
# 3. 序列化并存储
await plugin.store.set(
user_key=_ctx.from_user_id,
store_key="all_notes",
value=user_notes_data.model_dump_json()
)
return f"笔记 '{title}' 已添加,ID: {note_id}"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "get_note", "获取笔记")
async def get_note(_ctx: AgentCtx, note_id: str) -> str:
"""获取指定 ID 的笔记内容"""
user_notes_json = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="all_notes"
)
if not user_notes_json:
return "用户没有任何笔记"
user_notes_data = UserNotes.model_validate_json(user_notes_json)
note = user_notes_data.notes.get(note_id)
if note:
return f"标题: {note.title}\n内容: {note.content}\n标签: {', '.join(note.tags)}"
return f"未找到 ID 为 '{note_id}' 的笔记"方式二:插件持久化目录
概述
plugin.get_plugin_path() 返回一个 pathlib.Path 对象,指向插件专属的文件系统目录。每个插件拥有独立的目录,通常位于:
data/plugins/<plugin_author>.<plugin_module_name>/这种方式适合存储大文件、二进制数据、需要复杂文件结构的场景。
获取插件目录
from pathlib import Path
from nekro_agent.api import core
# 获取插件数据目录
plugin_dir: Path = plugin.get_plugin_path()
core.logger.info(f"插件数据目录: {plugin_dir}")
# 示例输出:data/plugins/my_author.my_plugin/基础文件操作
示例 1:保存文本文件
import aiofiles
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_log", "保存日志")
async def save_log(_ctx: AgentCtx, log_content: str) -> str:
"""保存日志到文件"""
# 获取插件目录
plugin_dir = plugin.get_plugin_path()
# 创建日志子目录
logs_dir = plugin_dir / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
# 保存日志文件
log_file = logs_dir / f"log_{int(time.time())}.txt"
async with aiofiles.open(log_file, "w", encoding="utf-8") as f:
await f.write(log_content)
core.logger.info(f"日志已保存: {log_file}")
return f"日志已保存到 {log_file.name}"示例 2:保存二进制文件
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "save_image", "保存图片")
async def save_image(_ctx: AgentCtx, image_data: bytes, filename: str) -> str:
"""保存图片文件"""
plugin_dir = plugin.get_plugin_path()
images_dir = plugin_dir / "images"
images_dir.mkdir(parents=True, exist_ok=True)
image_path = images_dir / filename
async with aiofiles.open(image_path, "wb") as f:
await f.write(image_data)
return f"图片已保存: {image_path}"示例 3:读取文件
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "read_config", "读取配置文件")
async def read_config(_ctx: AgentCtx) -> str:
"""读取插件配置文件"""
plugin_dir = plugin.get_plugin_path()
config_file = plugin_dir / "config.json"
# 检查文件是否存在
if not config_file.exists():
return "配置文件不存在"
# 读取文件
async with aiofiles.open(config_file, "r", encoding="utf-8") as f:
config_content = await f.read()
return f"配置内容: {config_content}"高级文件操作示例
示例 4:管理资源文件
from typing import List
@plugin.mount_init_method()
async def init_plugin_resources():
"""插件初始化时准备资源目录结构"""
plugin_dir = plugin.get_plugin_path()
# 创建多个子目录
directories = [
plugin_dir / "cache",
plugin_dir / "models",
plugin_dir / "exports",
plugin_dir / "temp",
plugin_dir / "user_uploads"
]
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
core.logger.info(f"已创建目录: {directory}")
# 创建默认配置文件(如果不存在)
default_config = plugin_dir / "config.json"
if not default_config.exists():
default_settings = {
"version": "1.0.0",
"enabled": True,
"cache_ttl": 3600
}
async with aiofiles.open(default_config, "w") as f:
await f.write(json.dumps(default_settings, indent=2))
core.logger.success(f"已创建默认配置文件: {default_config}")
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "list_exports", "列出导出文件")
async def list_exports(_ctx: AgentCtx) -> str:
"""列出所有导出的文件"""
plugin_dir = plugin.get_plugin_path()
exports_dir = plugin_dir / "exports"
if not exports_dir.exists():
return "导出目录不存在"
# 列出目录中的文件
files = [f.name for f in exports_dir.iterdir() if f.is_file()]
if not files:
return "暂无导出文件"
return f"导出文件列表:\n" + "\n".join(f"- {f}" for f in files)示例 5:下载和缓存外部资源
import hashlib
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "cache_resource", "缓存外部资源")
async def cache_external_resource(_ctx: AgentCtx, url: str) -> str:
"""下载并缓存外部资源"""
# 动态导入 requests
from nekro_agent.api.plugin import dynamic_import_pkg
requests = dynamic_import_pkg("requests>=2.25.0")
plugin_dir = plugin.get_plugin_path()
cache_dir = plugin_dir / "cache"
cache_dir.mkdir(parents=True, exist_ok=True)
# 使用 URL 的哈希作为文件名
url_hash = hashlib.md5(url.encode()).hexdigest()
cache_file = cache_dir / f"{url_hash}.cache"
# 检查缓存是否存在
if cache_file.exists():
core.logger.info(f"使用缓存文件: {cache_file}")
async with aiofiles.open(cache_file, "rb") as f:
cached_data = await f.read()
return f"已从缓存加载,大小: {len(cached_data)} 字节"
# 下载资源
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
content = response.content
# 保存到缓存
async with aiofiles.open(cache_file, "wb") as f:
await f.write(content)
core.logger.success(f"资源已下载并缓存: {cache_file}")
return f"资源已下载,大小: {len(content)} 字节"
except Exception as e:
core.logger.error(f"下载资源失败: {e}")
return f"下载失败: {e}"示例 6:清理临时文件
import time
import os
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "cleanup_temp", "清理临时文件")
async def cleanup_temp_files(_ctx: AgentCtx, max_age_hours: int = 24) -> str:
"""清理超过指定时间的临时文件"""
plugin_dir = plugin.get_plugin_path()
temp_dir = plugin_dir / "temp"
if not temp_dir.exists():
return "临时目录不存在"
now = time.time()
max_age_seconds = max_age_hours * 3600
deleted_count = 0
# 遍历临时目录
for file_path in temp_dir.iterdir():
if file_path.is_file():
# 检查文件修改时间
file_age = now - file_path.stat().st_mtime
if file_age > max_age_seconds:
try:
os.remove(file_path)
deleted_count += 1
core.logger.info(f"已删除过期临时文件: {file_path.name}")
except Exception as e:
core.logger.error(f"删除文件失败: {e}")
return f"清理完成,删除了 {deleted_count} 个文件"文件与 AI 沙盒交互
当需要将插件目录中的文件传递给 AI 或接收 AI 生成的文件时,需要使用文件系统 API:
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "process_user_file", "处理用户上传的文件")
async def process_user_file(_ctx: AgentCtx, file_path: str) -> str:
"""处理用户上传的文件并保存到插件目录"""
# 1. 获取 AI 传递的文件的真实路径
host_path = _ctx.fs.get_file(file_path)
# 2. 读取文件内容
async with aiofiles.open(host_path, "rb") as f:
file_content = await f.read()
# 3. 保存到插件目录
plugin_dir = plugin.get_plugin_path()
uploads_dir = plugin_dir / "user_uploads"
uploads_dir.mkdir(parents=True, exist_ok=True)
# 使用时间戳作为唯一文件名
saved_file = uploads_dir / f"{int(time.time())}_{host_path.name}"
async with aiofiles.open(saved_file, "wb") as f:
await f.write(file_content)
core.logger.success(f"文件已保存到插件目录: {saved_file}")
return f"文件已处理并保存,大小: {len(file_content)} 字节"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "export_data", "导出数据文件")
async def export_data(_ctx: AgentCtx, data_content: str, filename: str) -> str:
"""导出数据文件并返回给 AI"""
# 1. 保存到插件目录
plugin_dir = plugin.get_plugin_path()
exports_dir = plugin_dir / "exports"
exports_dir.mkdir(parents=True, exist_ok=True)
export_file = exports_dir / filename
async with aiofiles.open(export_file, "w", encoding="utf-8") as f:
await f.write(data_content)
# 2. 转换为 AI 可访问的沙盒路径
sandbox_path = await _ctx.fs.mixed_forward_file(export_file, file_name=filename)
return f"数据已导出: {sandbox_path}"详细的文件传递机制请参阅 文件交互 章节。
两种存储方式的结合使用
在实际应用中,通常会结合使用两种存储方式:
from pydantic import BaseModel
from typing import Optional
class ModelInfo(BaseModel):
"""模型信息(存储在 KV)"""
name: str
version: str
file_path: str # 实际模型文件的路径(相对于插件目录)
size_bytes: int
created_at: float
@plugin.mount_sandbox_method(SandboxMethodType.BEHAVIOR, "download_model", "下载模型")
async def download_model(_ctx: AgentCtx, model_url: str, model_name: str) -> str:
"""下载机器学习模型并保存元数据"""
from nekro_agent.api.plugin import dynamic_import_pkg
requests = dynamic_import_pkg("requests")
# 1. 下载模型文件到插件目录
plugin_dir = plugin.get_plugin_path()
models_dir = plugin_dir / "models"
models_dir.mkdir(parents=True, exist_ok=True)
model_file = models_dir / f"{model_name}.model"
try:
response = requests.get(model_url, timeout=300)
response.raise_for_status()
model_data = response.content
# 保存模型文件
async with aiofiles.open(model_file, "wb") as f:
await f.write(model_data)
# 2. 保存模型元数据到 KV 存储
model_info = ModelInfo(
name=model_name,
version="1.0.0",
file_path=f"models/{model_name}.model", # 相对路径
size_bytes=len(model_data),
created_at=time.time()
)
await plugin.store.set(
store_key=f"model_info:{model_name}",
value=model_info.model_dump_json()
)
return f"模型 '{model_name}' 下载成功,大小: {len(model_data)} 字节"
except Exception as e:
return f"下载失败: {e}"
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "load_model", "加载模型")
async def load_model(_ctx: AgentCtx, model_name: str) -> str:
"""加载已下载的模型"""
# 1. 从 KV 存储获取模型元数据
model_info_json = await plugin.store.get(store_key=f"model_info:{model_name}")
if not model_info_json:
return f"模型 '{model_name}' 不存在"
model_info = ModelInfo.model_validate_json(model_info_json)
# 2. 从插件目录加载模型文件
plugin_dir = plugin.get_plugin_path()
model_file = plugin_dir / model_info.file_path
if not model_file.exists():
return f"模型文件不存在: {model_file}"
# 读取模型文件(示例)
async with aiofiles.open(model_file, "rb") as f:
model_data = await f.read()
return f"模型已加载: {model_info.name} (版本 {model_info.version},大小 {model_info.size_bytes} 字节)"最佳实践
KV 存储最佳实践
明确键名策略:使用清晰、有结构的
store_keypython# ✅ 推荐 "user_prefs:theme" "chat_state:topic" "cache:api:last_fetch" # ❌ 不推荐 "data" "temp" "x"数据序列化:复杂数据使用 Pydantic 或 JSON 序列化
python# ✅ 推荐:使用 Pydantic await plugin.store.set( user_key=user_id, store_key="settings", value=UserSettings(...).model_dump_json() ) # ✅ 可接受:使用 JSON await plugin.store.set( store_key="config", value=json.dumps(config_dict) ) # ❌ 不推荐:存储未序列化的对象 await plugin.store.set(store_key="obj", value=str(my_object))错误处理:处理数据不存在的情况
python# ✅ 推荐 value = await plugin.store.get(store_key="key") if value is not None: # 处理数据 pass else: # 使用默认值或报错 value = "default"作用域选择:根据数据特性选择合适的作用域
- 会话临时状态 →
chat_key - 用户个人设置 →
user_key - 插件全局配置 → 无 key
- 会话临时状态 →
数据清理:定期清理不需要的数据
python@plugin.mount_on_channel_reset() async def on_reset(_ctx: AgentCtx): """会话重置时清理数据""" await plugin.store.delete( chat_key=_ctx.from_chat_key, store_key="temp_cache" )大小限制:KV 存储适合小型数据(< 1MB)
- 大于 1MB 的数据应使用持久化目录
- 在 KV 中存储文件路径引用
持久化目录最佳实践
目录结构规划:使用清晰的子目录结构
pythonplugin_dir/ ├── cache/ # 缓存文件 ├── models/ # 模型文件 ├── exports/ # 导出文件 ├── temp/ # 临时文件 ├── user_uploads/ # 用户上传 └── config.json # 配置文件确保目录存在:操作前创建必要的目录
python# ✅ 推荐 target_dir = plugin_dir / "subdir" target_dir.mkdir(parents=True, exist_ok=True) # ❌ 不推荐:不检查直接写入 file_path = plugin_dir / "subdir" / "file.txt" # 如果 subdir 不存在会报错异步文件操作:使用
aiofiles进行异步 I/Opython# ✅ 推荐:异步操作 async with aiofiles.open(file_path, "w") as f: await f.write(content) # ❌ 不推荐:同步操作(阻塞事件循环) with open(file_path, "w") as f: f.write(content)文件命名规范:使用时间戳或 UUID 避免冲突
python# ✅ 推荐 filename = f"{int(time.time())}_{original_name}" filename = f"{uuid.uuid4()}_{original_name}" # ❌ 不推荐:可能冲突 filename = "output.txt"定期清理:清理临时文件和过期缓存
python@plugin.mount_cleanup_method() async def cleanup(): """插件卸载时清理临时文件""" plugin_dir = plugin.get_plugin_path() temp_dir = plugin_dir / "temp" if temp_dir.exists(): for file in temp_dir.iterdir(): if file.is_file(): os.remove(file)错误处理:妥善处理文件操作错误
pythontry: async with aiofiles.open(file_path, "r") as f: content = await f.read() except FileNotFoundError: core.logger.error(f"文件不存在: {file_path}") return "文件不存在" except Exception as e: core.logger.error(f"读取文件失败: {e}") return f"读取失败: {e}"文件大小控制:避免单个文件过大
python# 检查文件大小 MAX_FILE_SIZE = 100 * 1024 * 1024 # 100MB if file_path.stat().st_size > MAX_FILE_SIZE: return "文件过大,无法处理"
版本兼容性
插件升级时注意数据结构的兼容性:
@plugin.mount_init_method()
async def migrate_data():
"""数据迁移示例"""
# 检查版本
version = await plugin.store.get(store_key="data_version")
if version is None or version < "2.0.0":
# 执行数据迁移
core.logger.info("检测到旧版本数据,开始迁移...")
# 迁移逻辑...
# 更新版本号
await plugin.store.set(store_key="data_version", value="2.0.0")
core.logger.success("数据迁移完成")通过合理使用 KV 键值存储和插件持久化目录,你的插件可以高效、可靠地管理各种类型的数据。