0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[9.2] DiskBBQ: IVFでメモリ削減する大規模ベクトル検索

Last updated at Posted at 2025-12-04

はじめに

ベクトル検索の世界では、これまで HNSW を代表とするグラフベースのインデックスが広く使われており、Elasticsearchでもint8やbitの量子化を行ってきました。

こうした中で Elasticsearch がv9.2で導入したのが、IVF(Inverted File)をベースにした DiskBBQ です。DiskBBQ は名前だけ聞くと特殊な新構造に見えますが、その中身は

  • まずベクトル空間を クラスタリング(k-means など)して「センター」を作る
  • そして各センターごとに 近いベクトルのリスト(倒立リスト = Inverted File)を持つ

という、クラスタリング+倒立リスト型(IVF 型)のインデックスをディスク利用前提で設計し直したものです。

ちなみにDiskBBQという名前ですが、当然ながら計算する時はメモリは使います。それが非常に少ないというのも特徴です。

詳しくはこちらのブログを参照してください。
https://www.elastic.co/search-labs/blog/diskbbq-elasticsearch-introduction

2025/12/05: DiskBBQの利用にはEnterpriseライセンスが必要です

やってみよう

弊社コンサルタントが記載した下記のブログのプログラムを少し変えて使います。
https://qiita.com/daixque/items/419a04880af052d79fb9

変更点はざっくりと

  • DiskBBQをMappingに加える
  • インデックスにかかる時間と検索にかかる時間を出力
  • force mergeにAsyncを取り入れた
    • 検索時間などに違いを出すにはベクトルの数を増やす必要があり、実際に増やすとforce mergeがTimeoutまでに終わらないというのがわかった
  • Primary Shardの数を変えられるようにした。これはForce MergeがShardごとにシングルスレッドになってしまうので、並列処理させたかった
  • メモリのStatsは取得も表示もしない。なぜならDiskBBQのStatsがまだ実装されていないから
  • 100万ベクトルとかを一気に作るとプログラムが死んだので、バッチ処理に変更
import os
import time
import secrets

from elasticsearch import Elasticsearch
from dotenv import load_dotenv
import numpy as np
from tqdm import tqdm

# Load environment variables from .env file (ES_URL and ES_API_KEY)
load_dotenv()

TEST_SPECS = [
    ("vec_float_flat", "flat"),
    ("vec_float_hnsw", "hnsw"),
    ("vec_int8_hnsw", "int8_hnsw"),
    ("vec_bbq_hnsw", "bbq_hnsw"),
    ("vec_bbq_disk", "bbq_disk")
]

NUM_VECTORS = 1_000_000   # ← 好きな件数に
DIM = 1024
M = 32
EF_CONSTRUCTION = 100
BULK_BATCH_SIZE = 5000        # bulk あたりの件数 (ES への送信単位)
INGEST_GEN_BATCH_SIZE = 10000 # 生成単位
SEARCH_NUM = 10
TOP_K = 10


def create_index(es, index_name, index_type):
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
    body = {
        "settings": {
            "number_of_shards": 5,
            "number_of_replicas": 0
        },
        "mappings": {
            "properties": {
                "vector": {
                    "type": "dense_vector",
                    "dims": DIM,
                    "similarity": "cosine",
                    "element_type": "float",
                    "index_options": {
                        "type": index_type
                    }
                }
            }
        }
    }
    if index_type in ["hnsw", "int8_hnsw", "bbq_hnsw"]:
        body["mappings"]["properties"]["vector"]["index_options"].update({
            "m": M,
            "ef_construction": EF_CONSTRUCTION
        })
    es.indices.create(index=index_name, body=body)
    print(f"Created index: {index_name} with type: {index_type}")


def force_merge_and_wait(es, index_name):
    # 非同期でforcemerge開始
    response = es.indices.forcemerge(
        index=index_name,
        max_num_segments=1,
        wait_for_completion=False
    )
    task_id = response['task']
    print(f"Triggered async forcemerge, task id: {task_id}")

    # タスク完了まで待機ポーリング
    while True:
        task_status = es.tasks.get(task_id=task_id)
        if task_status.get('completed', False):
            print(f"Force merge completed for {index_name}")
            break
        else:
            print(f"Force merge ongoing for {index_name}...")
            time.sleep(2)


def ingest_vector(es, index_name, num_vectors, rng):
    """
    この関数内では必ず引数 rng から乱数を生成すること。
    np.random.rand は使わない。
    """
    print(f"Ingesting {num_vectors} vectors into {index_name}")
    ingest_start_time = time.time()

    with tqdm(total=num_vectors, desc=f"Bulk indexing to {index_name}", unit="docs") as pbar:
        current_id = 0
        while current_id < num_vectors:
            # 今回の「生成バッチ」のサイズ(最後だけ端数になる可能性あり)
            remaining = num_vectors - current_id
            gen_batch_size = min(INGEST_GEN_BATCH_SIZE, remaining)

            # 実行ごとに違う rng からベクタを生成
            # ただし同一実行内ではどのインデックスでも同じシードから生成されるので
            # doc_id → ベクトルは全インデックス共通になる
            batch_vectors = rng.random((gen_batch_size, DIM))

            # 生成された塊を更に BULK_BATCH_SIZE ずつ分割して bulk index
            for offset in range(0, gen_batch_size, BULK_BATCH_SIZE):
                start = offset
                end = min(offset + BULK_BATCH_SIZE, gen_batch_size)

                bulk_body = []
                for i in range(start, end):
                    doc_id = current_id + i  # 一意なID
                    bulk_body.append({"index": {"_index": index_name, "_id": str(doc_id)}})
                    bulk_body.append({"vector": batch_vectors[i].tolist()})

                es.bulk(body=bulk_body)
                pbar.update(end - start)

            current_id += gen_batch_size

    ingest_end_time = time.time()
    ingest_duration = ingest_end_time - ingest_start_time
    print(f"Ingestion completed in {ingest_duration:.2f} seconds.")

    es.indices.refresh(index=index_name)
    es.indices.flush(index=index_name)
    force_merge_and_wait(es, index_name)
    print(f"Completed ingestion of {num_vectors} vectors into {index_name}")
    return ingest_duration


def search_vector(es, index_name, queries, top_k=10):
    print(f"Searching {len(queries)} queries on {index_name} with top_k={top_k}")
    durations = []
    for q in queries:
        body = {
            "size": top_k,
            "query": {
                "knn": {
                    "field": "vector",
                    "query_vector": q,
                    "k": top_k
                }
            }
        }
        start = time.time()
        es.search(index=index_name, body=body)
        end = time.time()
        durations.append((end - start) * 1000)
    return sum(durations) / len(durations) if durations else 0


def main():
    es_url = os.getenv("ES_URL", "http://localhost:9200")
    es_api_key = os.getenv("ES_API_KEY")
    if es_api_key:
        es = Elasticsearch(es_url, api_key=es_api_key)
    else:
        es = Elasticsearch(es_url)

    # この実行専用の run_seed を毎回ランダムに決める
    # → プログラムを再実行すると run_seed が変わり、
    #    ベクトルも検索クエリも毎回違うデータになる
    run_seed = secrets.randbits(32)
    print(f"[INFO] run_seed = {run_seed}")

    # (任意)もし特定の run_seed を再現したいときは、上をコメントアウトして
    # run_seed = 1234567890  # などと固定すればよい

    # まず古いインデックスを全部削除
    print("Deleting all leftover indexes before testing...")
    for index_name, _ in TEST_SPECS:
        es.indices.delete(index=index_name, ignore=[400, 404])
    print("Deletion complete.\n")

    # 検索クエリ用 RNG
    # 用途を分けるために run_seed + 1 を使っているだけで、
    # 性質的には run_seed と同じ
    print(f"Generating {SEARCH_NUM} random query vectors with {DIM} dimensions...")
    query_rng = np.random.default_rng(run_seed + 1)
    search_queries = [query_rng.random(DIM).tolist() for _ in range(SEARCH_NUM)]

    results = []

    for index_name, index_type in TEST_SPECS:
        create_index(es, index_name, index_type)

        # 各インデックスごとに同じ run_seed から RNG を作成
        # → 同じ実行の中では全インデックスで doc_id → ベクトルが同じになる
        rng = np.random.default_rng(run_seed)

        ingest_time = ingest_vector(es, index_name, NUM_VECTORS, rng)
        avg_search_time = search_vector(es, index_name, search_queries, top_k=TOP_K)

        results.append({
            "index_type": index_type,
            "ingest_time": f"{ingest_time:.2f}",
            "search_time": f"{avg_search_time:.1f}"
        })

        # 計測が終わったインデックスは削除(ディスク・メモリ節約)
        es.indices.delete(index=index_name)
        print(f"Deleted index: {index_name} (type: {index_type})\n")

    print("## Parameters\n")
    print(f"- Number of vectors: {NUM_VECTORS}")
    print(f"- Dimensions: {DIM}")
    print(f"- HNSW M: {M}")
    print(f"- HNSW ef_construction: {EF_CONSTRUCTION}")
    print(f"- Ingest generation batch size: {INGEST_GEN_BATCH_SIZE}")
    print(f"- Bulk batch size: {BULK_BATCH_SIZE}")
    print("\n## Ingestion & Search Time\n")
    print("| Index type    | ingest_time (sec) | search_time (ms) |")
    print("|-------------- |------------------:|-----------------:|")
    for r in results:
        print(f"| {r['index_type']:14} | {r['ingest_time']:>18} | {r['search_time']:>16} |")


if __name__ == "__main__":
    main()

ベクトル数を100万とかにするとForce Mergeが結構長い時間かかりました。
PCでやってるのでいたしかたないですが。

結果

こちらが結果です

ベクトル数:10,000

| Index type    | ingest_time (sec) | search_time (ms) |
|-------------- |------------------:|-----------------:|
| flat           |               5.04 |              2.9 |
| hnsw           |               5.12 |              2.7 |
| int8_hnsw      |               5.17 |              2.6 |
| bbq_hnsw       |               5.13 |              2.1 |
| bbq_disk       |               4.84 |              2.3 |

ベクトル数:100,000

| Index type    | ingest_time (sec) | search_time (ms) |
|-------------- |------------------:|-----------------:|
| flat           |              49.41 |              7.0 |
| hnsw           |              63.56 |              5.1 |
| int8_hnsw      |              63.43 |              3.1 |
| bbq_hnsw       |              64.08 |              3.3 |
| bbq_disk       |              49.38 |              2.7 |

ベクトル数:1,000,000

| Index type    | ingest_time (sec) | search_time (ms) |
|-------------- |------------------:|-----------------:|
| flat           |             495.70 |            826.1 |
| hnsw           |             813.60 |            204.1 |
| int8_hnsw      |             810.70 |            202.1 |
| bbq_hnsw       |             818.41 |             62.0 |
| bbq_disk       |             530.34 |             26.4 |

今回はRecallとかは測定していないので、単純にIngestとSearchが早いから良い、とはなりませんが、ある一定の目安にはなると思います。

まとめ

Elasticsearchではベクトル検索を行う際に様々なオプションを用意しています。
これを使えば完璧、というのはありません。
皆様の用途に応じて使い分けをしてください。
必要なメモリリソースを極限まで減らすのであれば今回のDiskBBQですが、Recallとのバランスを見ながら検証を行い、選んでいただくのが良いと考えています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?