背景
この4年間に弊社サービスで扱っている企業データは10万社から約380万社まで増えました。
最も大きなテーブルには約7億レコード格納されています。
この大量のデータを処理するためのバッチ処理の工夫を紹介したいと思います。
前提条件
データの性質
更新は少なく追加がほとんどです。
処理対象データはフラグ管理されています。
改善前の処理方式
改善前の処理方式は、処理の起点になるデータを一括して取得して、1レコード単位で関連する処理を実行する方式でした。
begin();
List<Target> targets = selectTargets();
for (Target t : targets) {
//select
//insert
//select
//update
//select
//delete
//etc.. と続く逐次処理
}
commit();
ボトルネックと対応方針
インデックスが正しく貼られている場合、通信コストがボトルネックになってくるのでそこを改善する方法を考えます。
パフォーマンス改善のために、クエリの発行回数を削減することと待機時間の削減するようにトライしました。
普通に処理してるとFusion-IOが遊んでしまうのでFusion-IOに仕事させるために並行処理します。
環境・ライブラリ
Java 8
jOOQ 2.8 (上げたい)
Fusion-IO (HDDやSSDでも速くなります)
その他の制限
binlog方式のレプリケーションをしているためコミット単位は大きくなりすぎないように注意が必要になります。
CQS [コマンドとクエリの分離]
ループ内で逐次実行していたselectをループ外で一括取得してDtoに詰めておきます。
データ取得結果をファーストクラスコレクションにしてキャッシュするデータを置くのもいいと思います。
List<Integer> targetIds = selectTargets();
List<Target> targets = fetchData(targetIds);
List<Result> results = targets.stream().map(t -> process(t)).collect(toList());
begin();
bulkInsert(results);
bulkDelete(results);
bulkUpdate(results);
//etc...
commit();
- テーブルロックがかかる時間が減る
- テストを容易にする(fetchDataとprocessをテストすれば良い)
並列処理
// キーの取得は軽い処理なのでひとまず並列処理しなくていい
List<Integer> targetIds = selectTargets();
List<List<Integer>> chunks = Lists.partition(targetIds, 1000);
chunks.parallelStream().forEach(ids -> {
// 複数テーブルへのselectや重いクエリを実行は並列処理したい
List<Target> targets = fetchData(ids);
List<Result> results = targets.stream().map(t -> process(t)).collect(toList());
Retry.do(3, () -> {
begin();
bulkInsert(results);
bulkDelete(results);
bulkUpdate(results);
//etc...
commit();
});
}
- データの取得とDBの更新をマルチスレッドで実行
- デッドロックしないように実装するが、稀なデッドロックはリトライでカバーする
ここまでの改善で(重複処理の削除やキャッシングも組み合わせて)約200倍性能改善し、
1週間かかる想定だった大量データ投入を1時間で終了できました。
今後の改善
計画中でまだ実施していないもの
Queue or Stream or Messaging基盤の導入
対象データのフラグ管理を廃止してロックを大幅に減らせる。
データ量や遅延に応じて処理の並列度を増やせる。
トランザクションの分解とイベント処理
まとめて処理している更新系の処理を分割して実行効率を上げる。
トランザクションでデータの整合性を担保しないため、参照データの整合性を保つ仕組みが必要になる。
Aggregator(Reactive Messaging Patterns Chapter 7.Message Routing)
のような仕組みでヘッダテーブルへの書き込みを全てのデータが揃ってから実行することで対応したい。
終わり
つっこみお待ちしてます。