ベクトルデータベースの使用(Qdrant)
Nekro AgentはQdrantベクトルデータベースと統合し、プラグインがそのクライアントに便利にアクセスできるようにします。これにより、プラグインはベクトル類似性検索を活用して強力なセマンティック理解、情報検索、コンテンツ推奨などの機能を実装でき、ベクトルデータベースインスタンスを自分で構築・管理する必要がありません。
ベクトルデータベースとは?
ベクトルデータベースは、高次元ベクトルデータの保存、管理、検索に特化しています。これらのベクトルは通常、深層学習モデル(Transformerモデルなど)によって生成されるテキスト、画像、音声、またはその他のデータの数学的表現(埋め込みと呼ばれる)です。
ベクトル間の距離や類似性(コサイン類似度、ユークリッド距離など)を比較することで、ベクトルデータベースは指定されたクエリベクトルに最も類似したエントリを迅速に見つけ出し、以下を可能にします:
- セマンティック検索: キーワードマッチングだけでなく、クエリのセマンティックな意味を理解して検索します。
- 推奨システム: ユーザーが好んだコンテンツのベクトルに基づいて類似コンテンツを推奨します。
- 質問応答システム: 質問とドキュメントのチャンクをベクトルに変換し、最も関連性の高いドキュメントチャンクを回答ソースとして見つけます。
- 異常検出、クラスタリングなど。
Nekro AgentでのQdrant統合
Nekro Agentはコアサービス内でQdrantクライアントインスタンスを初期化・管理します。プラグインはnekro_agent.api.coreモジュールを通じてこのクライアントにアクセスし、Qdrantサーバーと対話できます。
1. Qdrantクライアントの取得
プラグインコード内(通常はサンドボックスメソッドまたは初期化メソッド内)で、次のようにQdrantクライアントを取得できます:
from typing import Optional
from nekro_agent.api import core
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct, Distance, VectorParams
async def get_my_qdrant_client() -> QdrantClient:
qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
if not qdrant_client:
core.logger.error("Qdrantクライアントインスタンスを取得できません!Agentコア設定を確認してください。")
# プラグインロジックに基づいて例外を発生させるかNoneを返すかを決定できます
raise ConnectionError("Qdrantクライアントが利用できません。")
return qdrant_clientcore.get_qdrant_client()は設定されたQdrantClientインスタンスを返す非同期関数で、Qdrantが正しく設定されていないか利用できない場合はNoneを返します。そのため、使用前に確認することが重要です。
また、core.get_qdrant_config()を通じてQdrant接続設定情報(ホスト、ポート、api_keyなど)を取得することもできますが、これは通常Agentコアによって管理され、プラグイン開発者が直接アクセスする必要はほとんどありません。
2. プラグイン固有のコレクション名
異なるプラグインがQdrantで同じコレクション名を使用することによる競合を避けるため、Nekro Agentはヘルパーメソッドplugin.get_vector_collection_name()を提供し、プラグイン用の一意で標準化されたコレクション名を生成します。
# pluginがあなたのNekroPluginインスタンスであると仮定
# from nekro_agent.api.plugin import NekroPlugin
# plugin = NekroPlugin(...)
# プラグインの作者とモジュール名に基づいてデフォルトのコレクション名を生成
default_collection_name = plugin.get_vector_collection_name()
# 出力例(プラグイン作者"MyAuthor"、モジュール名"MyPlugin"の場合):
# "MyAuthor.MyPlugin"
# プラグイン内で特定の目的に使用する接尾辞付きのコレクション名を生成
specific_collection_name = plugin.get_vector_collection_name("user_documents")
# 出力例: "MyAuthor.MyPlugin-documents"プラグインはQdrantでこのメソッドを通じて生成されたコレクション名を作成・使用することが強く推奨されます。これにより、適切な分離が確保され、名前の競合が回避されます。
3. 初期化時のコレクションチェックと作成
プラグイン初期化中、必要なQdrantコレクションが存在するかを確認し、存在しない場合は作成する必要があることがよくあります。これにより、プラグインは後続の操作でコレクションに正しくアクセスできます。
from nekro_agent.api import core
from nekro_agent.api.plugin import NekroPlugin # プラグインインスタンスを仮定
from qdrant_client import QdrantClient, models
from typing import Optional # Optionalがインポートされていることを確認
# pluginがあなたのNekroPluginインスタンスであると仮定
# plugin = NekroPlugin(name="MySamplePlugin", module_name="my_sample", author="MyAuthor", ...)
# EMBEDDING_DIMENSION = 1024 # 埋め込みベクトルの次元を仮定
async def init_vector_db_collection(plugin_instance: NekroPlugin, embedding_dimension: int):
"""プラグインのベクトルデータベースコレクションを初期化"""
qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
if not qdrant_client:
core.logger.warning(f"[{plugin_instance.name}] Qdrantクライアントを取得できません、ベクトルデータベースコレクションの初期化をスキップします")
return
collection_name = plugin_instance.get_vector_collection_name()
try:
collections_response = await qdrant_client.get_collections()
existing_collections = [col.name for col in collections_response.collections]
if collection_name not in existing_collections:
core.logger.info(f"[{plugin_instance.name}] ベクトルデータベースコレクションを作成中: {collection_name}")
await qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=models.VectorParams(
size=embedding_dimension, # ベクトル次元、埋め込みモデルと一致させる必要があります
distance=models.Distance.COSINE, # 距離メトリック、テキストにはCOSINEが一般的に使用されます
),
)
core.logger.success(f"[{plugin_instance.name}] ベクトルデータベースコレクション {collection_name} の作成に成功しました")
else:
core.logger.info(f"[{plugin_instance.name}] ベクトルデータベースコレクション {collection_name} は既に存在します")
except Exception as e:
core.logger.error(f"[{plugin_instance.name}] ベクトルデータベースコレクション {collection_name} の初期化に失敗しました: {e}")
# プラグイン初期化ロジックで呼び出し(例: @plugin.mount_init_method())
# async def on_plugin_init():
# # pluginはあなたのプラグインインスタンスである必要があります
# # MY_PLUGIN_EMBEDDING_DIMENSIONはプラグインで設定された次元です
# await init_vector_db_collection(plugin, MY_PLUGIN_EMBEDDING_DIMENSION)説明:
plugin_instance.get_vector_collection_name(): プラグイン固有のコレクション名を取得します。qdrant_client.get_collections(): Qdrant内のすべてのコレクションのリストを取得します。qdrant_client.create_collection(): 新しいコレクションを作成します。collection_name: コレクションの名前。vectors_config: ベクトルパラメータを定義します。size: ベクトルの次元で、埋め込みモデルが生成するベクトル次元と一致させる必要があります。distance: ベクトル類似性を計算するための距離関数で、一般的なオプションにはDistance.COSINE(コサイン類似度)、Distance.EUCLID(ユークリッド距離)、Distance.DOT(内積)があります。テキスト埋め込みの場合、コサイン類似度が通常最適な選択です。
4. 埋め込みモデルと次元の設定
テキストやその他のデータをベクトルに変換するには、埋め込みモデルを使用する必要があります。Nekro Agentでは、プラグイン設定で埋め込みモデルグループとその次元を指定できます。
プラグイン設定クラス(nekro_agent.api.plugin.ConfigBaseを継承)に次のフィールドを追加します:
from pydantic import Field
from nekro_agent.api.plugin import ConfigBase
class MyPluginConfig(ConfigBase):
EMBEDDING_MODEL_GROUP: str = Field(
default="text-embedding", # デフォルトモデルグループ名、実際の状況に応じて変更
title="埋め込みモデルグループ",
description="テキスト埋め込みベクトルを生成するために使用されるモデルグループの名前。",
json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"},
)
EMBEDDING_DIMENSION: int = Field(
default=1024, # デフォルト次元、選択したモデルグループの実際の出力次元に応じて変更
title="埋め込みベクトル次元",
description="埋め込みモデルが出力するベクトル次元。",
)
# ... その他の設定項目説明:
EMBEDDING_MODEL_GROUP: Nekro Agentコア設定で定義されたモデルグループ名を指定するための文字列フィールド。json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"}: このメタデータは、Nekro Agent管理インターフェースでモデルグループ選択ドロップダウンを提供し、これが埋め込みタイプのモデルであることを示すために使用されます。
EMBEDDING_DIMENSION: 埋め込みモデルの出力ベクトル次元を表す整数フィールド。この値はinit_vector_db_collectionのVectorParamsのsizeパラメータと実際のモデル出力次元と厳密に一致させる必要があります。そうでない場合、エラーが発生します。
設定の取得:
# pluginがあなたのNekroPluginインスタンスであると仮定
# my_config: MyPluginConfig = plugin.get_config(MyPluginConfig)
# embedding_model_group_name = my_config.EMBEDDING_MODEL_GROUP
# embedding_dimension = my_config.EMBEDDING_DIMENSION5. テキスト埋め込みの生成
テキストを取得した後、対応する埋め込みモデルサービスを呼び出してベクトルに変換する必要があります。Nekro Agentは、モデルグループを通じて設定されたOpenAI互換の埋め込みインターフェースを呼び出すためのツールを提供します。
from typing import List, Optional # インポートを確認
from nekro_agent.api import core
from nekro_agent.api.core import config as core_config, ModelConfigGroup
from nekro_agent.services.agent.openai import gen_openai_embeddings # ヘルパー関数をインポート
# MyPluginConfigがあなたのプラグイン設定クラスであると仮定
# plugin_config: MyPluginConfig = plugin.get_config(MyPluginConfig)
async def generate_text_embedding(text_to_embed: str, plugin_config: MyPluginConfig) -> List[float]:
"""
設定された埋め込みモデルを使用してテキストの埋め込みベクトルを生成します。
"""
try:
# 1. モデルグループ設定情報を取得
model_group_info: Optional[ModelConfigGroup] = core_config.get_model_group_info(plugin_config.EMBEDDING_MODEL_GROUP)
if not model_group_info:
core.logger.error(f"'{plugin_config.EMBEDDING_MODEL_GROUP}'という名前のモデルグループ設定が見つかりません。")
raise ValueError(f"埋め込みモデルグループ '{plugin_config.EMBEDDING_MODEL_GROUP}' が見つかりません。")
# 2. 埋め込みモデルを呼び出してベクトルを生成
# 注意: gen_openai_embeddingsの`model`パラメータは通常、モデルグループ内の特定のチャット/補完モデル名ですが、
# API呼び出しはリクエストタイプ(ここでは埋め込み)に基づいて埋め込みエンドポイントにルーティングされます。
# `dimensions`パラメータはオプションで、モデルがサポートしていて出力次元を指定したい場合に使用します(text-embedding-3-smallなどの一部のモデルでサポート)。
# 不確かな場合は、まずdimensionsを省略するか、EMBEDDING_DIMENSIONと一致することを確認してください。
embedding_vector: List[float] = await gen_openai_embeddings(
model=model_group_info.CHAT_MODEL, # またはモデルグループ内のAPI情報取得に適したモデル名
input=text_to_embed,
api_key=model_group_info.API_KEY,
base_url=model_group_info.BASE_URL,
dimensions=plugin_config.EMBEDDING_DIMENSION # 次元を明示的に指定
)
vector_dimension = len(embedding_vector)
core.logger.debug(f"埋め込みベクトルを生成: '{text_to_embed[:20]}...', 次元: {vector_dimension}")
# 3. 次元が設定と一致することを確認
if vector_dimension != plugin_config.EMBEDDING_DIMENSION:
core.logger.error(
f"埋め込みベクトル次元が一致しません!設定された次元: {plugin_config.EMBEDDING_DIMENSION}, "
f"実際の次元: {vector_dimension}. "
f"プラグイン設定EMBEDDING_DIMENSIONまたはモデル設定を確認してください。"
)
# 状況に応じて、ここで例外を発生させるか、他のエラーハンドリングを実行する必要があります
raise ValueError(f"埋め込み次元が一致しません。")
return embedding_vector
except Exception as e:
core.logger.error(f"テキスト埋め込みの生成に失敗しました: {e}")
raise # または処理して特定のタイプの例外を再発生させる
# 使用例:
# async def some_function():
# # plugin_configは読み込まれたプラグイン設定インスタンスである必要があります
# my_text = "ベクトル化する必要があるテキストです"
# try:
# vector = await generate_text_embedding(my_text, plugin_config)
# # 次に、ベクトルをQdrantに保存できます
# except Exception as e:
# core.logger.error(f"テキスト '{my_text}' の処理に失敗しました: {e}")説明:
core_config.get_model_group_info(): モデルグループ名に基づいてAPI KeyとBase URLを含む詳細な設定を取得します。gen_openai_embeddings(): これはNekro Agentが提供するOpenAI形式の埋め込みAPIを呼び出すためのヘルパー関数です。model: 通常、モデルグループ内のモデル名。APIサービスはリクエストタイプ(ここでは埋め込み)に基づいて正しいエンドポイントにルーティングします。input: 埋め込み対象のテキスト。dimensions: (オプション)一部のモデルは出力次元の指定をサポートしています。この値がEMBEDDING_DIMENSION設定とQdrantコレクションの次元と一致することを確認してください。
- 次元検証: 埋め込みを生成した後、返されたベクトルの次元がプラグイン設定の
EMBEDDING_DIMENSION(これはQdrantコレクションが期待する次元でもあります)と一致することを確認してください。不一致は保存失敗や不正確な検索結果を引き起こします。
6. Qdrantへのデータ保存
テキストのベクトル表現を取得したら、他のメタデータと一緒にQdrantに保存できます。Qdrantの各保存単位は「ポイント」と呼ばれます。
from typing import List, Dict, Any, Optional # インポートを確認
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct # インポートを確認
import hashlib # 例のID変換に使用
# qdrant_clientが取得したクライアントインスタンスであると仮定
# collection_nameはあなたのプラグインコレクション名
# embedding_vectorは前のステップで生成されたベクトル
# original_data_idはプラグイン内でのデータの一意の識別子(UUIDやデータベース主キーなど)
async def store_vector_data(
qdrant_client: QdrantClient,
collection_name: str,
original_data_id: str, # プラグイン内でのデータの一意のID
embedding_vector: List[float],
payload_data: Dict[str, Any] # ベクトルと一緒に保存するその他のメタデータ
):
"""ベクトルとメタデータをQdrantに保存"""
try:
# QdrantのポイントIDは数値またはUUIDである必要があります。
# original_data_idが文字列(UUID形式以外)の場合、数値に変換する必要があります。
# 例: 元のIDのハッシュ値(切り捨てて整数に変換)をQdrantポイントIDとして使用
# 注意: ハッシュの競合は可能ですが、切り捨てられたMD5では確率は低いです。
# より堅牢なアプローチはUUIDを使用するか、プラグイン内でoriginal_data_idから数値IDへのマッピングを維持することです。
qdrant_point_id_hex = hashlib.md5(original_data_id.encode()).hexdigest()[:16] # 最初の16進文字を取得
qdrant_point_id = int(qdrant_point_id_hex, 16) # 整数に変換
points_to_upsert = [
models.PointStruct(
id=qdrant_point_id, # ポイントの一意のID、整数またはUUIDである必要があります
vector=embedding_vector,
payload={
"original_id": original_data_id, # 関連付けのために元のIDをペイロードに保存
**payload_data # その他のメタデータをペイロードに展開
# 例: "text_content": "これは元のコンテンツです...", "source": "doc1.txt"
}
)
]
response = await qdrant_client.upsert(
collection_name=collection_name,
points=points_to_upsert,
wait=True # 操作完了と確認を待機するためにTrueに設定できます
)
core.logger.info(f"Qdrantへのデータ保存に成功しました、元のID: {original_data_id}, QdrantポイントID: {qdrant_point_id}. ステータス: {response.status}")
except Exception as e:
core.logger.error(f"Qdrantへのデータ保存に失敗しました(元のID: {original_data_id}): {e}")
raise
# 使用例:
# async def example_usage_store():
# # qdrant_client, collection_name, vector, plugin_configは定義されている必要があります
# my_payload = {"description": "これはサンプルドキュメントです", "category": "test"}
# original_id = "doc_abc_123"
# # vector = await generate_text_embedding("サンプルドキュメントコンテンツ", plugin_config) # ベクトルを生成
# # await store_vector_data(qdrant_client, collection_name, original_id, vector, my_payload)
# pass説明:
models.PointStruct: データポイントを定義するために使用されます。id: ポイントの一意の識別子。QdrantはIDが整数(uint64)またはUUID文字列であることを要求します。 内部IDが他のタイプの文字列の場合、Qdrant互換のIDに変換する必要があります(例えば、ハッシュ化してその一部を整数として使用するか、UUIDを使用)。emotion.pyでは、16進文字列IDが整数に変換されています:int(emotion_id, 16)。上記の例ではhashlibを使用して整数IDを生成しています。vector: データの埋め込みベクトル。payload: ベクトルに関連付けられた任意のメタデータを保存するための辞書。このメタデータは検索中にフィルタリングまたは返すことができます。元のデータIDもペイロードに保存することをお勧めします。これにより、QdrantポイントIDが内部IDと異なる場合でも、ペイロードを通じて元の関連付けを取得できます。
qdrant_client.upsert(): データポイントの挿入または更新に使用されます。同じidを持つポイントが既に存在する場合、更新されます。それ以外の場合、新しいポイントが作成されます。wait=True: オプションパラメータで、Trueに設定すると、操作はQdrantが書き込み完了を確認するまで待機します。
7. Qdrantからのデータ検索
ベクトルデータベースのコア機能は、クエリベクトルに基づいて類似エントリを検索することです。
from typing import List, Optional # インポートを確認
from qdrant_client import QdrantClient, models
async def search_similar_vectors(
qdrant_client: QdrantClient,
collection_name: str,
query_vector: List[float],
top_k: int = 5, # 上位k個の最も類似した結果を返す
score_threshold: Optional[float] = None # オプションの類似性スコアしきい値
) -> List[models.ScoredPoint]:
"""Qdrantでクエリベクトルに類似したエントリを検索"""
try:
search_results = await qdrant_client.search(
collection_name=collection_name,
query_vector=query_vector,
limit=top_k,
score_threshold=score_threshold, # 例えば、類似性が0.75より高い結果のみを返す
with_payload=True # 結果にペイロードデータを含めるためにTrueに設定
)
core.logger.info(f"Qdrantが{len(search_results)}個の結果を見つけました。")
# 各search_resultはScoredPointオブジェクトで、id、score、payload、vector(要求された場合)を含みます
# for hit in search_results:
# core.logger.debug(f"見つかったポイント: {hit.id}, スコア: {hit.score}, ペイロード: {hit.payload}")
return search_results
except Exception as e:
core.logger.error(f"Qdrant検索に失敗しました: {e}")
raise
# 使用例:
# async def example_usage_search():
# # qdrant_client, collection_name, plugin_configは定義されている必要があります
# query_text = "プラグインをどのように設定すればよいですか?"
# try:
# # query_vector = await generate_text_embedding(query_text, plugin_config)
# # results = await search_similar_vectors(qdrant_client, collection_name, query_vector)
# # for result in results:
# # # result.payloadから元のデータIDを取得
# # original_id = result.payload.get("original_id")
# # core.logger.info(f"見つかったドキュメントID: {original_id}, 類似性スコア: {result.score}")
# except Exception as e:
# core.logger.error(f"検索に失敗しました: {e}")説明:
qdrant_client.search(): 指定されたクエリベクトルに最も類似したベクトルを検索します。query_vector: 検索対象のベクトル。limit: 返す結果の最大数。score_threshold: (オプション)類似性スコアのしきい値。このスコアより低い結果はフィルタリングされます。with_payload:Trueに設定すると、結果にペイロードデータが含まれます。
ScoredPoint: 各検索結果はScoredPointオブジェクトで、以下を含みます:id: ポイントID。score: 類似性スコア(0〜1の範囲で、1が最も類似)。payload: 保存されたメタデータ辞書。vector: (要求された場合)ベクトル自体。
8. 完全な使用例
以下は、プラグインでQdrantを使用する完全なワークフローの例です。初期化、埋め込み生成、データ保存、類似コンテンツの検索を含みます。
from typing import List, Dict, Any, Optional
from nekro_agent.api import core
from nekro_agent.api.plugin import NekroPlugin, ConfigBase, SandboxMethodType
from nekro_agent.api.schemas import AgentCtx
from pydantic import Field
from qdrant_client import QdrantClient, models
from qdrant_client.http.models import PointStruct
import hashlib
# プラグイン設定
class MyPluginConfig(ConfigBase):
EMBEDDING_MODEL_GROUP: str = Field(
default="text-embedding",
title="埋め込みモデルグループ",
description="テキスト埋め込みベクトルを生成するために使用されるモデルグループの名前。",
json_schema_extra={"ref_model_groups": True, "required": True, "model_type": "embedding"},
)
EMBEDDING_DIMENSION: int = Field(
default=1024,
title="埋め込みベクトル次元",
description="埋め込みモデルが出力するベクトル次元。",
)
# プラグインインスタンス
plugin = NekroPlugin(
name="Document Search Plugin",
module_name="doc_search",
description="ドキュメント検索とセマンティック理解のためのプラグイン",
author="MyAuthor",
version="1.0.0"
)
# プラグイン初期化
@plugin.mount_init_method()
async def on_plugin_init():
"""プラグイン初期化時にベクトルデータベースコレクションを準備"""
plugin_config = plugin.get_config(MyPluginConfig)
await init_vector_db_collection(plugin, plugin_config.EMBEDDING_DIMENSION)
# ドキュメントを追加するサンドボックスメソッド
@plugin.mount_sandbox_method(
method_type=SandboxMethodType.TOOL,
name="add_document",
description="検索可能なドキュメントをベクトルデータベースに追加します"
)
async def add_document(_ctx: AgentCtx, doc_id: str, title: str, content: str) -> str:
"""ドキュメントをベクトルデータベースに追加"""
try:
# 設定を取得
plugin_config = plugin.get_config(MyPluginConfig)
# Qdrantクライアントを取得
qdrant_client = await core.get_qdrant_client()
if not qdrant_client:
return "エラー: Qdrantクライアントを取得できません"
# テキストから埋め込みベクトルを生成
combined_text = f"{title}\n\n{content}"
embedding_vector = await generate_text_embedding(combined_text, plugin_config)
# メタデータを準備
payload = {
"title": title,
"content": content,
"added_at": str(_ctx.get_current_time()) # 現在時刻を取得
}
# Qdrantに保存
collection_name = plugin.get_vector_collection_name()
await store_vector_data(qdrant_client, collection_name, doc_id, embedding_vector, payload)
return f"ドキュメント '{title}'(ID: {doc_id})が正常に追加されました"
except Exception as e:
core.logger.error(f"ドキュメント追加エラー: {e}")
return f"ドキュメント追加に失敗しました: {e}"
# ドキュメントを検索するサンドボックスメソッド
@plugin.mount_sandbox_method(
method_type=SandboxMethodType.TOOL,
name="search_documents",
description="クエリに基づいて関連ドキュメントを検索します"
)
async def search_documents(_ctx: AgentCtx, query: str, limit: int = 5) -> str:
"""クエリに基づいて関連ドキュメントを検索"""
try:
# 設定を取得
plugin_config = plugin.get_config(MyPluginConfig)
# Qdrantクライアントを取得
qdrant_client = await core.get_qdrant_client()
if not qdrant_client:
return "エラー: Qdrantクライアントを取得できません"
# クエリから埋め込みベクトルを生成
query_vector = await generate_text_embedding(query, plugin_config)
# 類似ドキュメントを検索
collection_name = plugin.get_vector_collection_name()
search_results = await search_similar_vectors(
qdrant_client,
collection_name,
query_vector,
top_k=limit,
score_threshold=0.7 # 類似性スコアが0.7以上の結果のみ
)
if not search_results:
return "関連ドキュメントが見つかりませんでした"
# 結果を整形
result_text = f"クエリ '{query}' に対して {len(search_results)} 件の関連ドキュメントが見つかりました:\n\n"
for i, result in enumerate(search_results, 1):
payload = result.payload
title = payload.get("title", "不明なタイトル")
content = payload.get("content", "")
score = result.score
# コンテンツを短く切り詰め
content_preview = content[:200] + "..." if len(content) > 200 else content
result_text += f"{i}. **{title}** (類似性: {score:.2f})\n"
result_text += f" {content_preview}\n\n"
return result_text
except Exception as e:
core.logger.error(f"ドキュメント検索エラー: {e}")
return f"ドキュメント検索に失敗しました: {e}"
# 以下は上記で定義されたヘルパー関数の実装(簡略化版)
async def init_vector_db_collection(plugin_instance: NekroPlugin, embedding_dimension: int):
"""プラグインのベクトルデータベースコレクションを初期化"""
qdrant_client: Optional[QdrantClient] = await core.get_qdrant_client()
if not qdrant_client:
core.logger.warning(f"[{plugin_instance.name}] Qdrantクライアントを取得できません、ベクトルデータベースコレクションの初期化をスキップします")
return
collection_name = plugin_instance.get_vector_collection_name()
try:
collections_response = await qdrant_client.get_collections()
existing_collections = [col.name for col in collections_response.collections]
if collection_name not in existing_collections:
core.logger.info(f"[{plugin_instance.name}] ベクトルデータベースコレクションを作成中: {collection_name}")
await qdrant_client.create_collection(
collection_name=collection_name,
vectors_config=models.VectorParams(
size=embedding_dimension,
distance=models.Distance.COSINE,
),
)
core.logger.success(f"[{plugin_instance.name}] ベクトルデータベースコレクション {collection_name} の作成に成功しました")
except Exception as e:
core.logger.error(f"[{plugin_instance.name}] ベクトルデータベースコレクション {collection_name} の初期化に失敗しました: {e}")
async def generate_text_embedding(text_to_embed: str, plugin_config: MyPluginConfig) -> List[float]:
"""設定された埋め込みモデルを使用してテキストの埋め込みベクトルを生成します。"""
from nekro_agent.api.core import config as core_config, ModelConfigGroup
from nekro_agent.services.agent.openai import gen_openai_embeddings
try:
model_group_info: Optional[ModelConfigGroup] = core_config.get_model_group_info(plugin_config.EMBEDDING_MODEL_GROUP)
if not model_group_info:
core.logger.error(f"'{plugin_config.EMBEDDING_MODEL_GROUP}'という名前のモデルグループ設定が見つかりません。")
raise ValueError(f"埋め込みモデルグループ '{plugin_config.EMBEDDING_MODEL_GROUP}' が見つかりません。")
embedding_vector: List[float] = await gen_openai_embeddings(
model=model_group_info.CHAT_MODEL,
input=text_to_embed,
api_key=model_group_info.API_KEY,
base_url=model_group_info.BASE_URL,
dimensions=plugin_config.EMBEDDING_DIMENSION
)
vector_dimension = len(embedding_vector)
if vector_dimension != plugin_config.EMBEDDING_DIMENSION:
core.logger.error(
f"埋め込みベクトル次元が一致しません!設定された次元: {plugin_config.EMBEDDING_DIMENSION}, "
f"実際の次元: {vector_dimension}."
)
raise ValueError(f"埋め込み次元が一致しません。")
return embedding_vector
except Exception as e:
core.logger.error(f"テキスト埋め込みの生成に失敗しました: {e}")
raise
async def store_vector_data(
qdrant_client: QdrantClient,
collection_name: str,
original_data_id: str,
embedding_vector: List[float],
payload_data: Dict[str, Any]
):
"""ベクトルとメタデータをQdrantに保存"""
try:
qdrant_point_id_hex = hashlib.md5(original_data_id.encode()).hexdigest()[:16]
qdrant_point_id = int(qdrant_point_id_hex, 16)
points_to_upsert = [
models.PointStruct(
id=qdrant_point_id,
vector=embedding_vector,
payload={
"original_id": original_data_id,
**payload_data
}
)
]
response = await qdrant_client.upsert(
collection_name=collection_name,
points=points_to_upsert,
wait=True
)
core.logger.info(f"Qdrantへのデータ保存に成功しました、元のID: {original_data_id}, QdrantポイントID: {qdrant_point_id}. ステータス: {response.status}")
except Exception as e:
core.logger.error(f"Qdrantへのデータ保存に失敗しました(元のID: {original_data_id}): {e}")
raise
async def search_similar_vectors(
qdrant_client: QdrantClient,
collection_name: str,
query_vector: List[float],
top_k: int = 5,
score_threshold: Optional[float] = None
) -> List[models.ScoredPoint]:
"""Qdrantでクエリベクトルに類似したエントリを検索"""
try:
search_results = await qdrant_client.search(
collection_name=collection_name,
query_vector=query_vector,
limit=top_k,
score_threshold=score_threshold,
with_payload=True
)
core.logger.info(f"Qdrantが{len(search_results)}個の結果を見つけました。")
return search_results
except Exception as e:
core.logger.error(f"Qdrant検索に失敗しました: {e}")
raiseこの例は、プラグインでQdrantを使用する完全なワークフローを示しています。初期化、埋め込み生成、データ保存、類似コンテンツの検索を含みます。
