1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

JavaAdvent Calendar 2016

Day 21

並列バッチパターン[MySQL on Fusion-IOのパフォーマンスを引き出す]

Last updated at Posted at 2016-12-21

背景

この4年間に弊社サービスで扱っている企業データは10万社から約380万社まで増えました。
最も大きなテーブルには約7億レコード格納されています。
この大量のデータを処理するためのバッチ処理の工夫を紹介したいと思います。

前提条件

データの性質

更新は少なく追加がほとんどです。
処理対象データはフラグ管理されています。

改善前の処理方式

改善前の処理方式は、処理の起点になるデータを一括して取得して、1レコード単位で関連する処理を実行する方式でした。

begin();
List<Target> targets = selectTargets();
for (Target t : targets) {
    //select
    //insert
    //select
    //update
    //select
    //delete
    //etc.. と続く逐次処理
}
commit();

ボトルネックと対応方針

インデックスが正しく貼られている場合、通信コストがボトルネックになってくるのでそこを改善する方法を考えます。
パフォーマンス改善のために、クエリの発行回数を削減することと待機時間の削減するようにトライしました。
普通に処理してるとFusion-IOが遊んでしまうのでFusion-IOに仕事させるために並行処理します。

環境・ライブラリ

Java 8
jOOQ 2.8 (上げたい)
Fusion-IO (HDDやSSDでも速くなります)

その他の制限

binlog方式のレプリケーションをしているためコミット単位は大きくなりすぎないように注意が必要になります。

CQS [コマンドとクエリの分離]

ループ内で逐次実行していたselectをループ外で一括取得してDtoに詰めておきます。
データ取得結果をファーストクラスコレクションにしてキャッシュするデータを置くのもいいと思います。


List<Integer> targetIds = selectTargets();
List<Target> targets = fetchData(targetIds);
List<Result> results = targets.stream().map(t -> process(t)).collect(toList());

begin();
bulkInsert(results);
bulkDelete(results);
bulkUpdate(results);
//etc...
commit();
  • テーブルロックがかかる時間が減る
  • テストを容易にする(fetchDataとprocessをテストすれば良い)

参考CQS:wikipedia

並列処理

// キーの取得は軽い処理なのでひとまず並列処理しなくていい
List<Integer> targetIds = selectTargets();
List<List<Integer>> chunks = Lists.partition(targetIds, 1000);
chunks.parallelStream().forEach(ids -> {
    // 複数テーブルへのselectや重いクエリを実行は並列処理したい
    List<Target> targets = fetchData(ids);
    List<Result> results = targets.stream().map(t -> process(t)).collect(toList());

    Retry.do(3, () -> {
        begin();
        bulkInsert(results);
        bulkDelete(results);
        bulkUpdate(results);
        //etc...
        commit();
    });
}
  • データの取得とDBの更新をマルチスレッドで実行
  • デッドロックしないように実装するが、稀なデッドロックはリトライでカバーする

ここまでの改善で(重複処理の削除やキャッシングも組み合わせて)約200倍性能改善し、
1週間かかる想定だった大量データ投入を1時間で終了できました。

今後の改善

計画中でまだ実施していないもの

Queue or Stream or Messaging基盤の導入

対象データのフラグ管理を廃止してロックを大幅に減らせる。
データ量や遅延に応じて処理の並列度を増やせる。

トランザクションの分解とイベント処理

まとめて処理している更新系の処理を分割して実行効率を上げる。
トランザクションでデータの整合性を担保しないため、参照データの整合性を保つ仕組みが必要になる。
Aggregator(Reactive Messaging Patterns Chapter 7.Message Routing)
のような仕組みでヘッダテーブルへの書き込みを全てのデータが揃ってから実行することで対応したい。

終わり

つっこみお待ちしてます。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?