🔍 OpenSearch × RDB 連携実装ガイド
💡 アーキテクチャの前提
- 検索の実行環境: OpenSearch
- データのマスタ: RDB
- 構成方針: RDBのデータをOpenSearchへ同期し、検索時のパフォーマンスを最適化する。
0. ローカル開発環境の準備 (Docker)
ローカルでOpenSearch本体と、データ確認用のGUIツール(Dashboards)を立ち上げるための docker-compose.yml の定義例です。
※ローカル開発用にセキュリティプラグインは無効化しています。
# docker-compose.yml
services:
# OpenSearch Server
opensearch:
image: opensearchproject/opensearch:1.3
platform: linux/amd64
container_name: opensearch
networks:
- app-network
environment:
- discovery.type=single-node
- plugins.security.disabled=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
volumes:
- opensearch_volume:/usr/share/opensearch/data
ulimits:
memlock:
soft: -1
hard: -1
security_opt:
- seccomp:unconfined
cap_add:
- IPC_LOCK
# OpenSearch Dashboards (GUIツール)
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:1.3
container_name: os_dashboards
ports:
- "5601:5601"
environment:
OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
depends_on:
- opensearch
networks:
- app-network
networks:
app-network:
driver: bridge
external-network:
external: true
volumes:
opensearch_volume:
driver: local
1. クライアントの生成
OpenSearchへの接続クライアントをファクトリパターンで生成します。
ローカル(開発環境)では通常のHTTP接続、AWS環境ではセキュリティを担保するために IAM認証(SigV4署名) を使用する構成にしています。
📄 クライアントファクトリの実装
// app/Factories/OpenSearchClientFactory.php
class OpenSearchClientFactory
{
public static function create(): Client
{
$clientBuilder = ClientBuilder::create()
->setHosts([config('searchEngine.host')]);
// AWS環境ではSigV4署名ハンドラーをセットアップ
if (!app()->isLocal()) {
$clientBuilder->setHandler(self::buildSigV4Handler());
}
return $clientBuilder->build();
}
/**
* AWS環境用のSigV4署名付きリクエストハンドラーを生成
* ※ AWS Managed OpenSearchへのアクセスに必要な認証処理
*/
private static function buildSigV4Handler(): callable
{
$provider = CredentialProvider::defaultProvider();
$region = config('openSearch.region', 'ap-northeast-1');
$guzzleClient = new GuzzleClient(['http_errors' => false]);
return function (array $request) use ($provider, $region, $guzzleClient) {
// PSR-7リクエストに変換し、AWS SigV4で署名を付与
$psr7Request = new Request(
$request['http_method'],
(new Uri($request['uri']))->withScheme('https')->withHost($request['headers']['host'][0]),
$request['headers'],
$request['body']
);
$signer = new SignatureV4('es', $region);
$signedRequest = $signer->signRequest($psr7Request, $provider()->wait());
// 署名付きリクエストを実行
$response = $guzzleClient->send($signedRequest);
return new CompletedFutureArray([
'status' => $response->getStatusCode(),
'headers' => $response->getHeaders(),
'body' => $response->getBody()->detach(),
// ... その他必要なレスポンスマッピング ...
]);
};
}
}
⚙️ 依存関係と設定
ServiceProviderでシングルトン登録を行い、アプリケーション全体でクライアントインスタンスを使い回す(接続コストを減らす)ようにします。
// app/Providers/AppServiceProvider.php
$this->app->singleton(Client::class, function () {
return OpenSearchClientFactory::create();
});
2. マッピングとアナライザの定義
インデックスの設定(マッピングやアナライザ)は、config で一元管理します。
// config/openSearch.php
return [
'region' => env('OPENSEARCH_REGION', 'ap-northeast-1'),
'indices' => [
'items' => [
'settings' => [
'index' => [
'analysis' => [
// N-gramトークナイザーの定義
'tokenizer' => [
'my_ngram_tokenizer' => [
'type' => 'ngram',
'min_gram' => 1,
'max_gram' => 2,
'token_chars' => ['letter', 'digit', 'punctuation', 'symbol'],
],
],
// カスタムアナライザーの定義
'analyzer' => [
'my_ngram_analyzer' => [
'type' => 'custom',
'tokenizer' => 'my_ngram_tokenizer',
'filter' => ['lowercase'],
],
],
],
],
],
'mappings' => [
'properties' => [
'id' => [
'type' => 'integer',
],
'name' => [
'type' => 'text',
'analyzer' => 'my_ngram_analyzer',
'search_analyzer' => 'my_ngram_analyzer',
],
// その他必要なフィールド定義...
],
],
],
],
];
📝 補足: N-gramアナライザーを採用する理由
テキストの「部分一致検索(例:ユーザーが『東京』と入力した時に『東京都庁』をヒットさせる等)」を確実に行うために採用しています。
min_gram=1,max_gram=2と設定することで、「東京都庁」は[東, 京, 都, 庁, 東京, 京都, 都庁]と細かく分解してインデックス化され、検索の取りこぼしを防ぐことができます。
⚠️ N-gramの特性上、「京都」で検索すると「東京都庁」もヒットします。
精度よりも網羅性(取りこぼしを防ぐこと)を優先する設計です。
3. インデックスへのデータ投入(Repository)
データの登録・更新・インデックス管理などの具体的な操作は Repository レイヤーにカプセル化します。
// app/Repositories/SearchEngine/SearchIndexRepository.php
class SearchIndexRepository
{
// メモリ枯渇を防ぐための分割処理サイズ
private const CHUNK_SIZE = 300;
public function __construct(
private Client $client,
) {}
/**
* バルクインサートを実行する
* ※ 大量データを1件ずつ送信するとネットワーク負荷が高いため、Bulk APIを使用
*/
public function bulkInsert(string $indexName, array $documents): array
{
$params = ['body' => []];
$errors = [];
foreach ($documents as $doc) {
// アクション・メタデータ
$params['body'][] = [
'index' => [
'_index' => $indexName,
'_id' => $doc['id'],
],
];
// ドキュメント本体
$params['body'][] = $doc['body'];
}
$response = $this->client->bulk($params);
// エラーハンドリング(部分的な失敗を収集)
if ($response['errors'] ?? false) {
foreach ($response['items'] as $item) {
if (isset($item['index']['error'])) {
$errors[] = [
'id' => $item['index']['_id'],
'error' => json_encode($item['index']['error'], JSON_UNESCAPED_UNICODE),
];
}
}
}
return $errors;
}
/**
* インデックスを作成する
*/
public function createIndex(string $indexName, array $config): void
{
$this->client->indices()->create([
'index' => $indexName,
'body' => $config,
]);
}
/**
* エイリアスをアトミック(瞬時)に切り替える
*/
public function switchAlias(string $aliasName, array $removeIndexNames, string $addIndexName): void
{
$actions = [];
// 旧インデックスからのエイリアス解除
foreach ($removeIndexNames as $removeIndexName) {
$actions[] = ['remove' => ['index' => $removeIndexName, 'alias' => $aliasName]];
}
// 新インデックスへのエイリアス付与
$actions[] = ['add' => ['index' => $addIndexName, 'alias' => $aliasName]];
$this->client->indices()->updateAliases([
'body' => ['actions' => $actions],
]);
}
}
4. 全量再構築(ゼロダウンタイム)
マッピングの変更などでインデックスを全再構築する場合、「エイリアスのアトミックスワップ」 を利用してユーザーの検索を一切止めずに(ダウンタイムなしで)切り替えます。
🔄 実行のしくみ
- 新インデックス作成 (例:
items_20260413_120000)- データ投入 (新インデックスに対して、RDBからデータをチャンク分割してバルクインサート)
- エイリアス切替 (検索側が参照している
itemsという名前を、古い方から新しい方へ瞬時に付け替える)- クリーンアップ (不要になった旧インデックスを削除)
// app/Services/SearchEngine/RebuildIndexService.php
class RebuildIndexService
{
private const ALIAS_NAME = 'items';
public function execute(): bool
{
$config = config('openSearch.indices.' . self::ALIAS_NAME);
// 1. 新インデックス作成(タイムスタンプ付きで重複回避)
$newIndexName = self::ALIAS_NAME . '_' . date('Ymd_His');
$this->searchIndexRepository->createIndex($newIndexName, $config);
// 2. 全データをバルクインサート
// ※ 内部でDBから Chunk() で取得し、Repositoryの bulkInsert() に渡す想定
$success = $this->bulkInsertAll($newIndexName);
if (!$success) {
// 失敗時は作成途中の新インデックスを削除してロールバック
$this->searchIndexRepository->deleteIndex($newIndexName);
return false;
}
// 3. エイリアスを新インデックスに切替(ゼロダウンタイム)
$oldIndexNames = $this->searchIndexRepository->getIndicesByAlias(self::ALIAS_NAME);
$this->searchIndexRepository->switchAlias(self::ALIAS_NAME, $oldIndexNames, $newIndexName);
// 4. 旧インデックスを削除
foreach ($oldIndexNames as $oldIndex) {
$this->searchIndexRepository->deleteIndex($oldIndex);
}
return true;
}
}
📋 構成のまとめ
| レイヤー | クラス・ファイル名 | 役割 |
|---|---|---|
| Docker | docker-compose.yml |
OpenSearchとDashboardsのローカル起動 |
| Factory | OpenSearchClientFactory |
クライアント生成(ローカル: HTTP / AWS: SigV4認証) |
| Config | config/openSearch.php |
マッピング・アナライザーの定義一元管理 |
| Repository | SearchIndexRepository |
OpenSearchへのCRUD操作(Bulk Insertなど) |
| Service | RebuildIndexService |
全量再構築(エイリアススワップを利用した無停止更新) |