出来らあっ!
背景
弊社には、自社で構築・運用・保守を手掛けている社内用のファイルストレージサービスがあります。(たまに社外の人も弊社とのファイル共有で使う)
イメージとしてはBoxの社内版みたいな感じです。Boxほどモダンな作りではないのですが…笑
このサービスにこれまで利用者としては関わりがあったものの、開発者としてはほぼ関わりがなかった筆者ですが、今年度から新規機能開発メンバーに突如アサインされました。
新規機能開発とは?とも思ったのですが、なんでも数年間の間「全文検索をしたいんだ」という話が持ち上がっては消え、持ち上がっては消え、を繰り返していたとのこと。
では、何がボトルネックで全文検索を実装できなかったのかというと、みんな忙しくて十分な検証時間を確保できないというのもあったのですが、色々話を伺うとデータ量がチョット多すぎるネ…という点が大きな要因だった模様です。
というのも、このファイルストレージの運用は基盤刷新前も含めて二十年以上続いているのですが、その合計データサイズが数百TBにものぼるとのことでした。
※3年前くらいに基盤刷新をして現在はAWSサービスを活用した構成(後述)になっています。
クローリングが特に高難易度
さて、全文検索を実現するには、大雑把に言うと次の3つの処理フェーズがあると考えています。異論は認めます。
1. クローリング処理
2. インデクシング処理
3. 検索処理
非常に便利な世の中でして、上記の2や3についてはElasticsearchやOpenSearchあたりで実現できます。
しかし、1のクローリングについては数百TBを相手にするとなるとなかなか手強い…というのは読者の皆様も想像がつくと思います。
全文検索やらクローリング処理やらはズブの素人の筆者(詳しくは過去の投稿をご参照ください)ですが、なんの因果が働いたのかこんな実現可能性もよくわからないこんな領域に「出来らあっ!」の精神で飛び込んでいくことになりました。
ということで、インデクシングや検索で苦労しなかったわけではないのですが、クローリングの処理方式の検討やその実現になかなかのリソースを割いたので、ここから先はクローリング処理の話がメインになると思います。ご了承ください。
システム構成の概略
どうやってクローリング処理を実現したのか?という話をしたいのですがその前に、データソースとなるファイルストレージサービスの構成とクローリング処理の構成からお伝えします。
ファイルストレージサービスの構成
あえてデフォルメしているところもありますが、このサービスは単なるNASがドンッとあるわけではなく、
- ファイルのメタ情報を格納するDBサーバ(Amazon RDS for PostgreSQL)
- ファイルの実体を格納するファイルサーバ(Amazon FSx for Windows File Server)
- アプリケーションサーバ(Amazon EC2)
などで構成されています。
普段、ユーザがこのサービスを使うときはアプリケーションサーバへアクセスして、ファイルを操作したり、フォルダの状態を見たり…、といったことをします。
そしてユーザの操作内容に応じて、アプリケーションサーバがPostgreSQLに問合せてメタ情報(フォルダ構成、ファイル名、ファイルID、アクセス権、バージョン情報…等)を取得・更新します。
もしファイルの実物をアップロード・ダウンロードする場合には、アプリケーションサーバがFSxに問合せてファイルそのものを配置・取得します。
クローリング処理の構成
上記のうち、クローリング処理にとって関係があるのは、
- PostgreSQLに格納されたファイルの 「メタ情報」
- FSxに格納されたファイル内の文字列、即ち 「全文情報」
です。
これらをどうやって取り込むかなのですが、
- メタ情報 → PGSyncというPythonベースのOSSを活用
- 全文情報 → 多重並列実行できるような処理をJavaで自作
という感じで実現しました。
ちなみに、検索インデックスサーバはAWSのOpenSearch Serviceを活用しています。
OpenSearchクラスタをEC2などで構築する手段もありますが、保守性の高さを考慮してマネージドサービスを採用しました。
またSQSは、PostgreSQLに記録されている「該当のファイル実体がFSx上のどこに格納されているか」という情報を、全文情報クローリング処理に受け渡すために、FSx上のパス情報をメッセージとしてキューイングしています。
PGSync
あまりメジャーではないOSSですが、PostgreSQLからElasticsearchやOpenSearchにデータを同期することに特化したソフトウェアです。我々のためにあるのかと思ったくらい今回の用途に合っていると思い、採用しました。
daemonモードで起動すると、全件同期処理が完了後に 変更データキャプチャ(CDC) によるリアルタイム同期処理に移行します。
schema.json
という設定ファイルで、クローリング対象のテーブルとカラムを指定して起動するとそれらをIndex化してくれます。
簡単なJOINであればこの設定ファイルで実現可能です。
更にPostgreSQLから持ってきたデータを少し加工してIndexに格納したいという場合は、Pythonでプラグインを自分で実装すれば割と柔軟に対応できます。
注意点は、CDCを実現するために下記の設定をPostgreSQLに施す必要がある点です。
-
①PGSyncからPostgreSQLへの接続ユーザにrds_superuserを付与する
- PGSyncがsuperuser権限を持つユーザで既存のテーブルにTriggerやFunctionを仕掛けたり、マテリアライズドビューを作るので、アプリケーションからのPostgreSQL接続ユーザの権限も適宜強化する必要があります
- アプリケーションからのPostgreSQL接続ユーザにもrds_superuserを付与したところ、ファイルストレージサービスの基盤担当の先輩に叱られた(至極真っ当)ので、PGSyncがTrigger、Function、Materialized Viewを作ったあと、それらのオブジェクトへの権限を別途付与しました
-
②WALを読むので、logical_replicationをONにする
- WALの肥大化やスロット変更の頻度等には注意する必要があります
上記注意点をクリアできさえすれば簡単にPostgreSQLからOpenSearchにデータ同期ができますので、非常に有用です。
このように環境変数で諸々のパラメータを設定するのですが、裏設定みたいなものもたくさんありますので、適宜活用していきました。
Javaで自作した全文情報のクローリング処理
今回のケースでは数億件のファイルをクローリングする必要があるということで、並列度を高めることが重要です。直列にクローリングしていると最低でも数ヶ月、下手したら1年以上かかる量だと思います…。
全文情報をクローリングできるソフトウェアは有名どころのOSSだと「FESS」があるのですが、FESSは複数スレッドで実行することはできるものの、複数サーバで協調しながら並列に処理するということはできません。(筆者の知る限りでは)
それに加え、FESSは検索オールインワンのソフトとしては機能も豊富で非常に優秀であるものの、今回の用途としてはツボを外してのオーバースペック感もあります。
そこで筆者は、1台のサーバにたくさんCPUコアを積んで複数スレッドで実行しつつ、更にそれを複数台立ち上げることでCPUコア数×サーバ台数で並列化できる仕組みを考えました。
からくりとしてはこんな流れです。
- あらかじめメタ情報を同期するときに、PGSyncのプラグインでPostgreSQLから取り込んだファイル情報からFSx上のパス情報を作り、その情報をSQSにキューイングしておく
- 自作Javaアプリのコンテナを多数立ち上げる(スレッドも複数確保)
- それぞれのコンテナがSQSに溜まったFSx上のパス情報を取得
- 取得したパス情報をもとにSMBプロトコルでFSxに接続しファイルの実体にアクセス
- Apache Tikaでファイル内の全文情報を文字列として抽出
- この文字列をOpenSearchの_update APIを用いて送信
- SQSが空になるまで3.以降を繰り返し
一個一個の処理はそこまで複雑ではないので、全部で500行くらいのボリュームで作れました。
5で全文情報抽出に使ったApache Tikaは、テキスト系ファイルだけでなくMS Office系のファイルやPDFなど様々なファイル形式にも柔軟に対応しており、非常に使い勝手が良かったです。
数百TBのファイル群をIndexに同期するにあたって苦労したこと
元ネタ的には、
「数百TBのファイル群が格納されたファイルストレージの全文検索用Indexを作れるっていったんだよ!」
というセリフも言いたくなるところでしたが、うまくデータをIndexに同期するまでには細かい試行錯誤を繰り返しました。
その一部を紹介しようと思います。
①PGsyncプラグインに処理詰め込みすぎ問題
PGSyncの素敵なところの一つに、所謂CDC的にリアルタイムでDBの情報をIndexに同期でき、さらにその同期処理の都度、任意に実装したプラグインの処理を実行できるという点があります。
そんな便利なPGSyncを動かしながら筆者は思ったのです。
「DBに更新がかかるタイミングで、検索しやすいようにデータをこねくり回そうじゃないか」
「DBに更新がかかるタイミングで、SQSにメッセージを送信してしまおうじゃないか」
「DBに更新がかかるタイミングで、DB上にはレコードして存在するが検索されないデータはIndexから消してしまおうじゃないか」
「DBに更新がかかるタイミングで、サービス上のパス情報を再帰クエリでDBに別途問合せることで生成してしまおうじゃないか」
「DBに更新がかかるタイミングで、…
と、まあこのような具合で通信が発生する処理も含め色々な処理を多数入れ込もうと試みました。
実際リアルタイムに追随するには必要な処理で、1件処理するだけならターンアラウンドタイムとしても気にならないレベルなのですが、問題は数億件の既存ファイルを初期同期する時です。
一連の処理を実行するのに1件あたり0.3秒かかる(実測でだいたいそれくらい)とし、初期同期対象が1億件としたら、
3百万秒=833.3時間=34.7日=約1ヶ月
…ということで、ちょっと時間がかかりすぎてしまいます。
そのため、このクローリングの仕組みでは数億件の初期同期を行うための処理とリアルタイム同期を行うための通常用の処理をコンテナの単位で完全に分けました。
初期同期用のコンテナでは速度重視で、他サーバ・サービスとの通信処理はなるべくまとめるよう、プラグインの実装を工夫しました。
- 例1)SQSへのメッセージ送信は、1件ずつCSVに書き出しておき、PGSyncの処理が終わったらCSVに書き出した内容を読み出してバッチアクションでまとめてSQSに送信しました。
- 例2)サービス上のパス情報の生成は、メタ情報の同期タイミングでは一旦空としておいた上で、全てのパス情報を生成できるSQLを書いて、それをサーバサイドカーソルを使いながら負荷を軽減しつつ実行し、_bulk APIを用いて一括で更新しました。
ちなみに、次のように実行するとPGSyncはdaemon起動(常駐で起動)できるのですが、
pgsync --config /path/to/schema.json --daemon
単発で起動する場合は次のように--daemon
なしで実行します。初期同期用のコンテナではこちらの単発での起動を実行しています。
pgsync --config /path/to/schema.json
このようにすることで、メタ情報の初期同期処理を数日で完了させることに成功しました。(それでも1日以上は必ずかかるのですが…)
②PGSyncメモリリーク疑惑
そんなわけで初期同期用のプラグインを備えたPGSyncを実行したのですが、メモリ使用率の推移を監視しているとどんどん右肩上がりに上昇し続けているではありませんか。
そして、ある時…
これではクローリングが完了できません。
※メモリ使用率のグラフで22時あたりで数分間メモリ使用率が盛り上がっているのは、少し別処理を実行しています
そこで、PGSyncの実装やPGSyncの使っているopensearchpyやSQLAlchemyの実装を穴が空くほどくまなくチェックしました。それはもう血眼です。
そしてついに原因を突き止めたのです。
PGSyncだけに問題があるのかと思いきや、opensearchpyにも疑義のある実装がありました。
ざっくり言うと、_bulk APIのリクエストが成功したときに、「登録に成功したリクエストBodyは具体的にこういう内容だったんだよ~」とdictで返してくれてしまっている実装1がありました。
更に参照をPGSync側が切らない(切れない)実装2となっているために、この「登録に成功したリクエストBodyは具体的にこういう内容だったんだよ~」のdictがGCされずにメモリに溜まり続けてしまう、ということが起きていました。
個人的には「登録に成功したリクエストBodyは具体的にこういう内容だったんだよ~」のdictは情報として不要と考えたので、opensearchpyの実装に一部手を加えてこの問題を力技で解決しました。(opensearchpyはApache License 2.0です。)
ちなみにOOMが発生した時、メモリ解析に使ったのはmemrayというツールで、割と使いやすかったのでお勧めです。
③レプリケーションログ多すぎ問題
なんとか初期同期をやっつけたところで、次はリアルタイム処理用のPGSyncをdaemonで起動します。
この時のPGSyncの挙動としては、"初期同期用PGSync開始時点~リアルタイム処理用PGSync開始時点"の追いつきをしようとします。
どうやって追いつきをしようとするかというと、まずはトランザクションIDを手がかりに、PGSyncの設定ファイルschema.json
で親テーブルとしたテーブルの更新レコードを抽出して同期します。
この同期が終わると、次はDBの論理スロットに出力されているレプリケーションログを全件読み込んで、"初期同期用PGSync開始時点~リアルタイム処理用PGSync開始時点"の間の更新情報を漏らさず同期します。
今回相手にしているファイルストレージサービスでは、ユーザが利用した結果走るトランザクションの他にも定期的にトランザクションが走っていたりもするので、レプリケーションログが1秒で数十件は平気で溜まっていきます。
一方、PGSyncで行われるレプリケーションログの処理はなかなか効率の良いものとは言えない実装となっていました。
※PGSync開発者もそれを自覚しているようで、TODOとして「もうちょっと効率よくしたいな~」的なコメントをコード内に残しています。
つまり、この状況で何が起こるか。
レプリケーションログが生み出される速度が、それを処理する速度を上回るのです。
"初期同期用PGSync開始時点~リアルタイム処理用PGSync開始時点"の間に溜まったレプリケーションログは700万件ほどだったのですが、その件数は時間が経つにつれてどんどん増えていきました。処理をしているにもかかわらず。
数日様子を見ていたのですがこれでは埒が明かないので、一旦PGSyncをストップして考えました。
そして熟考の末、このファイルストレージサービスのトランザクションの走り方にフィットするように、PGSyncのレプリケーションログの処理方法を独自に作り変えることで、またもや力技で解決しました。(PGSyncはMIT Licenseです。)
もともとの処理ではレプリケーションログをチャンクで取り出しているのですが、取り出した後に抜け漏れのないよう注意しながら不要なログはすべて無視するように独自にフィルタリング処理を追加しています。
すると、体感1%くらいしか有効なログはなく、処理を実際に実行してみると6時間くらいでレプリケーションログの処理を完了させることができました。
④文字数が多いファイル問題
ここまではメタ情報のクローリングについてでしたが、この節では全文情報のクローリングについてです。
全文情報のクローリング処理で文字列抽出にApache Tikaを使っています。
処理するファイル内の文字数が多い場合、Apache Tikaに何も設定していないとデフォルト上限である100万文字以上はエラーで弾かれてしまいます。
しかし、弊社内で取り扱われているファイルをざっと見渡すと数百万文字単位のファイルもちらほら。中には数千万文字のファイルもありました。
流石に全てカバーすることは無理にしても、上限100万文字では少し物足りない…ということで、ひとまずApache Tikaの設定で5000万文字を上限とするよう指定しました。(もちろんその分メモリも増強)
しかし、次に耐えられなかったのはOpenSearch Serviceの方でした。
OpenSearchのデフォルトのHTTPペイロード受信サイズ上限は100MBまでです。
※インスタンスタイプによっては10MBまで
日本語のようなマルチバイト文字で5000万文字を送ろうとすると、HTTPヘッダーやその他の情報も含めて100MBをオーバーしてしまうのです。
ここで、セルフホストのOpenSearchであれば上限の設定を変更して対応することもできるのですが、今回使っているのはマネージドサービスであるOpenSearch Serviceです。AWSサポートにリクエストを出すことで緩和できる可能性はあるようなのですが、ノード障害等で物理的にノードが変わってしまった場合などでは緩和した設定が元に戻ってしまうとのことでした。
「本番運用していたらいつの間にか設定が戻っていた!」というのはイマイチなので、送信するペイロードサイズを小さくすることにしました。
クローリング対象のファイルが長文だった場合は、抽出した文字列をチャンクに分割してIndexに送信するように実装しました。
1つ目のチャンクには下記のようなPOSTリクエストをOpenSearchに送信します。
POST <Index名>/_update/<_id>
{
"script": {
"source": "ctx._source.content = params.content",
"params": {"content": "<分割した文字列>"}
}
}
2つ目のチャンク以降は下記のようなPOSTリクエストをOpenSearchに送信します。
POST <Index名>/_update/<_id>
{
"script": {
"source": "ctx._source.content += params.content",
"params": {"content": "<分割した文字列>"}
}
}
1つ目のチャンクではscript
のsource
の指定を=
として_update APIを実行していますが、2つ目以降のチャンクではscript
のsource
の指定を+=
とすることで、追記をしています。
これによってある程度文字数の大きいファイルも送信することができるようになりました。
ちなみに、こうして登録はできるようになったのですが、検索時も気をつけなくてはなりません。
無邪気に何も考えずに検索APIを投げるとレスポンスBodyに数百万文字の全文情報が返ってきてしまうので、そうならないようにリクエスト時にexcludes
の指定やhighlight
の活用も検討することをお勧めします。
⑤リトライ処理とSQSの可視性タイムアウト
大量のファイルの全文情報をクローリングするということで、複数コンテナ×複数スレッドで多重実行ができるような仕組みとしているわけですが、OpenSearch側が大量のリクエストを同時に受け付けられないケースがあります。
HTTPステータスコードで言うと、409 Conflict
や429 Too Many Requests
なんかを返してしまうケースです。
そこで、リトライ処理を実装したうえで、リトライしてもだめだったらSQSのメッセージを削除しないという実装をしています。
ここで重要になるのがSQSの可視性タイムアウトの設定です。
可視性タイムアウトが短すぎると、あるメッセージに対応するファイルの処理をリトライしている最中にそのメッセージがまた取得可能になってしまいます。すると、他のコンテナやスレッドが本来処理中であるメッセージをまた取ってしまい処理してしまいます。
リトライを実行中ということは、OpenSearchにとってはただでさえ負荷が高めなリクエストを受けているのに、OpenSearch上の同じID、同じシャード、同じデータノードに対してまた同時にリクエストを送ることとなり、まさに傷に塩を塗るようなことが起きてしまいます。
(実際に開発環境でこの現象に遭遇し、特定のデータノードを虐めてしまった結果、Indexが再帰不能になりました。)
そういったことが起きないように、SQSの可視性タイムアウトの設定は適切に長めに設定することで、処理中なのに同じSQSのメッセージを同時に複数回読まれないようにする必要があります。
さいごに
思ったより文字が多めの記事となってしまいましたが、ここまで読んでくださりありがとうございます。
素人ながら「出来らあっ!」の精神で数百TBのファイル群が格納されたファイルストレージの全文検索用Index構築に取り組み始めたわけですが、いくつかの困難を乗り越えてなんとか実現することができました。
なかなか実現可能性がわからないテーマでも、とりあえず飛び込んで挑戦してみることも必要なんだなと身に沁みて思った数ヶ月間でした。「出来らあっ!」の精神は大切ですね。
また、そんなテーマにもかかわらず挑戦できる風土を作っていただきつつ、筆者が苦しんでいるときも背中を押し続けてくれた上司やチームの皆さんには本当に感謝です。
-
https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/helpers/actions.py#L198 にて、yieldによって_bulk APIで登録に成功したデータをdictで返しています。しかし、少なくとも今回の用途ではこの情報は不要です。 ↩
-
https://github.com/toluaina/pgsync/blob/d2b8ecd2403de7c0e0b3d7b02ab7f0fec8984024/pgsync/sync.py#L1252 にて、全件同期が終わるまで
self.search_client.bulk
の処理は抜けません。この処理を抜けなければ注釈1で返されるdictは開放されないのですが、このdictが開放されるように実装を変更するのはかなり影響範囲が広がると判断したため、今回opensearchpyの方を変更しました。 ↩