前提
- 大量データ処理スクリプトとは?
- 以下の運用スクリプトのイメージ
- 処理対象は、RailsのARモデル
- メモリ消費の観点から、対象データを一括取得できない
- 処理対象が多いため、並行・並列処理を行いたい
- 以下の運用スクリプトのイメージ
- 各種バージョンは以下
- ruby 2.7.6p219
- rails (6.1.7.2)
- parallel (1.22.1)
実装時のチェックリスト
毎回以下のようなことを確認している
-
作業全体の流れは確認できているか
- やりたいことはスクリプトだけでは完結しないことが多い
- 例えば、CSV出力してAthenaやjqで集計するなど
- 全体の流れを踏まえて、最適なスクリプトを実装したい
- 例えば、集計時のことを考慮したフォーマット(例. CSV, JSON, ...)にするなど
- やりたいことはスクリプトだけでは完結しないことが多い
-
期限内に処理完了できるか
- 期待するスループットが出せる実装になっていること
- 特にボトルネックへの対応は考慮したい
- 前提となるサーバ側のスペックが足りないとスケールしない
- サンプリングして大まかなスループットを計測しておくと良い
- 実行時に外から並列数等を渡せると調整しやすい
- 期待するスループットが出せる実装になっていること
-
処理状況の確認を行えるか
- ログ出力が適切に行えていること
- 処理開始/完了や処理過程、スループットを把握できる
- ログ出力が確認できること
- 例えば、定期削除されるディレクトリ配下に出力しない
- ログ出力が適切に行えていること
-
並列処理時に問題が起きないか
- 何かしらファイル出力する場合、スレッドセーフになっていること
- 例えば、単純な標準出力ではなく、loggerを使う
- データストア側も並列処理対応されていること
- 接続プールに関する設定を行う
-
parallelの作法に倣う
- フォークする(プロセスのコピーを生成する)際に connection を貼り直す
- 各スレッドは、connection pool(connection のキャッシュ)を使う
-
rails の接続プール数を調整する
- config/database.yml の pool を「スレッド数 + 1」
- sidekiq は、スレッドとは別にプロセスでプールを使うらしい
- config/database.yml の pool を「スレッド数 + 1」
- DB 側のコネクション許容数を調整する
- MySQL なら max_connections?
- 実行数 * プロセス数 * 接続プール数
- MySQL なら max_connections?
-
parallelの作法に倣う
- 必要な箇所でトランザクションを貼る
- かつ適切な分離レベルになっていること
- 接続プールに関する設定を行う
- 何かしらファイル出力する場合、スレッドセーフになっていること
-
再実行時の考慮はできているか
- 仮に処理失敗した時の手順が整理されていること
- 処理完了はスキップして、未完了のもののみ処理したい
- また、上述した内容について考慮できていないことが分かって軌道修正したい時にも
- 仮に処理失敗した時の手順が整理されていること
テンプレート
基本的な作りをベースにして必要な箇所を修正する
execute.rb
class Application < Rails::Application
...
# ログ出力時に時刻も載せる
config.log_formatter = Logger::Formatter.new
end
execute.rb
# 実行コマンドは以下
# nohup bin/rails r execute.rb -e production > tmp/execute.log &
require 'optparse'
Rails.logger = Logger.new(STDOUT)
class Executor
def initialize(in_processes:, in_threads:, batch_size:)
@in_processes = in_processes
@in_threads = in_threads
@batch_size = batch_size
end
def execute!
[処理対象のモデル].find_in_batches(batch_size: batch_size) do |per_batch_size|
Parallel.each(per_batch_size.in_groups(in_processes, false), in_processes: in_processes) do |per_process|
@reconnected ||= ApplicationRecord.connection.reconnect! || true
Parallel.each(per_process, in_threads: in_threads) do |per_thread|
Rails.logger.info("message: #{per_thread.id} process start.")
ActiveRecord::Base.connection_pool.with_connection do
# [メインの処理]
end
Rails.logger.info("message: #{per_thread.id} process finish.")
rescue StandardError => e
Rails.logger.error(
message: 'Executor is failed.',
error_class: e.class,
error_message: e.message,
backtrace: e.backtrace,
target_id: per_thread.id
)
# 処理失敗したら後でまとめて再実行するので一旦スルーする
end
end
end
end
end
command_name = File.basename($PROGRAM_NAME)
Rails.logger.info(message: "#{command_name} start.")
begin
params = ARGV.getopts('', 'in_processes:1', 'in_threads:1', 'batch_size:10000')
in_processes = params['in_processes'].to_i
in_threads = params['in_threads'].to_i
batch_size = params['batch_size'].to_i
Executor.new(in_processes: in_processes, in_threads: in_threads, batch_size: batch_size).execute!
rescue StandardError => e
Rails.logger.error(
message: "#{command_name} is failed.",
error_class: e.class,
error_message: e.message,
backtrace: e.backtrace
)
raise
end
Rails.logger.info(message: "#{command_name} finish.")
おまけ
- 検証用ブランチを作っておくと共有しやすい
- 修正内容は
https://github.com/[リポジトリ]/compare/[ベースのブランチ]...[検証用ブランチ]
で確認 - 他環境での取得は
git fetch; git checkout [検証用ブランチ]
- 修正内容は
スクリプト実行後に使うコマンド
$ ps aux | grep 1716
# 処理されているか
$ tail -f ~.log
# 進み具合
$ watch -n3 'wc -l ~.log'