使用向量数据库 (Qdrant)
Nekro Agent 集成了 Qdrant 向量数据库,并为插件提供了访问其客户端的便捷方式。这使得插件能够利用向量相似性搜索来实现强大的语义理解、信息检索、内容推荐等功能,而无需自行搭建和管理向量数据库实例。
什么是向量数据库?
向量数据库专门用于存储、管理和搜索高维向量数据。这些向量通常是文本、图片、音频或其他数据的数学表示(称为嵌入, Embeddings),由深度学习模型(如 Transformer 模型)生成。
通过比较向量之间的距离或相似度(如余弦相似度、欧氏距离),向量数据库可以快速找到与给定查询向量最相似的条目,从而实现:
- 语义搜索:不仅仅是关键词匹配,而是理解查询的语义含义进行搜索。
- 推荐系统:根据用户已喜欢的内容向量,推荐相似的内容。
- 问答系统:将问题和文档块转换为向量,找到与问题最相关的文档块作为答案来源。
- 异常检测、聚类等。
Nekro Agent 中的 Qdrant 集成
Nekro Agent 在其核心服务中初始化并管理了一个 Qdrant 客户端实例。插件可以通过 nekro_agent.api.core
模块来获取这个客户端,并利用它与 Qdrant 服务器进行交互。
1. 获取 Qdrant 客户端
在你的插件代码中(通常是在沙盒方法或初始化方法内),你可以像这样获取 Qdrant 客户端:
from typing import Optional
from nekro_agent.api import core
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct, Distance, VectorParams
async def get_my_qdrant_client() -> QdrantClient:
qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
if not qdrant_client:
core.logger.error("无法获取 Qdrant 客户端实例!请检查 Agent 核心配置。")
# 可以根据插件逻辑决定是抛出异常还是返回 None
raise ConnectionError("Qdrant client is not available.")
return qdrant_client
core.get_qdrant_client()
是一个异步函数,它会返回一个已配置好的 QdrantClient
实例,或者在 Qdrant 未正确配置或不可用时返回 None
。因此,在使用前进行检查非常重要。
你还可以通过 core.get_qdrant_config()
获取 Qdrant 的连接配置信息(如 host, port, api_key 等),但这通常由 Agent 核心管理,插件开发者较少直接需要。
2. 插件专属的集合名称
为了避免不同插件在 Qdrant 中使用相同的集合名称造成冲突,Nekro Agent 提供了一个辅助方法 plugin.get_vector_collection_name()
来为你的插件生成一个唯一的、标准化的集合名称。
# 假设 plugin 是你的 NekroPlugin 实例
# from nekro_agent.api.plugin import NekroPlugin
# plugin = NekroPlugin(...)
# 生成一个默认的集合名称,基于插件的作者和模块名
default_collection_name = plugin.get_vector_collection_name()
# 示例输出 (假设插件作者 "MyAuthor", 模块名 "MyPlugin"):
# "MyAuthor.MyPlugin"
# 为插件内的特定用途生成带有后缀的集合名称
specific_collection_name = plugin.get_vector_collection_name("user_documents")
# 示例输出: "MyAuthor.MyPlugin-documents"
强烈建议插件在 Qdrant 中创建和使用通过此方法生成的集合名称,以确保良好的隔离性和避免命名冲突。
3. 初始化时检查和创建集合
在插件初始化时,通常需要检查所需的 Qdrant 集合是否存在,如果不存在则创建它。这可以确保插件在后续操作中能够正确访问集合。
from nekro_agent.api import core
from nekro_agent.api.plugin import NekroPlugin # 假设的插件实例
from qdrant_client import QdrantClient, models
from typing import Optional # 确保导入 Optional
# 假设 plugin 是你的 NekroPlugin 实例
# plugin = NekroPlugin(name="MySamplePlugin", module_name="my_sample", author="MyAuthor", ...)
# EMBEDDING_DIMENSION = 1024 # 假设你的嵌入向量维度
async def init_vector_db_collection(plugin_instance: NekroPlugin, embedding_dimension: int):
"""初始化插件的向量数据库集合"""
qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
if not qdrant_client:
core.logger.warning(f"[{plugin_instance.name}] 无法获取Qdrant客户端,跳过向量数据库集合初始化")
return
collection_name = plugin_instance.get_vector_collection_name()
try:
collections_response = await qdrant_client.get_collections()
existing_collections = [col.name for col in collections_response.collections]
if collection_name not in existing_collections:
core.logger.info(f"[{plugin_instance.name}] 正在创建向量数据库集合: {collection_name}")
await qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=models.VectorParams(
size=embedding_dimension, # 向量维度,需要与你的嵌入模型一致
distance=models.Distance.COSINE, # 距离度量方式,COSINE 常用于文本
),
)
core.logger.success(f"[{plugin_instance.name}] 向量数据库集合 {collection_name} 创建成功")
else:
core.logger.info(f"[{plugin_instance.name}] 向量数据库集合 {collection_name} 已存在")
except Exception as e:
core.logger.error(f"[{plugin_instance.name}] 初始化向量数据库集合 {collection_name} 失败: {e}")
# 在你的插件初始化逻辑中调用 (例如 @plugin.mount_init_method())
# async def on_plugin_init():
# # plugin 应该是你的插件实例
# # MY_PLUGIN_EMBEDDING_DIMENSION 是你插件配置的维度
# await init_vector_db_collection(plugin, MY_PLUGIN_EMBEDDING_DIMENSION)
说明:
plugin_instance.get_vector_collection_name()
: 获取插件专属的集合名称。qdrant_client.get_collections()
: 获取 Qdrant 中所有集合的列表。qdrant_client.create_collection()
: 创建一个新的集合。collection_name
: 集合的名称。vectors_config
: 定义向量的参数。size
: 向量的维度,必须与你使用的嵌入模型生成的向量维度一致。distance
: 计算向量相似度的距离函数,常用的有Distance.COSINE
(余弦相似度),Distance.EUCLID
(欧氏距离),Distance.DOT
(点积)。对于文本嵌入,余弦相似度通常是最佳选择。
4. 配置嵌入模型和维度
为了将文本或其他数据转换为向量,你需要使用嵌入模型。Nekro Agent 允许你在插件配置中指定所使用的嵌入模型组及其维度。
在你的插件配置类 (继承自 nekro_agent.api.plugin.ConfigBase
) 中添加如下字段:
from pydantic import Field
from nekro_agent.api.plugin import ConfigBase
class MyPluginConfig(ConfigBase):
EMBEDDING_MODEL_GROUP: str = Field(
default="text-embedding", # 默认的模型组名,请根据实际情况修改
title="嵌入模型组",
description="用于生成文本嵌入向量的模型组名称。",
json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"},
)
EMBEDDING_DIMENSION: int = Field(
default=1024, # 默认维度,请根据你选择的模型组的实际输出维度修改
title="嵌入向量维度",
description="嵌入模型输出的向量维度。",
)
# ... 其他配置项
说明:
EMBEDDING_MODEL_GROUP
: 字符串字段,用于指定在 Nekro Agent 核心配置中定义的模型组名称。json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"}
: 这部分元数据用于在 Nekro Agent 的管理界面中提供模型组选择的下拉列表,并指明这是一个嵌入类型的模型。
EMBEDDING_DIMENSION
: 整数字段,表示你的嵌入模型输出向量的维度。这个值必须与init_vector_db_collection
中VectorParams
的size
参数以及实际模型输出的维度严格匹配,否则会导致错误。
获取配置:
# 假设 plugin 是你的 NekroPlugin 实例
# my_config: MyPluginConfig = plugin.get_config(MyPluginConfig)
# embedding_model_group_name = my_config.EMBEDDING_MODEL_GROUP
# embedding_dimension = my_config.EMBEDDING_DIMENSION
5. 生成文本嵌入 (Embeddings)
获取到文本后,你需要调用相应的嵌入模型服务将其转换为向量。Nekro Agent 提供了工具来调用通过模型组配置的 OpenAI 兼容的嵌入接口。
from typing import List, Optional # 确保导入
from nekro_agent.api import core
from nekro_agent.api.core import config as core_config, ModelConfigGroup
from nekro_agent.services.agent.openai import gen_openai_embeddings # 导入辅助函数
# 假设 MyPluginConfig 是你的插件配置类
# plugin_config: MyPluginConfig = plugin.get_config(MyPluginConfig)
async def generate_text_embedding(text_to_embed: str, plugin_config: MyPluginConfig) -> List[float]:
"""
使用配置的嵌入模型生成文本的嵌入向量。
"""
try:
# 1. 获取模型组配置信息
model_group_info: Optional[ModelConfigGroup] = core_config.get_model_group_info(plugin_config.EMBEDDING_MODEL_GROUP)
if not model_group_info:
core.logger.error(f"无法找到名为 '{plugin_config.EMBEDDING_MODEL_GROUP}' 的模型组配置。")
raise ValueError(f"Embedding model group '{plugin_config.EMBEDDING_MODEL_GROUP}' not found.")
# 2. 调用嵌入模型生成向量
# 注意:gen_openai_embeddings 的 `model` 参数通常是模型组中的具体 chat/completion 模型名,
# 但API调用会根据 base_url 路由到嵌入端点。
# `dimensions` 参数是可选的,如果模型支持且你想指定输出维度(某些模型如 text-embedding-3-small 支持)。
# 如果不确定,可以先不传 dimensions,或确保其与 EMBEDDING_DIMENSION 一致。
embedding_vector: List[float] = await gen_openai_embeddings(
model=model_group_info.CHAT_MODEL, # 或者模型组中适合用于获取API信息的模型名称
input=text_to_embed,
api_key=model_group_info.API_KEY,
base_url=model_group_info.BASE_URL,
dimensions=plugin_config.EMBEDDING_DIMENSION # 明确指定维度
)
vector_dimension = len(embedding_vector)
core.logger.debug(f"生成嵌入向量: '{text_to_embed[:20]}...', 维度: {vector_dimension}")
# 3. 验证维度是否与配置一致
if vector_dimension != plugin_config.EMBEDDING_DIMENSION:
core.logger.error(
f"嵌入向量维度不匹配!配置维度: {plugin_config.EMBEDDING_DIMENSION}, "
f"实际获取维度: {vector_dimension}. "
f"请检查插件配置 EMBEDDING_DIMENSION 或模型设置。"
)
# 根据实际情况,这里可能需要抛出异常或进行其他错误处理
raise ValueError(f"Embedding dimension mismatch.")
return embedding_vector
except Exception as e:
core.logger.error(f"生成文本嵌入失败: {e}")
raise # 或者处理后重新抛出特定类型的异常
# 使用示例:
# async def some_function():
# # plugin_config 应该是你加载后的插件配置实例
# my_text = "这是一段需要被向量化的文本"
# try:
# vector = await generate_text_embedding(my_text, plugin_config)
# # 接下来可以将 vector 存入 Qdrant
# except Exception as e:
# core.logger.error(f"处理文本 '{my_text}' 失败: {e}")
说明:
core_config.get_model_group_info()
: 根据模型组名称获取详细配置,包括 API Key 和 Base URL。gen_openai_embeddings()
: 这是 Nekro Agent 提供的一个辅助函数,用于调用 OpenAI 格式的嵌入 API。model
: 通常是模型组中的一个模型名称,API 服务会根据请求类型(这里是嵌入)路由到正确的端点。input
: 需要被嵌入的文本。dimensions
: (可选) 部分模型支持指定输出维度。确保这个值与你的EMBEDDING_DIMENSION
配置和 Qdrant 集合的维度一致。
- 维度校验:生成嵌入后,务必检查返回向量的维度是否与插件配置中的
EMBEDDING_DIMENSION
一致,这也是 Qdrant 集合所期望的维度。不一致会导致存储失败或搜索结果不准确。
6. 存储数据到 Qdrant
一旦你有了文本的向量表示,就可以将其连同其他元数据一起存储到 Qdrant 中。每个存储单元在 Qdrant 中称为一个 "Point"。
from typing import List, Dict, Any, Optional # 确保导入
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct # 确保导入
import hashlib # 用于示例ID转换
# 假设 qdrant_client 是已获取的客户端实例
# collection_name 是你的插件集合名称
# embedding_vector 是上一步生成的向量
# original_data_id 是你的数据在插件内部的唯一标识符,例如一个UUID或者数据库主键
async def store_vector_data(
qdrant_client: QdrantClient,
collection_name: str,
original_data_id: str, # 插件内部数据的唯一ID
embedding_vector: List[float],
payload_data: Dict[str, Any] # 其他需要与向量一起存储的元数据
):
"""将向量和元数据存储到Qdrant"""
try:
# Qdrant 的 Point ID 必须是数字或UUID。
# 如果你的 original_data_id 是字符串(非UUID格式),需要转换为数字。
# 示例:使用原始ID的哈希值(截断并转为整数)作为Qdrant Point ID
# 注意:哈希冲突是可能的,尽管对于截断的MD5来说概率较低。
# 更健壮的方案是使用UUID,或者在插件内部维护一个 original_data_id到数字ID的映射。
qdrant_point_id_hex = hashlib.md5(original_data_id.encode()).hexdigest()[:16] # 取前16位十六进制字符
qdrant_point_id = int(qdrant_point_id_hex, 16) # 转换为整数
points_to_upsert = [
models.PointStruct(
id=qdrant_point_id, # Point 的唯一ID,必须是整数或UUID
vector=embedding_vector,
payload={
"original_id": original_data_id, # 将原始ID存储在payload中,便于关联
**payload_data # 将你的其他元数据展开到payload中
# 例如: "text_content": "这是原始内容...", "source": "doc1.txt"
}
)
]
response = await qdrant_client.upsert(
collection_name=collection_name,
points=points_to_upsert,
wait=True # 可以设置为 True 以等待操作完成并确认
)
core.logger.info(f"成功存储数据到Qdrant, Original ID: {original_data_id}, Qdrant Point ID: {qdrant_point_id}. Status: {response.status}")
except Exception as e:
core.logger.error(f"存储数据到Qdrant失败 (Original ID: {original_data_id}): {e}")
raise
# 使用示例:
# async def example_usage_store():
# # qdrant_client, collection_name, vector, plugin_config 需已定义
# my_payload = {"description": "这是一个示例文档", "category": "test"}
# original_id = "doc_abc_123"
# # vector = await generate_text_embedding("示例文档内容", plugin_config) # 生成向量
# # await store_vector_data(qdrant_client, collection_name, original_id, vector, my_payload)
# pass
说明:
models.PointStruct
: 用于定义一个数据点。id
: 点的唯一标识符。Qdrant 要求 ID 是一个整数 (uint64) 或一个 UUID 字符串。 如果你的内部 ID 是其他类型的字符串,你需要将其转换为 Qdrant 兼容的 ID(例如,通过哈希后取一部分转为整数,或者使用 UUID)。emotion.py
中是将一个十六进制字符串ID转换为整数:int(emotion_id, 16)
。上面的示例使用hashlib
生成一个整数ID。vector
: 数据的嵌入向量。payload
: 一个字典,用于存储与向量相关的任意元数据。这些元数据可以在搜索时被过滤或返回。建议将你的原始数据ID也存储在 payload 中,这样即使 Qdrant 的 Point ID 与你的内部 ID 不同,你也可以通过 payload 找回原始关联。
qdrant_client.upsert()
: 用于插入或更新数据点。如果具有相同id
的点已存在,它将被更新;否则,将创建一个新点。wait=True
: 可选参数,如果设为True
,操作会等待 Qdrant 确认写入完成。
7. 从 Qdrant 搜索数据
向量数据库的核心功能是根据查询向量搜索相似的条目。
from typing import List, Optional # 确保导入
from qdrant_client import QdrantClient, models
async def search_similar_vectors(
qdrant_client: QdrantClient,
collection_name: str,
query_vector: List[float],
top_k: int = 5, # 返回最相似的 top_k 个结果
score_threshold: Optional[float] = None # 可选的相似度得分阈值
) -> List[models.ScoredPoint]:
"""在Qdrant中搜索与查询向量相似的条目"""
try:
search_results = await qdrant_client.search(
collection_name=collection_name,
query_vector=query_vector,
limit=top_k,
score_threshold=score_threshold, # 例如,只返回相似度高于0.75的结果
with_payload=True # 设置为 True 以在结果中包含 payload 数据
)
core.logger.info(f"Qdrant 搜索到 {len(search_results)} 个结果。")
# 每个 search_result 是一个 ScoredPoint 对象,包含 id, score, payload, vector (如果请求)
# for hit in search_results:
# core.logger.debug(f"Found point: {hit.id}, score: {hit.score}, payload: {hit.payload}")
return search_results
except Exception as e:
core.logger.error(f"Qdrant 搜索失败: {e}")
raise
# 使用示例:
# async def example_usage_search():
# # qdrant_client, collection_name, plugin_config 需已定义
# query_text = "我应该如何配置插件?"
# # query_embedding_vector = await generate_text_embedding(query_text, plugin_config)
# # results = await search_similar_vectors(qdrant_client, collection_name, query_embedding_vector, top_k=3)
# # for res in results:
# # original_id = res.payload.get("original_id") if res.payload else None
# # print(f"相似条目原始ID: {original_id}, 描述: {res.payload.get('description')}, 相似度: {res.score}")
# pass
说明:
qdrant_client.search()
: 执行向量搜索。query_vector
: 用于查询的嵌入向量。limit
: 返回最相似结果的数量。score_threshold
: (可选) 一个浮点数,只有相似度得分高于此阈值的结果才会被返回。得分的范围和意义取决于创建集合时选择的距离度量 (Distance
)。对于COSINE
,得分范围通常是 0 到 1 (或 -1 到 1,取决于实现,Qdrant 的余弦相似度是越高越好,接近1表示非常相似)。with_payload=True
: 如果为True
,搜索结果将包含每个点的payload
数据。with_vector=True
: (可选) 如果为True
,搜索结果将包含每个点的向量本身。
- 返回结果是
models.ScoredPoint
对象的列表,每个对象包含id
,score
,payload
, 和vector
(如果请求的话)。
8. 从 Qdrant 删除数据
如果需要从集合中移除某些数据点,可以使用 delete
方法。
from typing import List # 确保导入
from qdrant_client import QdrantClient, models
import hashlib # 用于示例ID转换
async def delete_vector_data_by_original_ids(
qdrant_client: QdrantClient,
collection_name: str,
original_ids_to_delete: List[str] # 需要删除的数据的 原始ID 列表
):
"""从Qdrant中删除指定原始ID对应的数据点"""
if not original_ids_to_delete:
core.logger.info("没有提供需要删除的原始ID,跳过删除操作。")
return
# 将原始ID转换为Qdrant Point ID
qdrant_point_ids_to_delete: List[int] = []
for original_id in original_ids_to_delete:
qdrant_point_id_hex = hashlib.md5(original_id.encode()).hexdigest()[:16]
qdrant_point_ids_to_delete.append(int(qdrant_point_id_hex, 16))
if not qdrant_point_ids_to_delete:
core.logger.info("没有有效的Qdrant Point ID进行删除。")
return
try:
response = await qdrant_client.delete(
collection_name=collection_name,
points_selector=models.PointIdsList(points=qdrant_point_ids_to_delete),
wait=True
)
core.logger.info(f"尝试从Qdrant删除 {len(qdrant_point_ids_to_delete)} 个数据点 (基于原始ID列表). Status: {response.status}")
except Exception as e:
core.logger.error(f"从Qdrant删除数据失败: {e}")
raise
# 使用示例:
# async def example_usage_delete():
# # qdrant_client, collection_name 需已定义
# original_ids_to_remove = ["doc_abc_123", "doc_xyz_789"]
# # await delete_vector_data_by_original_ids(qdrant_client, collection_name, original_ids_to_remove)
# pass
说明:
qdrant_client.delete()
: 删除数据点。points_selector
: 用于指定要删除哪些点。最常用的是models.PointIdsList(points=[id1, id2,...])
,其中points
是一个包含 Qdrant Point ID (整数或UUID) 的列表。在上面的示例中,我们根据原始ID列表生成了对应的 Qdrant Point ID 列表。- 也可以使用过滤器 (
models.Filter()
) 来删除满足特定 payload条件的数据点(例如,payload={"original_id": "some_id"}
)。
通过上述方法,你的 Nekro Agent 插件可以有效地利用 Qdrant 向量数据库进行高级的语义数据处理和检索,极大地增强 AI 的能力和用户体验。记得在实际开发中仔细处理错误和边界情况。