1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

大量データ処理スクリプト実装時に気を付けたいこと

Posted at

前提

  • 大量データ処理スクリプトとは?
    • 以下の運用スクリプトのイメージ
      • 処理対象は、RailsのARモデル
      • メモリ消費の観点から、対象データを一括取得できない
      • 処理対象が多いため、並行・並列処理を行いたい
  • 各種バージョンは以下
    • ruby 2.7.6p219
    • rails (6.1.7.2)
    • parallel (1.22.1)

実装時のチェックリスト

毎回以下のようなことを確認している

  • 作業全体の流れは確認できているか
    • やりたいことはスクリプトだけでは完結しないことが多い
      • 例えば、CSV出力してAthenaやjqで集計するなど
    • 全体の流れを踏まえて、最適なスクリプトを実装したい
      • 例えば、集計時のことを考慮したフォーマット(例. CSV, JSON, ...)にするなど
  • 期限内に処理完了できるか
    • 期待するスループットが出せる実装になっていること
      • 特にボトルネックへの対応は考慮したい
      • 前提となるサーバ側のスペックが足りないとスケールしない
    • サンプリングして大まかなスループットを計測しておくと良い
      • 実行時に外から並列数等を渡せると調整しやすい
  • 処理状況の確認を行えるか
    • ログ出力が適切に行えていること
      • 処理開始/完了や処理過程、スループットを把握できる
    • ログ出力が確認できること
      • 例えば、定期削除されるディレクトリ配下に出力しない
  • 並列処理時に問題が起きないか
    • 何かしらファイル出力する場合、スレッドセーフになっていること
      • 例えば、単純な標準出力ではなく、loggerを使う
    • データストア側も並列処理対応されていること
      • 接続プールに関する設定を行う
        • parallelの作法に倣う
          • フォークする(プロセスのコピーを生成する)際に connection を貼り直す
          • 各スレッドは、connection pool(connection のキャッシュ)を使う
        • rails の接続プール数を調整する
          • config/database.yml の pool を「スレッド数 + 1」
            • sidekiq は、スレッドとは別にプロセスでプールを使うらしい
        • DB 側のコネクション許容数を調整する
          • MySQL なら max_connections?
            • 実行数 * プロセス数 * 接続プール数
      • 必要な箇所でトランザクションを貼る
        • かつ適切な分離レベルになっていること
  • 再実行時の考慮はできているか
    • 仮に処理失敗した時の手順が整理されていること
      • 処理完了はスキップして、未完了のもののみ処理したい
      • また、上述した内容について考慮できていないことが分かって軌道修正したい時にも

テンプレート

基本的な作りをベースにして必要な箇所を修正する

execute.rb
class Application < Rails::Application
  ...
  # ログ出力時に時刻も載せる
  config.log_formatter = Logger::Formatter.new
end
execute.rb
# 実行コマンドは以下
# nohup bin/rails r execute.rb -e production > tmp/execute.log &

require 'optparse'

Rails.logger = Logger.new(STDOUT)



class Executor
  def initialize(in_processes:, in_threads:, batch_size:)
    @in_processes = in_processes
    @in_threads = in_threads
    @batch_size = batch_size
  end

  def execute!
    [処理対象のモデル].find_in_batches(batch_size: batch_size) do |per_batch_size|
      Parallel.each(per_batch_size.in_groups(in_processes, false), in_processes: in_processes) do |per_process|
        @reconnected ||= ApplicationRecord.connection.reconnect! || true

        Parallel.each(per_process, in_threads: in_threads) do |per_thread|
          Rails.logger.info("message: #{per_thread.id} process start.")

          ActiveRecord::Base.connection_pool.with_connection do
            # [メインの処理]
          end

          Rails.logger.info("message: #{per_thread.id} process finish.")
        rescue StandardError => e
          Rails.logger.error(
            message: 'Executor is failed.',
            error_class: e.class,
            error_message: e.message,
            backtrace: e.backtrace,
            target_id: per_thread.id
          )

          # 処理失敗したら後でまとめて再実行するので一旦スルーする
        end
      end
    end
  end
end



command_name = File.basename($PROGRAM_NAME)

Rails.logger.info(message: "#{command_name} start.")

begin
  params = ARGV.getopts('', 'in_processes:1', 'in_threads:1', 'batch_size:10000')
  in_processes = params['in_processes'].to_i
  in_threads = params['in_threads'].to_i
  batch_size = params['batch_size'].to_i

  Executor.new(in_processes: in_processes, in_threads: in_threads, batch_size: batch_size).execute!
rescue StandardError => e
  Rails.logger.error(
    message: "#{command_name} is failed.",
    error_class: e.class,
    error_message: e.message,
    backtrace: e.backtrace
  )

  raise
end

Rails.logger.info(message: "#{command_name} finish.")

おまけ

  • 検証用ブランチを作っておくと共有しやすい
    • 修正内容はhttps://github.com/[リポジトリ]/compare/[ベースのブランチ]...[検証用ブランチ]で確認
    • 他環境での取得はgit fetch; git checkout [検証用ブランチ]
スクリプト実行後に使うコマンド
$ ps aux | grep 1716

# 処理されているか
$ tail -f ~.log

# 進み具合
$ watch -n3 'wc -l ~.log'
1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?