QPSとは❓
QPS(Queries Per Second)は文字通り1秒間に成功するクエリの数です。
1クエリ = 1トランザクションとみなす場合、TPS (Transactions Per Second) とも言われます
ベクトル検索では(正確にはベクトル検索だけでなく通常のWebAPIもですが)、事前に設定した
- IaaSのスペック(CPU,メモリ等)
- ベクトルデータ
- クエリ
に対して、連続的に実行して限界QPS(TPS)を計測します。
シンプルな負荷試験ツール🤖
ベクトル検索のベンチマークではvectordbbenchやann-benchmarkが有名ですが、下記を実現したいケースもあります。
- 本番想定のオリジナルなデータ構造でテストしたい
- 上記ツールが対応していないベクトルストアの利用
- テストシナリオをコントロールしたい
そのためシンプルに測定できるツールをPythonで作ってみました。
抽象クラスであるAbstractLoaderTestを用意して緩くinterface化しています:
- execute_query
- 対象のベクトルストアに対して連続実行するクエリを記述します
- run_test
- 下記3点を引数に指定して実行します
- QPS:1秒間に発行するクエリ数。execute_queryを毎秒指定した回数で実行
- duration:ツールの実行時間(秒)
- タイムアウト:処理遅延を許容する閾値(秒)
- 結果ログファイルに成功/失敗回数が集計され、全て成功であればそのQPSは達成となります
- 下記3点を引数に指定して実行します
負荷試験抽象クラスコード
import time
import threading
import logging
import random
import string
import datetime
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, wait
class AbstractLoaderTest(ABC):
def __init__(self, tps, duration, timeout):
"""
:param tps: Number of operations per second
:param duration: Test execution time in seconds
:param timeout: Timeout value (in seconds) for each batch
"""
self.tps = tps
self.duration = duration
self.timeout = timeout
self.total_queries = 0
self.success_count = 0
self.error_count = 0
self.lock = threading.Lock()
@abstractmethod
def execute_query(self):
"""
Abstract method to execute the test operation.
This method can be overridden for non-database purposes as well.
"""
pass
def run_test(self):
"""
Executes tps operations per second (using execute_query) and evaluates the results within the timeout period.
"""
with ThreadPoolExecutor(max_workers=self.tps * 2) as executor:
for i in range(self.duration):
batch_futures = [executor.submit(
self.execute_query) for _ in range(self.tps)]
batch_start = time.time()
done, not_done = wait(batch_futures, timeout=self.timeout)
for future in done:
try:
future.result()
with self.lock:
self.success_count += 1
except Exception:
with self.lock:
self.error_count += 1
for future in not_done:
with self.lock:
self.error_count += 1
future.cancel()
batch_elapsed = time.time() - batch_start
if batch_elapsed < 1.0:
time.sleep(1.0 - batch_elapsed)
with self.lock:
self.total_queries += self.tps
self.report_results()
def report_results(self):
logging.info("=== Test Results ===")
logging.info("Total operations: %d", self.total_queries)
logging.info("Successful operations: %d", self.success_count)
logging.info("Failed operations: %d", self.error_count)
qps = self.total_queries / self.duration
logging.info("QPS: %.2f", qps)
@staticmethod
def setup_logging():
"""Configure logging (append if the file for the current day exists)"""
current_date = datetime.datetime.now().strftime("%Y%m%d")
log_filename = f"vectordbtest_{current_date}.log"
logging.basicConfig(
level=logging.DEBUG,
filename=log_filename,
filemode='a',
encoding='utf-8',
format='%(asctime)s - %(levelname)s - %(message)s'
)
def _generate_unique_comment(self):
"""Generate a random string for cache busting"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=8))
OCI Database with PostgreSQLで実際に検証📜
ポスグレ構築(OCIコンソール)
OCIコンソールからの構築はこちらに分かりやすくまとまっています。
基本的な流れはpgvectorを適用した構成ファイルの作成後、それを適用したポスグレインスタンスの作成になります。
OCIコンソールとは別にterraformを利用すると一発で作成できるので便利です:
ポスグレ構築(terraform)
resource "oci_psql_configuration" "test_flexible_configuration" {
#Required
compartment_id = var.compartment_id
shape = "VM.Standard.E5.Flex"
db_configuration_overrides {
items {
config_key = "max_connections"
overriden_config_value = "5000"
}
items {
config_key = "oci.admin_enabled_extensions"
overriden_config_value = "vector"
}
}
db_version = "15"
display_name = "terraform test flex configuration"
#Optional
instance_memory_size_in_gbs = "0"
instance_ocpu_count = "0"
is_flexible = true
description = "test configuration created by terraform"
# これにより、更新時は新規作成→DBシステムの参照更新→古い構成削除の順で処理
lifecycle {
create_before_destroy = true
}
}
resource "oci_psql_db_system" "test_db_system" {
compartment_id = var.compartment_id
credentials {
password_details {
password_type = var.db_system_credentials_password_details_password_type
password = var.db_system_credentials_password_details_password
}
username = var.db_system_credentials_username
}
config_id = oci_psql_configuration.test_flexible_configuration.id
apply_config = "RESTART"
db_version = "15"
display_name = var.db_system_display_name
network_details {
subnet_id = var.db_system_network_details_subnet_id
}
shape = "PostgreSQL.VM.Standard.E5.Flex"
instance_ocpu_count = "2"
instance_memory_size_in_gbs = "32"
storage_details {
is_regionally_durable = var.db_system_storage_details_is_regionally_durable
system_type = var.db_system_storage_details_system_type
}
source {
source_type = var.db_system_source_source_type
}
}
ベクトルデータ投入
ベクトルデータは任意の方法で投入します。
今回はlangchainを用いてPDFファイルをpgvectorに取り込んでおきます。
負荷試験ツール実装
先ほどのAbstractLoaderTestを継承した実装クラスを作成します。
import yaml
import psycopg2
from psycopg2 import pool
from abstract_loader_test import AbstractLoaderTest
import logging
class OCI_Postgres_LoadTest(AbstractLoaderTest):
def __init__(self, tps, duration, timeout, config=None):
"""
:param tps: Number of SQL queries per second
:param duration: Test execution time in seconds
:param timeout: Timeout value (in seconds) for each batch
:param config: Configuration content. If not provided, 'config.yaml' will be loaded and logging will be configured automatically.
"""
if config is None:
AbstractLoaderTest.setup_logging()
config = self.load_config('config.yaml')
self.config = config
super().__init__(tps, duration, timeout)
# Create a PostgreSQL connection pool (minconn=1, maxconn is tps * 2)
self.conn_pool = pool.SimpleConnectionPool(
minconn=1,
maxconn=self.tps * 2,
host=self.config['dbhost'],
port=5432,
dbname=self.config['dbname'],
user=self.config['username'],
password=self.config['password']
)
@staticmethod
def load_config(file_path):
"""Load the configuration file (YAML)"""
with open(file_path, 'r', encoding='utf-8') as file:
return yaml.safe_load(file)
@property
def BASE_SQL(self):
"""SQL template string containing the {unique} placeholder."""
return f"""
SELECT * FROM {self.config['table']}
ORDER BY embedding <-> %s::vector
LIMIT 1
-- no-cache: {{unique}}
"""
@property
def EMBEDDING_VECTOR(self):
"""The embedding vector value to be passed to the SQL query."""
return self.config['EMBEDDING_VECTOR']
def get_connection(self):
return self.conn_pool.getconn()
def put_connection(self, conn):
self.conn_pool.putconn(conn)
def close_all(self):
self.conn_pool.closeall()
def execute_query(self):
"""
Retrieves a connection from the pool, executes the SQL query and returns the result.
Appends a unique string to bypass cache.
"""
conn = None
try:
conn = self.get_connection()
with conn.cursor() as cur:
# _generate_unique_comment is defined in AbstractLoaderTest
unique = self._generate_unique_comment()
sql = self.BASE_SQL.format(unique=unique)
cur.execute(sql, (self.EMBEDDING_VECTOR,))
result = cur.fetchall()
conn.commit()
return result
except Exception as e:
logging.error("Error during query execution: %s", e)
raise e
finally:
if conn:
self.put_connection(conn)
if __name__ == '__main__':
# Main execution: create an instance with the required parameters and run the test
tester = OCI_Postgres_LoadTest(tps=100, duration=30, timeout=3)
tester.run_test()
負荷試験実施
上記ポスグレ実装クラスを実行します。
今回はひとまずテスト継続時間30秒、タイムアウト3秒の条件でテストしていきます。
指定したタイムアウトを超える応答があった場合はFailedとしてカウントされるため、
全クエリが成功していることを確認して、その設定QPSが達成されたかを判断します。
2025-03-13 07:41:26,954 - INFO - === Test Results ===
2025-03-13 07:41:26,955 - INFO - Total operations: 3000
2025-03-13 07:41:26,955 - INFO - Successful operations: 3000
2025-03-13 07:41:26,955 - INFO - Failed operations: 0
2025-03-13 07:41:26,955 - INFO - QPS: 100.00
QPS | 結果 |
---|---|
100QPS | ⭕ |
200QPS | ⭕ |
300QPS | ✖️ |
270QPS | ✖️ |
250QPS | ⭕ |
260QPS | ⭕ |
270QPS | ⭕ |
280QPS | ✖️ |
ということで今回の条件においては限界QPS=270という結果になりました。
メトリクスを見ると、失敗時のCPU使用率が100%を超えているため本ケースではCPUがボトルネックとなっています。
※CPU使用率が100%に達しており、安定稼働が難しい状況であるため、このQPSを「限界」とみなします。
この結果は指定条件によって全くと言っていいほど変わってきます。
- テストデータは1PDFファイルのみのため、大量のベクトルデータを入れればここまでQPSは出ないはずです
- 2oCPU(4vCPU)であるため、コア数を増やせばさらにQPSは上げられそうです
- タイムアウトを3秒にしましたが、より判定を甘くすればQPSは上がり、厳しくすればQPSは下がるでしょう
おわりに
ベクトル検索の限界QPSを調べる簡易ツールを作成し、実際にpgvectorで動作確認してみました。
負荷試験は本番リリース前には必須の作業ですが、RAG構築の際のベクトル検索においても同様であるため、本ツールが参考になれば幸いです。