Skip to content

使用向量数据库 (Qdrant)

Nekro Agent 集成了 Qdrant 向量数据库,并为插件提供了访问其客户端的便捷方式。这使得插件能够利用向量相似性搜索来实现强大的语义理解、信息检索、内容推荐等功能,而无需自行搭建和管理向量数据库实例。

什么是向量数据库?

向量数据库专门用于存储、管理和搜索高维向量数据。这些向量通常是文本、图片、音频或其他数据的数学表示(称为嵌入, Embeddings),由深度学习模型(如 Transformer 模型)生成。

通过比较向量之间的距离或相似度(如余弦相似度、欧氏距离),向量数据库可以快速找到与给定查询向量最相似的条目,从而实现:

  • 语义搜索:不仅仅是关键词匹配,而是理解查询的语义含义进行搜索。
  • 推荐系统:根据用户已喜欢的内容向量,推荐相似的内容。
  • 问答系统:将问题和文档块转换为向量,找到与问题最相关的文档块作为答案来源。
  • 异常检测聚类等。

Nekro Agent 中的 Qdrant 集成

Nekro Agent 在其核心服务中初始化并管理了一个 Qdrant 客户端实例。插件可以通过 nekro_agent.api.core模块来获取这个客户端,并利用它与 Qdrant 服务器进行交互。

1. 获取 Qdrant 客户端

在你的插件代码中(通常是在沙盒方法或初始化方法内),你可以像这样获取 Qdrant 客户端:

python
from typing import Optional
from nekro_agent.api import core
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct, Distance, VectorParams

async def get_my_qdrant_client() -> QdrantClient:
    qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
    if not qdrant_client:
        core.logger.error("无法获取 Qdrant 客户端实例!请检查 Agent 核心配置。")
        # 可以根据插件逻辑决定是抛出异常还是返回 None
        raise ConnectionError("Qdrant client is not available.")
    return qdrant_client

core.get_qdrant_client() 是一个异步函数,它会返回一个已配置好的 QdrantClient 实例,或者在 Qdrant 未正确配置或不可用时返回 None 。因此,在使用前进行检查非常重要。

你还可以通过 core.get_qdrant_config() 获取 Qdrant 的连接配置信息(如 host, port, api_key 等),但这通常由 Agent 核心管理,插件开发者较少直接需要。

2. 插件专属的集合名称

为了避免不同插件在 Qdrant 中使用相同的集合名称造成冲突,Nekro Agent 提供了一个辅助方法 plugin.get_vector_collection_name() 来为你的插件生成一个唯一的、标准化的集合名称。

python
# 假设 plugin 是你的 NekroPlugin 实例
# from nekro_agent.api.plugin import NekroPlugin
# plugin = NekroPlugin(...) 

# 生成一个默认的集合名称,基于插件的作者和模块名
default_collection_name = plugin.get_vector_collection_name()
# 示例输出 (假设插件作者 "MyAuthor", 模块名 "MyPlugin"): 
# "MyAuthor.MyPlugin"

# 为插件内的特定用途生成带有后缀的集合名称
specific_collection_name = plugin.get_vector_collection_name("user_documents")
# 示例输出: "MyAuthor.MyPlugin-documents"

强烈建议插件在 Qdrant 中创建和使用通过此方法生成的集合名称,以确保良好的隔离性和避免命名冲突。

3. 初始化时检查和创建集合

在插件初始化时,通常需要检查所需的 Qdrant 集合是否存在,如果不存在则创建它。这可以确保插件在后续操作中能够正确访问集合。

python
from nekro_agent.api import core
from nekro_agent.api.plugin import NekroPlugin # 假设的插件实例
from qdrant_client import QdrantClient, models
from typing import Optional # 确保导入 Optional

# 假设 plugin 是你的 NekroPlugin 实例
# plugin = NekroPlugin(name="MySamplePlugin", module_name="my_sample", author="MyAuthor", ...)
# EMBEDDING_DIMENSION = 1024 # 假设你的嵌入向量维度

async def init_vector_db_collection(plugin_instance: NekroPlugin, embedding_dimension: int):
    """初始化插件的向量数据库集合"""
    qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
    if not qdrant_client:
        core.logger.warning(f"[{plugin_instance.name}] 无法获取Qdrant客户端,跳过向量数据库集合初始化")
        return

    collection_name = plugin_instance.get_vector_collection_name()

    try:
        collections_response = await qdrant_client.get_collections()
        existing_collections = [col.name for col in collections_response.collections]

        if collection_name not in existing_collections:
            core.logger.info(f"[{plugin_instance.name}] 正在创建向量数据库集合: {collection_name}")
            await qdrant_client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=embedding_dimension,  # 向量维度,需要与你的嵌入模型一致
                    distance=models.Distance.COSINE,  # 距离度量方式,COSINE 常用于文本
                ),
            )
            core.logger.success(f"[{plugin_instance.name}] 向量数据库集合 {collection_name} 创建成功")
        else:
            core.logger.info(f"[{plugin_instance.name}] 向量数据库集合 {collection_name} 已存在")
    except Exception as e:
        core.logger.error(f"[{plugin_instance.name}] 初始化向量数据库集合 {collection_name} 失败: {e}")

# 在你的插件初始化逻辑中调用 (例如 @plugin.mount_init_method())
# async def on_plugin_init():
#     # plugin 应该是你的插件实例
#     # MY_PLUGIN_EMBEDDING_DIMENSION 是你插件配置的维度
#     await init_vector_db_collection(plugin, MY_PLUGIN_EMBEDDING_DIMENSION)

说明:

  • plugin_instance.get_vector_collection_name(): 获取插件专属的集合名称。
  • qdrant_client.get_collections(): 获取 Qdrant 中所有集合的列表。
  • qdrant_client.create_collection(): 创建一个新的集合。
    • collection_name: 集合的名称。
    • vectors_config: 定义向量的参数。
      • size: 向量的维度,必须与你使用的嵌入模型生成的向量维度一致。
      • distance: 计算向量相似度的距离函数,常用的有 Distance.COSINE (余弦相似度), Distance.EUCLID (欧氏距离), Distance.DOT (点积)。对于文本嵌入,余弦相似度通常是最佳选择。

4. 配置嵌入模型和维度

为了将文本或其他数据转换为向量,你需要使用嵌入模型。Nekro Agent 允许你在插件配置中指定所使用的嵌入模型组及其维度。

在你的插件配置类 (继承自 nekro_agent.api.plugin.ConfigBase) 中添加如下字段:

python
from pydantic import Field
from nekro_agent.api.plugin import ConfigBase

class MyPluginConfig(ConfigBase):
    EMBEDDING_MODEL_GROUP: str = Field(
        default="text-embedding", # 默认的模型组名,请根据实际情况修改
        title="嵌入模型组",
        description="用于生成文本嵌入向量的模型组名称。",
        json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"},
    )
    EMBEDDING_DIMENSION: int = Field(
        default=1024, # 默认维度,请根据你选择的模型组的实际输出维度修改
        title="嵌入向量维度",
        description="嵌入模型输出的向量维度。",
    )
    # ... 其他配置项

说明:

  • EMBEDDING_MODEL_GROUP: 字符串字段,用于指定在 Nekro Agent 核心配置中定义的模型组名称。
    • json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"}: 这部分元数据用于在 Nekro Agent 的管理界面中提供模型组选择的下拉列表,并指明这是一个嵌入类型的模型。
  • EMBEDDING_DIMENSION: 整数字段,表示你的嵌入模型输出向量的维度。这个值必须与 init_vector_db_collectionVectorParamssize 参数以及实际模型输出的维度严格匹配,否则会导致错误。

获取配置:

python
# 假设 plugin 是你的 NekroPlugin 实例
# my_config: MyPluginConfig = plugin.get_config(MyPluginConfig)
# embedding_model_group_name = my_config.EMBEDDING_MODEL_GROUP
# embedding_dimension = my_config.EMBEDDING_DIMENSION

5. 生成文本嵌入 (Embeddings)

获取到文本后,你需要调用相应的嵌入模型服务将其转换为向量。Nekro Agent 提供了工具来调用通过模型组配置的 OpenAI 兼容的嵌入接口。

python
from typing import List, Optional # 确保导入
from nekro_agent.api import core
from nekro_agent.api.core import config as core_config, ModelConfigGroup
from nekro_agent.services.agent.openai import gen_openai_embeddings # 导入辅助函数

# 假设 MyPluginConfig 是你的插件配置类
# plugin_config: MyPluginConfig = plugin.get_config(MyPluginConfig)

async def generate_text_embedding(text_to_embed: str, plugin_config: MyPluginConfig) -> List[float]:
    """
    使用配置的嵌入模型生成文本的嵌入向量。
    """
    try:
        # 1. 获取模型组配置信息
        model_group_info: Optional[ModelConfigGroup] = core_config.get_model_group_info(plugin_config.EMBEDDING_MODEL_GROUP)
        if not model_group_info:
            core.logger.error(f"无法找到名为 '{plugin_config.EMBEDDING_MODEL_GROUP}' 的模型组配置。")
            raise ValueError(f"Embedding model group '{plugin_config.EMBEDDING_MODEL_GROUP}' not found.")

        # 2. 调用嵌入模型生成向量
        # 注意:gen_openai_embeddings 的 `model` 参数通常是模型组中的具体 chat/completion 模型名,
        # 但API调用会根据 base_url 路由到嵌入端点。
        # `dimensions` 参数是可选的,如果模型支持且你想指定输出维度(某些模型如 text-embedding-3-small 支持)。
        # 如果不确定,可以先不传 dimensions,或确保其与 EMBEDDING_DIMENSION 一致。
        embedding_vector: List[float] = await gen_openai_embeddings(
            model=model_group_info.CHAT_MODEL, # 或者模型组中适合用于获取API信息的模型名称
            input=text_to_embed,
            api_key=model_group_info.API_KEY,
            base_url=model_group_info.BASE_URL,
            dimensions=plugin_config.EMBEDDING_DIMENSION # 明确指定维度
        )

        vector_dimension = len(embedding_vector)
        core.logger.debug(f"生成嵌入向量: '{text_to_embed[:20]}...', 维度: {vector_dimension}")

        # 3. 验证维度是否与配置一致
        if vector_dimension != plugin_config.EMBEDDING_DIMENSION:
            core.logger.error(
                f"嵌入向量维度不匹配!配置维度: {plugin_config.EMBEDDING_DIMENSION}, "
                f"实际获取维度: {vector_dimension}. "
                f"请检查插件配置 EMBEDDING_DIMENSION 或模型设置。"
            )
            # 根据实际情况,这里可能需要抛出异常或进行其他错误处理
            raise ValueError(f"Embedding dimension mismatch.")
        
        return embedding_vector
    except Exception as e:
        core.logger.error(f"生成文本嵌入失败: {e}")
        raise  # 或者处理后重新抛出特定类型的异常

# 使用示例:
# async def some_function():
#     # plugin_config 应该是你加载后的插件配置实例
#     my_text = "这是一段需要被向量化的文本"
#     try:
#         vector = await generate_text_embedding(my_text, plugin_config)
#         # 接下来可以将 vector 存入 Qdrant
#     except Exception as e:
#         core.logger.error(f"处理文本 '{my_text}' 失败: {e}")

说明:

  • core_config.get_model_group_info(): 根据模型组名称获取详细配置,包括 API Key 和 Base URL。
  • gen_openai_embeddings(): 这是 Nekro Agent 提供的一个辅助函数,用于调用 OpenAI 格式的嵌入 API。
    • model: 通常是模型组中的一个模型名称,API 服务会根据请求类型(这里是嵌入)路由到正确的端点。
    • input: 需要被嵌入的文本。
    • dimensions: (可选) 部分模型支持指定输出维度。确保这个值与你的 EMBEDDING_DIMENSION 配置和 Qdrant 集合的维度一致。
  • 维度校验:生成嵌入后,务必检查返回向量的维度是否与插件配置中的 EMBEDDING_DIMENSION 一致,这也是 Qdrant 集合所期望的维度。不一致会导致存储失败或搜索结果不准确。

6. 存储数据到 Qdrant

一旦你有了文本的向量表示,就可以将其连同其他元数据一起存储到 Qdrant 中。每个存储单元在 Qdrant 中称为一个 "Point"。

python
from typing import List, Dict, Any, Optional # 确保导入
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct # 确保导入
import hashlib # 用于示例ID转换

# 假设 qdrant_client 是已获取的客户端实例
# collection_name 是你的插件集合名称
# embedding_vector 是上一步生成的向量
# original_data_id 是你的数据在插件内部的唯一标识符,例如一个UUID或者数据库主键

async def store_vector_data(
    qdrant_client: QdrantClient,
    collection_name: str,
    original_data_id: str, # 插件内部数据的唯一ID
    embedding_vector: List[float],
    payload_data: Dict[str, Any] # 其他需要与向量一起存储的元数据
):
    """将向量和元数据存储到Qdrant"""
    try:
        # Qdrant 的 Point ID 必须是数字或UUID。
        # 如果你的 original_data_id 是字符串(非UUID格式),需要转换为数字。
        # 示例:使用原始ID的哈希值(截断并转为整数)作为Qdrant Point ID
        # 注意:哈希冲突是可能的,尽管对于截断的MD5来说概率较低。
        # 更健壮的方案是使用UUID,或者在插件内部维护一个 original_data_id到数字ID的映射。
        qdrant_point_id_hex = hashlib.md5(original_data_id.encode()).hexdigest()[:16] # 取前16位十六进制字符
        qdrant_point_id = int(qdrant_point_id_hex, 16) # 转换为整数

        points_to_upsert = [
            models.PointStruct(
                id=qdrant_point_id, # Point 的唯一ID,必须是整数或UUID
                vector=embedding_vector,
                payload={
                    "original_id": original_data_id, # 将原始ID存储在payload中,便于关联
                    **payload_data # 将你的其他元数据展开到payload中
                    # 例如: "text_content": "这是原始内容...", "source": "doc1.txt"
                }
            )
        ]

        response = await qdrant_client.upsert(
            collection_name=collection_name,
            points=points_to_upsert,
            wait=True # 可以设置为 True 以等待操作完成并确认
        )
        core.logger.info(f"成功存储数据到Qdrant, Original ID: {original_data_id}, Qdrant Point ID: {qdrant_point_id}. Status: {response.status}")
    except Exception as e:
        core.logger.error(f"存储数据到Qdrant失败 (Original ID: {original_data_id}): {e}")
        raise

# 使用示例:
# async def example_usage_store():
#     # qdrant_client, collection_name, vector, plugin_config 需已定义
#     my_payload = {"description": "这是一个示例文档", "category": "test"}
#     original_id = "doc_abc_123" 
#     # vector = await generate_text_embedding("示例文档内容", plugin_config) # 生成向量
#     # await store_vector_data(qdrant_client, collection_name, original_id, vector, my_payload)
#     pass

说明:

  • models.PointStruct: 用于定义一个数据点。
    • id: 点的唯一标识符。Qdrant 要求 ID 是一个整数 (uint64) 或一个 UUID 字符串。 如果你的内部 ID 是其他类型的字符串,你需要将其转换为 Qdrant 兼容的 ID(例如,通过哈希后取一部分转为整数,或者使用 UUID)。emotion.py 中是将一个十六进制字符串ID转换为整数:int(emotion_id, 16)。上面的示例使用 hashlib 生成一个整数ID。
    • vector: 数据的嵌入向量。
    • payload: 一个字典,用于存储与向量相关的任意元数据。这些元数据可以在搜索时被过滤或返回。建议将你的原始数据ID也存储在 payload 中,这样即使 Qdrant 的 Point ID 与你的内部 ID 不同,你也可以通过 payload 找回原始关联。
  • qdrant_client.upsert(): 用于插入或更新数据点。如果具有相同 id 的点已存在,它将被更新;否则,将创建一个新点。
    • wait=True: 可选参数,如果设为 True,操作会等待 Qdrant 确认写入完成。

7. 从 Qdrant 搜索数据

向量数据库的核心功能是根据查询向量搜索相似的条目。

python
from typing import List, Optional # 确保导入
from qdrant_client import QdrantClient, models

async def search_similar_vectors(
    qdrant_client: QdrantClient,
    collection_name: str,
    query_vector: List[float],
    top_k: int = 5, # 返回最相似的 top_k 个结果
    score_threshold: Optional[float] = None # 可选的相似度得分阈值
) -> List[models.ScoredPoint]:
    """在Qdrant中搜索与查询向量相似的条目"""
    try:
        search_results = await qdrant_client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=top_k,
            score_threshold=score_threshold, # 例如,只返回相似度高于0.75的结果
            with_payload=True # 设置为 True 以在结果中包含 payload 数据
        )
        core.logger.info(f"Qdrant 搜索到 {len(search_results)} 个结果。")
        # 每个 search_result 是一个 ScoredPoint 对象,包含 id, score, payload, vector (如果请求)
        # for hit in search_results:
        #     core.logger.debug(f"Found point: {hit.id}, score: {hit.score}, payload: {hit.payload}")
        return search_results
    except Exception as e:
        core.logger.error(f"Qdrant 搜索失败: {e}")
        raise

# 使用示例:
# async def example_usage_search():
#     # qdrant_client, collection_name, plugin_config 需已定义
#     query_text = "我应该如何配置插件?"
#     # query_embedding_vector = await generate_text_embedding(query_text, plugin_config)
#     # results = await search_similar_vectors(qdrant_client, collection_name, query_embedding_vector, top_k=3)
#     # for res in results:
#     #     original_id = res.payload.get("original_id") if res.payload else None
#     #     print(f"相似条目原始ID: {original_id}, 描述: {res.payload.get('description')}, 相似度: {res.score}")
#     pass

说明:

  • qdrant_client.search(): 执行向量搜索。
    • query_vector: 用于查询的嵌入向量。
    • limit: 返回最相似结果的数量。
    • score_threshold: (可选) 一个浮点数,只有相似度得分高于此阈值的结果才会被返回。得分的范围和意义取决于创建集合时选择的距离度量 (Distance)。对于 COSINE,得分范围通常是 0 到 1 (或 -1 到 1,取决于实现,Qdrant 的余弦相似度是越高越好,接近1表示非常相似)。
    • with_payload=True: 如果为 True,搜索结果将包含每个点的 payload 数据。
    • with_vector=True: (可选) 如果为 True,搜索结果将包含每个点的向量本身。
  • 返回结果是 models.ScoredPoint对象的列表,每个对象包含 id, score, payload, 和 vector (如果请求的话)。

8. 从 Qdrant 删除数据

如果需要从集合中移除某些数据点,可以使用 delete 方法。

python
from typing import List # 确保导入
from qdrant_client import QdrantClient, models
import hashlib # 用于示例ID转换

async def delete_vector_data_by_original_ids(
    qdrant_client: QdrantClient,
    collection_name: str,
    original_ids_to_delete: List[str] # 需要删除的数据的 原始ID 列表
):
    """从Qdrant中删除指定原始ID对应的数据点"""
    if not original_ids_to_delete:
        core.logger.info("没有提供需要删除的原始ID,跳过删除操作。")
        return

    # 将原始ID转换为Qdrant Point ID
    qdrant_point_ids_to_delete: List[int] = []
    for original_id in original_ids_to_delete:
        qdrant_point_id_hex = hashlib.md5(original_id.encode()).hexdigest()[:16]
        qdrant_point_ids_to_delete.append(int(qdrant_point_id_hex, 16))
    
    if not qdrant_point_ids_to_delete:
        core.logger.info("没有有效的Qdrant Point ID进行删除。")
        return

    try:
        response = await qdrant_client.delete(
            collection_name=collection_name,
            points_selector=models.PointIdsList(points=qdrant_point_ids_to_delete),
            wait=True
        )
        core.logger.info(f"尝试从Qdrant删除 {len(qdrant_point_ids_to_delete)} 个数据点 (基于原始ID列表). Status: {response.status}")
    except Exception as e:
        core.logger.error(f"从Qdrant删除数据失败: {e}")
        raise

# 使用示例:
# async def example_usage_delete():
#     # qdrant_client, collection_name 需已定义
#     original_ids_to_remove = ["doc_abc_123", "doc_xyz_789"]
#     # await delete_vector_data_by_original_ids(qdrant_client, collection_name, original_ids_to_remove)
#     pass

说明:

  • qdrant_client.delete(): 删除数据点。
  • points_selector: 用于指定要删除哪些点。最常用的是 models.PointIdsList(points=[id1, id2,...]),其中 points 是一个包含 Qdrant Point ID (整数或UUID) 的列表。在上面的示例中,我们根据原始ID列表生成了对应的 Qdrant Point ID 列表。
  • 也可以使用过滤器 (models.Filter()) 来删除满足特定 payload条件的数据点(例如,payload={"original_id": "some_id"})。

通过上述方法,你的 Nekro Agent 插件可以有效地利用 Qdrant 向量数据库进行高级的语义数据处理和检索,极大地增强 AI 的能力和用户体验。记得在实际开发中仔细处理错误和边界情况。