4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ベクトル検索の限界QPSを測定する簡易ツール

Last updated at Posted at 2025-03-13

QPSとは❓

QPS(Queries Per Second)は文字通り1秒間に成功するクエリの数です。
1クエリ = 1トランザクションとみなす場合、TPS (Transactions Per Second) とも言われます

ベクトル検索では(正確にはベクトル検索だけでなく通常のWebAPIもですが)、事前に設定した

  • IaaSのスペック(CPU,メモリ等)
  • ベクトルデータ
  • クエリ

に対して、連続的に実行して限界QPS(TPS)を計測します。

シンプルな負荷試験ツール🤖

ベクトル検索のベンチマークではvectordbbenchann-benchmarkが有名ですが、下記を実現したいケースもあります。

  • 本番想定のオリジナルなデータ構造でテストしたい
  • 上記ツールが対応していないベクトルストアの利用
  • テストシナリオをコントロールしたい

そのためシンプルに測定できるツールをPythonで作ってみました。
抽象クラスであるAbstractLoaderTestを用意して緩くinterface化しています:

  • execute_query
    • 対象のベクトルストアに対して連続実行するクエリを記述します
  • run_test
    • 下記3点を引数に指定して実行します
      • QPS:1秒間に発行するクエリ数。execute_queryを毎秒指定した回数で実行
      • duration:ツールの実行時間(秒)
      • タイムアウト:処理遅延を許容する閾値(秒)
    • 結果ログファイルに成功/失敗回数が集計され、全て成功であればそのQPSは達成となります

負荷試験抽象クラスコード

abstract_loader_test.py
import time
import threading
import logging
import random
import string
import datetime
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, wait

class AbstractLoaderTest(ABC):
    def __init__(self, tps, duration, timeout):
        """
        :param tps: Number of operations per second
        :param duration: Test execution time in seconds
        :param timeout: Timeout value (in seconds) for each batch
        """
        self.tps = tps
        self.duration = duration
        self.timeout = timeout
        self.total_queries = 0
        self.success_count = 0
        self.error_count = 0
        self.lock = threading.Lock()

    @abstractmethod
    def execute_query(self):
        """
        Abstract method to execute the test operation.
        This method can be overridden for non-database purposes as well.
        """
        pass

    def run_test(self):
        """
        Executes tps operations per second (using execute_query) and evaluates the results within the timeout period.
        """
        with ThreadPoolExecutor(max_workers=self.tps * 2) as executor:
            for i in range(self.duration):
                batch_futures = [executor.submit(
                    self.execute_query) for _ in range(self.tps)]
                batch_start = time.time()
                done, not_done = wait(batch_futures, timeout=self.timeout)

                for future in done:
                    try:
                        future.result()
                        with self.lock:
                            self.success_count += 1
                    except Exception:
                        with self.lock:
                            self.error_count += 1

                for future in not_done:
                    with self.lock:
                        self.error_count += 1
                    future.cancel()

                batch_elapsed = time.time() - batch_start
                if batch_elapsed < 1.0:
                    time.sleep(1.0 - batch_elapsed)
                with self.lock:
                    self.total_queries += self.tps

        self.report_results()

    def report_results(self):
        logging.info("=== Test Results ===")
        logging.info("Total operations: %d", self.total_queries)
        logging.info("Successful operations: %d", self.success_count)
        logging.info("Failed operations: %d", self.error_count)
        qps = self.total_queries / self.duration
        logging.info("QPS: %.2f", qps)

    @staticmethod
    def setup_logging():
        """Configure logging (append if the file for the current day exists)"""
        current_date = datetime.datetime.now().strftime("%Y%m%d")
        log_filename = f"vectordbtest_{current_date}.log"
        logging.basicConfig(
            level=logging.DEBUG,
            filename=log_filename,
            filemode='a',
            encoding='utf-8',
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

    def _generate_unique_comment(self):
        """Generate a random string for cache busting"""
        return ''.join(random.choices(string.ascii_letters + string.digits, k=8))

OCI Database with PostgreSQLで実際に検証📜

ポスグレ構築(OCIコンソール)

OCIコンソールからの構築はこちらに分かりやすくまとまっています。
基本的な流れはpgvectorを適用した構成ファイルの作成後、それを適用したポスグレインスタンスの作成になります。

OCIコンソールとは別にterraformを利用すると一発で作成できるので便利です:

ポスグレ構築(terraform)

構成ファイルとインスタンスの作成
resource "oci_psql_configuration" "test_flexible_configuration" {
  #Required
  compartment_id = var.compartment_id
  shape          = "VM.Standard.E5.Flex"
  db_configuration_overrides {
    items {
      config_key             = "max_connections"
      overriden_config_value = "5000"
    }
    items {
      config_key             = "oci.admin_enabled_extensions"
      overriden_config_value = "vector"
    }
  }
  db_version   = "15"
  display_name = "terraform test flex configuration"
  #Optional
  instance_memory_size_in_gbs = "0"
  instance_ocpu_count         = "0"
  is_flexible                 = true
  description                 = "test configuration created by terraform"
  # これにより、更新時は新規作成→DBシステムの参照更新→古い構成削除の順で処理
  lifecycle {
    create_before_destroy = true
  }
}

resource "oci_psql_db_system" "test_db_system" {
  compartment_id = var.compartment_id

  credentials {
    password_details {
      password_type = var.db_system_credentials_password_details_password_type
      password      = var.db_system_credentials_password_details_password
    }
    username = var.db_system_credentials_username
  }
  config_id   = oci_psql_configuration.test_flexible_configuration.id
  apply_config = "RESTART"
  db_version   = "15"
  display_name = var.db_system_display_name

  network_details {
    subnet_id = var.db_system_network_details_subnet_id
  }
  shape                       = "PostgreSQL.VM.Standard.E5.Flex"
  instance_ocpu_count         = "2"
  instance_memory_size_in_gbs = "32"

  storage_details {
    is_regionally_durable = var.db_system_storage_details_is_regionally_durable
    system_type           = var.db_system_storage_details_system_type
  }

  source {
    source_type = var.db_system_source_source_type
  }
}

ベクトルデータ投入

ベクトルデータは任意の方法で投入します。
今回はlangchainを用いてPDFファイルをpgvectorに取り込んでおきます。

負荷試験ツール実装

先ほどのAbstractLoaderTestを継承した実装クラスを作成します。

oci_postgres_load_test.py
import yaml
import psycopg2
from psycopg2 import pool
from abstract_loader_test import AbstractLoaderTest
import logging


class OCI_Postgres_LoadTest(AbstractLoaderTest):
    def __init__(self, tps, duration, timeout, config=None):
        """
        :param tps: Number of SQL queries per second
        :param duration: Test execution time in seconds
        :param timeout: Timeout value (in seconds) for each batch
        :param config: Configuration content. If not provided, 'config.yaml' will be loaded and logging will be configured automatically.
        """
        if config is None:
            AbstractLoaderTest.setup_logging()
            config = self.load_config('config.yaml')
        self.config = config
        super().__init__(tps, duration, timeout)
        # Create a PostgreSQL connection pool (minconn=1, maxconn is tps * 2)
        self.conn_pool = pool.SimpleConnectionPool(
            minconn=1,
            maxconn=self.tps * 2,
            host=self.config['dbhost'],
            port=5432,
            dbname=self.config['dbname'],
            user=self.config['username'],
            password=self.config['password']
        )

    @staticmethod
    def load_config(file_path):
        """Load the configuration file (YAML)"""
        with open(file_path, 'r', encoding='utf-8') as file:
            return yaml.safe_load(file)

    @property
    def BASE_SQL(self):
        """SQL template string containing the {unique} placeholder."""
        return f"""
SELECT * FROM {self.config['table']} 
ORDER BY embedding <-> %s::vector
LIMIT 1
-- no-cache: {{unique}}
"""

    @property
    def EMBEDDING_VECTOR(self):
        """The embedding vector value to be passed to the SQL query."""
        return self.config['EMBEDDING_VECTOR']

    def get_connection(self):
        return self.conn_pool.getconn()

    def put_connection(self, conn):
        self.conn_pool.putconn(conn)

    def close_all(self):
        self.conn_pool.closeall()

    def execute_query(self):
        """
        Retrieves a connection from the pool, executes the SQL query and returns the result.
        Appends a unique string to bypass cache.
        """
        conn = None
        try:
            conn = self.get_connection()
            with conn.cursor() as cur:
                # _generate_unique_comment is defined in AbstractLoaderTest
                unique = self._generate_unique_comment()
                sql = self.BASE_SQL.format(unique=unique)
                cur.execute(sql, (self.EMBEDDING_VECTOR,))
                result = cur.fetchall()
            conn.commit()
            return result
        except Exception as e:
            logging.error("Error during query execution: %s", e)
            raise e
        finally:
            if conn:
                self.put_connection(conn)


if __name__ == '__main__':
    # Main execution: create an instance with the required parameters and run the test
    tester = OCI_Postgres_LoadTest(tps=100, duration=30, timeout=3)
    tester.run_test()


負荷試験実施

上記ポスグレ実装クラスを実行します。
今回はひとまずテスト継続時間30秒、タイムアウト3秒の条件でテストしていきます。

指定したタイムアウトを超える応答があった場合はFailedとしてカウントされるため、
全クエリが成功していることを確認して、その設定QPSが達成されたかを判断します。

100QPS時は全て成功!
2025-03-13 07:41:26,954 - INFO - === Test Results ===
2025-03-13 07:41:26,955 - INFO - Total operations: 3000
2025-03-13 07:41:26,955 - INFO - Successful operations: 3000
2025-03-13 07:41:26,955 - INFO - Failed operations: 0
2025-03-13 07:41:26,955 - INFO - QPS: 100.00
QPS 結果
100QPS
200QPS
300QPS ✖️
270QPS ✖️
250QPS
260QPS
270QPS
280QPS ✖️

ということで今回の条件においては限界QPS=270という結果になりました。
メトリクスを見ると、失敗時のCPU使用率が100%を超えているため本ケースではCPUがボトルネックとなっています。
※CPU使用率が100%に達しており、安定稼働が難しい状況であるため、このQPSを「限界」とみなします。
image.png

この結果は指定条件によって全くと言っていいほど変わってきます。

  • テストデータは1PDFファイルのみのため、大量のベクトルデータを入れればここまでQPSは出ないはずです
  • 2oCPU(4vCPU)であるため、コア数を増やせばさらにQPSは上げられそうです
  • タイムアウトを3秒にしましたが、より判定を甘くすればQPSは上がり、厳しくすればQPSは下がるでしょう

おわりに

ベクトル検索の限界QPSを調べる簡易ツールを作成し、実際にpgvectorで動作確認してみました。
負荷試験は本番リリース前には必須の作業ですが、RAG構築の際のベクトル検索においても同様であるため、本ツールが参考になれば幸いです。

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?