はじめに
クライアントからCSVをFTPでもらい、DBに取り込む機能はシステムでは、良くあるユースケースだと思います。
サービス開始当初は、クライアントの規模も小さく、CSV取り込みの処理時間を気にする必要ないのですが、サービスが大きくなると、処理時間が長くなってテコ入れする必要があるのは、よくあるケースだと思います。
ここでは、CSV取り込みの高速化に対するアプローチを紹介します。
ETL
CSV取り込み処理は、設計のアプローチとしては、ETL(Extract/Transform/Load)になると思います。
参考までにETLのライブラリは以下になります。
- Embulk (https://www.embulk.org/)
- Apache Airflow (https://airflow.apache.org/)
- Luigi (https://github.com/spotify/luigi)
- AWS Glue (https://aws.amazon.com/jp/glue/)
ただ、どのライブラリを選んでもインフラをどうするか?と言う問題が発生します。
AWSだと、Fargate(ECS/EKS)
、EC2
のどちらを選んでも定義できるvCPU数(限界)は決まってしまいます。
種類 | CPU(vCPU) |
---|---|
ECS(EKS) on Fargate |
16 |
ECS(EKS) on EC2 |
96 |
EC2 |
96 |
このため、クラスタ機能をもつETLツールを選ぶ必要です。
抽出(Extract)
- このPhaseでは、CSV読み込みの高速化を考えます。
- CSV読み込みは、言語の性能をもろに受けます。ETLライブラリを選定するときの参考になると思います。
- 以下はChatGPT4Proの回答です。
言語 | ライブラリ/実装 | 処理時間 (秒) |
---|---|---|
Go |
csv.Reader(並列処理あり) | 1.5 - 2.0 秒 |
Rust |
csv クレート(並列処理あり) | 1.0 - 1.5 秒 |
Python |
pandas.read_csv()(Cエンジン) | 2.0 - 3.0 秒 |
Java |
opencsv / univocity-parsers | 1.8 - 2.5 秒 |
Ruby |
CSV.read(純粋な Ruby 実装) | 5.0 - 10.0 秒 |
- このため、Extract部分だけ別言語にするのも選択肢になると思います。
参考
変換(Transform)
- このPhaseでは、データベースに変換するロジックを実装なります。このため、どれだけ並列して動かせるかがポイントなります。(=ETLのクラスタ機能が、このphaseで一番大切になります)
余談
- 私は、Fargateのworker処理で実装したことがあります。SQSのキュー数でオートスケールする形を取りましたが、振り返ってみると、ETLのクラスタ機能を使うのが一番シンプルかと思います。
書き出し(Load)
- このPhaseでは、DBへの書き込みの高速化を考えます。
- ここでのポイントはDBコネクション数とロックになります。
DBコネクション数
- DBのmasterは通常1つなので、DBのコネクション数の限界はシステムの限界になりますので、注意が必要です。
- 一方で、AuroraMySQLのServerlessを選ぶと、コネクション数(vCPU)をカジュアルに変更できます。
ポイント
- テクニック的にはプロセス単位でコネクション数を制限(制御)できれば、AuroraMySQLのコネクション数を超えることなく、DBに書き込めると思います。
参考
- Aurora Serverless v2 の最大接続数 (max_connections)を計算しよう
- Amazon Aurora Serverless v2で監視すべきCloudWatchメトリクス
ロック
- マルチプロセスから大量に更新しようとすると
Lock wait timeout exceeded
が発生することがあります。原因は、意図しないテーブルロックが発生するからです。
例
以下のテーブルがあったとします。
CREATE TABLE users (
id INT NOT NULL,
name VARCHAR(255),
PRIMARY KEY (id)
) ENGINE=InnoDB;
以下のように、インデックスを貼っていないカラムで検索すると、フルスキャンが発生しテーブルロックになります。
BEGIN;
SELECT * FROM users WHERE name = 'Alice' FOR UPDATE;
ポイント
- ETLツールを使っているなら、ETLツールに任せましょう。
- ETLツールを使っていない場合は、RDBのトランザクションを諦めて、Redis/Valkeyを使ったMutexロックをお勧めします。
参考
Embulk
- テンポラリテーブルを作ってから目的のテーブルにWriteすることで、ロックを回避しています。
※ うる覚えなので、間違ってたらごめんなさい m(_ _)m