はじめに
Milvus は非常に人気のあるオープンソースのベクトルデータベースで、AI や機械学習プロジェクトで広く使われています。
ただし、運用やパフォーマンスチューニングの観点では「トレースと監視」が非常に重要になります。
本記事では、IBM Instana SaaS と Traceloop SDK を活用して、Milvus の操作をエンドツーエンドでトレースする方法を紹介します。
構成
- Milvus v2.2.13(Docker 環境)
- Traceloop SDK v0.38.0
- Instana SaaS (OTLP + Agentless モード)
- Python SDK (pymilvus) v2.5.8
1. Milvus (Docker) 環境の準備
まず、Milvus 環境を docker で立ち上げます。
docker ps
結果(例):
milvusdb/milvus:v2.2.13 → 8080ポート (REST / gRPC)
minio/minio → 9000ポート
quay.io/coreos/etcd → 2379ポート
Milvus 2.2.13 で gRPC 8080ポート が公開されています。
これに Python SDK から接続します。
2. Python 開発環境の準備
必要なライブラリをインストールします。
pip install traceloop-sdk pymilvus
バージョン固定したい場合は下記のようにしてもOK
pip install traceloop-sdk==0.38.0 pymilvus==2.5.8
3. Instana SaaS の設定(Agentless + OTLP)
Instana 側からOTLPエンドポイントを取得し、下記環境変数を設定します。
export TRACELOOP_BASE_URL="https://otlp-XXX-saas.instana.io:443"
export TRACELOOP_HEADERS="x-instana-key=YOUR_INSTANA_AGENT_KEY"
export OTEL_EXPORTER_OTLP_INSECURE=false
Agentless Modeの場合に、OTLPのACCEPTOR に送信するHTTP/gRPC (443ポート)を使う点に注意
4. トレース対応済み Python コード
以下のコードを milvus_tracing_demo.py
として保存します。
import os
import random
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from pymilvus import Collection, utility, connections, FieldSchema, CollectionSchema, DataType
# ========================
# TraceLoop 初期化
# ========================
Traceloop.init(app_name="MilvusDemoJacky")
# ========================
# Milvus操作ラッパー
# ========================
@task(name="connect_to_milvus")
def connect_to_milvus():
connections.connect(
alias="default",
host="127.0.0.1",
port="8080",
user="root",
password="4XYg2XK6sMU4UuBEjHq4EhYE8mSFO3Qq",
secure=True,
server_pem_path="./cert/ca.pem",
server_name="localhost"
)
@task(name="create_collection")
def create_collection(name):
if utility.has_collection(name):
Collection(name).drop()
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=8)
]
schema = CollectionSchema(fields, description="Demo collection with trace")
return Collection(name=name, schema=schema)
@task(name="insert_data")
def insert_data(collection, count=10):
ids = [i for i in range(count)]
vectors = [[random.random() for _ in range(8)] for _ in range(count)]
collection.insert([ids, vectors])
return ids, vectors
@task(name="create_index")
def create_index(collection):
index_params = {
"index_type": "IVF_FLAT",
"metric_type": "L2",
"params": {"nlist": 128},
}
collection.create_index(field_name="vector", index_params=index_params)
@task(name="search_data")
def search_data(collection, vector, expr=None):
collection.load()
results = collection.search(
[vector],
"vector",
param={"metric_type": "L2", "params": {"nprobe": 10}},
limit=3,
expr=expr,
output_fields=["id"]
)
# 結果をtrace対応フォーマットに変換
output = []
for hit in results[0]:
output.append({
"id": hit.id,
"distance": hit.distance
})
return output
@task(name="delete_data")
def delete_data(collection, ids_to_delete):
expr = f"id in [{','.join(str(i) for i in ids_to_delete)}]"
collection.delete(expr=expr)
@task(name="query_data")
def query_data(collection):
result = collection.query(expr="id >= 0", output_fields=["id"])
return result
@task(name="release_collection")
def release_collection(collection):
collection.release()
# ========================
# メインビジネスフロー(ワークフロー)
# ========================
@workflow(name="milvus_workflow")
def main():
connect_to_milvus()
collection_name = "demo_collection_complex"
collection = create_collection(collection_name)
ids, vectors = insert_data(collection, count=10)
create_index(collection)
# 複数回検索
for _ in range(3):
vector_to_search = random.choice(vectors)
results = search_data(collection, vector_to_search)
print("Search result:")
for result in results:
print("ID:", result["id"], "Distance:", result["distance"])
# 一部データを削除
delete_data(collection, ids[:2])
# 残りのデータをクエリする
remaining = query_data(collection)
print("Remaining records:")
for item in remaining:
print("ID:", item['id'])
release_collection(collection)
if __name__ == "__main__":
main()
5. 実行
python3 milvus_tracing_demo.py
実行結果はコンソールと Instana 両方に出力されます。
Traceloop exporting traces to https://otlp-XXX-saas.instana.io:443, authenticating with custom headers
Search result:
ID: 4 Distance: 0.0
ID: 2 Distance: 1.3049341440200806
ID: 7 Distance: 1.3092372417449951
Search result:
ID: 3 Distance: 0.0
ID: 9 Distance: 1.0531097650527954
ID: 2 Distance: 1.2092218399047852
Search result:
ID: 6 Distance: 0.0
ID: 0 Distance: 0.5008214712142944
ID: 9 Distance: 1.8369675874710083
Remaining records:
ID: 0
ID: 1
ID: 2
ID: 3
ID: 4
ID: 5
ID: 6
ID: 7
ID: 8
ID: 9
6. Instana での結果確認
Instana UIの「アプリケーション」の「サービス」で「MilvusDemoJacky」というサービスを見つけて、サービスの詳細を確認することは可能です。
サービス(呼び出し回数、エラー率、レイテンシ)
トレースビュー(呼び出しと階層構造)
- 各 task(connect, insert, search, delete など)が綺麗に trace されている
- 呼び出しツリーでワークフロー全体が可視化
コール一覧(API呼び出し全体の可視化)
- すべての操作が Instana で時系列、処理時間つきで確認可能
🎉 まとめ
Milvus のような Vector DB でも、Instana SaaS + Traceloop SDK を使うことで、
- 詳細なトレース
- エンドツーエンドの可視化
- エラーや遅延の即時検知
が可能になります。
これにより、ML/AIプロジェクトでも**本番品質のObservability(可観測性)**を簡単に実現できます。