插件实例与生命周期
每个 Nekro Agent 插件的核心都是一个 NekroPlugin
类的实例。这个实例不仅定义了插件的基本信息,还作为各种功能和事件回调的挂载点。理解插件实例的创建和其完整的生命周期对于开发稳定且功能完善的插件至关重要。
NekroPlugin
实例
在你的插件主文件 (例如 plugin.py
) 中,首先需要导入并创建一个 NekroPlugin
实例:
from nekro_agent.api.plugin import NekroPlugin
plugin = NekroPlugin(
name="我的酷炫插件",
module_name="my_cool_plugin",
description="这个插件能做很多酷炫的事情。",
version="1.0.0",
author="开发者名称",
url="https://github.com/developer/my_cool_plugin",
support_adapter=["onebot_v11", "telegram"], # 可选,支持的适配器列表
is_builtin=False, # 通常第三方插件为 False
is_package=False # 如果你的插件是一个复杂的包结构,可能需要调整
)
核心参数解析:
name
(str): 插件的显示名称,将出现在 Nekro Agent 的用户界面中。module_name
(str): 插件的模块名称,必须是 Python 模块的有效名称,并且在所有插件中保持唯一。通常与插件的目录名一致。它用于内部识别和管理插件。description
(str): 插件的简短描述,说明其主要功能。version
(str): 插件的版本号,建议遵循语义化版本规范 (如1.0.0
)。author
(str): 插件的作者或开发团队名称,必须由英文字母、数字、下划线组成,且不能以数字开头。url
(str): 指向插件项目仓库、文档或主页的 URL。support_adapter
(List[str], 可选): 插件支持的适配器列表,如["onebot_v11", "telegram"]
。如果未指定,则默认支持所有适配器。is_builtin
(bool, 可选, 默认为False
): 标记插件是否为 Nekro Agent 内置插件。is_package
(bool, 可选, 默认为False
): 标记插件是否作为一个 Python 包进行管理。
NekroPlugin
实例 (plugin
) 将作为后续所有装饰器的调用对象,用于挂载沙盒方法、配置、生命周期回调等。
插件生命周期
插件的生命周期描述了从插件被系统发现、加载、运行到最终卸载的整个过程。Nekro Agent 提供了一系列装饰器,允许你在生命周期的特定阶段执行自定义逻辑。
1. 初始化 (@plugin.mount_init_method()
)
当 Nekro Agent 首次加载插件时(通常在 Agent 启动或插件被动态启用时),会执行通过 @plugin.mount_init_method()
装饰器注册的异步函数。这个阶段非常适合执行只需要进行一次的设置工作。
用途:
- 资源准备:创建或连接数据库、初始化文件系统(如创建插件所需的数据目录)。
- 状态初始化:设置插件的初始状态、加载默认配置或数据。
- 外部系统连接:如果插件需要与外部服务交互,可以在此建立连接。
- 环境验证:检查必要的依赖项或环境变量是否满足。
示例:
from nekro_agent.api import core
from qdrant_client import models as qdrant_models # 假设使用 qdrant
@plugin.mount_init_method()
async def initialize_plugin():
core.logger.info(f"插件 '{plugin.name}' 正在初始化...")
# 示例:初始化向量数据库集合
try:
client = await core.get_qdrant_client() # 获取 Qdrant 客户端
collection_name = plugin.get_vector_collection_name("my_data") # 为插件生成唯一的集合名
collections = await client.get_collections()
collection_names = [collection.name for collection in collections.collections]
if collection_name not in collection_names:
core.logger.info(f"正在创建向量数据库集合: {collection_name}")
await client.create_collection(
collection_name=collection_name,
vectors_config=qdrant_models.VectorParams(
size=768, # 示例维度,根据你的 embedding 模型调整
distance=qdrant_models.Distance.COSINE,
),
)
core.logger.success(f"集合 {collection_name} 创建成功")
else:
core.logger.info(f"集合 {collection_name} 已存在")
except Exception as e:
core.logger.error(f"插件 '{plugin.name}' 初始化向量数据库失败: {e}")
core.logger.success(f"插件 '{plugin.name}' 初始化完成。")
注意:初始化方法应该是异步的 (async def
)。如果初始化过程中发生错误,应妥善处理并记录日志,确保 Agent 能够继续运行。
2. 会话重置回调 (@plugin.mount_on_channel_reset()
)
当一个特定的聊天会话被重置时(例如用户清空了聊天历史或会话状态),通过 @plugin.mount_on_channel_reset()
装饰器注册的异步函数会被调用。这个回调会接收一个 AgentCtx
对象,其中包含了被重置会话的相关信息。
用途:
- 清理与特定会话相关的插件数据(如缓存、状态)。
- 重置该会话的插件特定设置。
示例:
from nekro_agent.api.schemas import AgentCtx
@plugin.mount_on_channel_reset()
async def handle_channel_reset(_ctx: AgentCtx):
core.logger.info(f"插件 '{plugin.name}' 收到会话 {_ctx.from_chat_key} 重置事件。")
# 示例:清除该会话的插件特定缓存
# await plugin.store.delete(chat_key=ctx.from_chat_key, store_key="session_cache")
core.logger.info(f"会话 {_ctx.from_chat_key} 的插件特定数据已清理。")
注意:滥用此回调可能会影响 Agent 的性能或正常对话流程。大多数插件功能应通过沙盒方法实现。
3. 用户消息回调 (@plugin.mount_on_user_message()
)
当用户发送消息时,通过 @plugin.mount_on_user_message()
装饰器注册的异步函数会被调用。这个回调可以用于监听、预处理或拦截用户消息。
用途:
- 消息预处理:在 AI 处理之前对用户消息进行处理、过滤或增强。
- 权限控制:根据用户身份或消息内容进行访问控制。
- 消息统计:记录用户的消息使用情况或行为分析。
- 自动回复:对特定类型的消息提供即时响应。
示例:
from nekro_agent.api.message import ChatMessage
from nekro_agent.api.signal import MsgSignal
@plugin.mount_on_user_message()
async def handle_user_message(_ctx: AgentCtx, message: ChatMessage) -> MsgSignal | None:
"""处理用户消息的回调函数"""
# 示例:消息内容过滤
if "禁止词" in message.content_text:
core.logger.warning(f"用户 {_ctx.from_user_id} 发送了包含禁止词的消息")
return MsgSignal.BLOCK_ALL # 阻止消息被记录到历史并阻止后续处理
# 示例:特殊命令处理
if message.content_text.startswith("!plugin:"):
command = message.content_text[8:].strip()
await _ctx.send_text(f"收到插件命令: {command}")
return MsgSignal.BLOCK_TRIGGER # 允许消息被记录但阻止触发AI处理
# 示例:消息统计
await plugin.store.set(
user_key=_ctx.from_user_id,
store_key="message_count",
value=str(int(await plugin.store.get(user_key=_ctx.from_user_id, store_key="message_count") or "0") + 1)
)
# 返回 None 表示消息正常处理
return None
返回值说明:
None
或MsgSignal.CONTINUE
: 消息正常处理,继续传递给后续流程MsgSignal.BLOCK_TRIGGER
: 允许消息被记录到历史,但阻止消息触发后续处理MsgSignal.BLOCK_ALL
: 阻止消息被记录到历史,同时也会阻止消息触发后续处理
4. 系统消息回调 (@plugin.mount_on_system_message()
)
当系统发送消息时(如管理员命令、系统通知等),通过 @plugin.mount_on_system_message()
装饰器注册的异步函数会被调用。
用途:
- 系统事件监听:监听系统级别的消息和事件。
- 管理功能集成:响应管理员命令或系统状态变化。
- 日志记录:记录重要的系统消息。
示例:
@plugin.mount_on_system_message()
async def handle_system_message(_ctx: AgentCtx, message: str) -> MsgSignal | None:
"""处理系统消息的回调函数"""
core.logger.info(f"插件 '{plugin.name}' 收到管理员命令: {admin_command}")
return None
5. 提示词注入 (@plugin.mount_prompt_inject_method()
)
提示词注入允许插件在 AI 处理用户消息之前,向系统提示词中动态添加内容。这是影响 AI 行为和提供上下文信息的重要机制。
用途:
- 上下文增强:为 AI 提供当前会话的状态信息、用户偏好等。
- 角色定义:为 AI 设定特定的角色或行为模式。
- 功能提示 为 AI 提供更详细的插件使用指导说明。
示例:
@plugin.mount_prompt_inject_method(
name="user_preference_injection",
description="向 AI 注入用户偏好和状态信息"
)
async def inject_user_context(_ctx: AgentCtx) -> str:
"""注入用户上下文信息到 AI 提示词中"""
# 获取用户偏好
user_preferences = await plugin.store.get(
user_key=_ctx.from_user_id,
store_key="preferences"
)
# 获取会话状态
session_mode = await plugin.store.get(
chat_key=_ctx.from_chat_key,
store_key="mode"
)
prompt_parts = []
if user_preferences:
prompt_parts.append(f"用户偏好设置:{user_preferences}")
if session_mode:
prompt_parts.append(f"当前会话模式:{session_mode}")
return "\n".join(prompt_parts)
详细的提示词注入机制请参阅 提示词注入 章节。
6. 清理 (@plugin.mount_cleanup_method()
)
当插件被卸载或 Nekro Agent 关闭时,会执行通过 @plugin.mount_cleanup_method()
装饰器注册的异步函数。这个阶段用于释放插件占用的所有资源,确保系统干净地关闭或重载插件。
用途:
- 释放资源:关闭数据库连接、释放文件句柄、停止后台任务等。
- 重置状态:清空全局变量或缓存,确保下次加载时是全新的状态。
- 外部系统断开连接:如果插件与外部服务保持长连接,应在此处断开。
示例:
@plugin.mount_cleanup_method()
async def cleanup_plugin():
core.logger.info(f"插件 '{plugin.name}' 正在清理...")
# 示例:关闭可能存在的数据库连接池或清除临时文件
# await close_database_connections()
# await clear_temporary_files()
core.logger.success(f"插件 '{plugin.name}' 清理完成。")
重要性: 正确的清理对于插件的稳定性和可维护性至关重要,尤其是在开发和调试阶段频繁重载插件时。如果未能正确清理资源,可能导致:
- 资源泄漏:如内存泄漏、文件句柄未释放。
- 状态冲突:重载后的插件可能继承了旧插件的残留状态。
- 行为异常:插件表现不符合预期。
确保你的初始化和清理方法成对出现,管理好插件的资源和状态。
高级特性
动态方法收集过滤器 (@plugin.mount_collect_methods()
)
动态方法收集过滤器允许插件根据上下文(如用户权限、会话状态等)动态决定哪些沙盒方法在当前状态可用。这对于实现权限控制和个性化功能非常有用。
用途:
- 权限控制:根据用户身份限制可用功能。
- 条件功能:根据会话状态或配置动态启用/禁用功能。
- 个性化体验:为不同用户提供不同的功能集合。
示例:
@plugin.mount_collect_methods()
async def collect_available_methods(_ctx: AgentCtx) -> List[SandboxMethod]:
"""根据上下文动态收集可用方法"""
# 检查会话类型
if _ctx.channel_type == "group":
# 群聊专用功能
return [group_method_xxx, group_method_yyy]
else:
return [private_method_xxx, private_method_yyy]
插件文档支持
NekroPlugin
支持自动加载和显示插件文档,提升用户体验:
文档加载优先级:
- 插件目录下的
README.md
文件 - 插件模块的
__doc__
字符串
示例:
"""
我的酷炫插件
这个插件提供了以下功能:
- 用户偏好管理
- 智能推荐系统
- 数据分析工具
## 使用方法
1. 配置 API 密钥
2. 设置用户偏好
3. 开始使用插件功能
## 注意事项
- 确保网络连接正常
- API 密钥需要定期更新
"""
from nekro_agent.api.plugin import NekroPlugin
plugin = NekroPlugin(
name="我的酷炫插件",
# ... 其他参数
)
# 插件实现代码...
向量数据库集合命名
插件提供了便捷的向量数据库集合命名方法,确保不同插件之间的数据隔离:
# 获取插件专用的集合名称
collection_name = plugin.get_vector_collection_name("user_embeddings")
# 返回: "author.module_name-user_embeddings"
# 获取插件默认集合名称
default_collection = plugin.get_vector_collection_name()
# 返回: "author.module_name"
插件存储访问
每个插件都有独立的持久化 KV 存储,通过 plugin.store
属性访问。
# 在任何插件方法中使用存储
@plugin.mount_sandbox_method(SandboxMethodType.TOOL, "save_data", "保存数据")
async def save_data(_ctx: AgentCtx, key: str, value: str) -> str:
await plugin.store.set(
chat_key=_ctx.from_chat_key,
store_key=key,
value=value
)
return f"数据已保存: {key} = {value}"
详细的存储使用方法请参阅 数据存储 章节。
插件配置访问
插件可以通过 plugin.config
属性访问其配置:
@plugin.mount_sandbox_method(SandboxMethodType.AGENT, "get_status", "获取插件状态")
async def get_plugin_status(_ctx: AgentCtx) -> str:
config = plugin.get_config()
return f"""
插件状态:
- 名称: {plugin.name}
- 版本: {plugin.version}
- 是否启用: {plugin.is_enabled}
- API 端点: {config.api_endpoint if hasattr(config, 'api_endpoint') else '未配置'}
"""
通过合理使用这些特性,你可以创建出功能强大、用户友好的 Nekro Agent 插件。