3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【Rails】プロセス並列

Last updated at Posted at 2023-10-21

プロセス並列とは

複数のプロセスを起動して各プロセスに異なるCPUコアを割り当てて並列処理を行う方法.

プロセスとは、実行中のプログラムのことです。

1 つのプロセスには、1 つのメモリ空間 (メモリ領域) が割り当てられます。メモリ空間はプロセスから OS に要求すれば (空きがあれば) 増やしてくれます。

スレッドとは、プロセス内で命令を逐次実行する部分であり、CPU コアを利用する単位のことです。

マルチスレッドとは、1つのプログラムを複数に分割し、同時に処理を進める技術のこと。つまり、1つのプログラム内で複数の処理を同時に実行することができる。スレッドは軽量で、同じプロセス内であるため、スレッド同士でのデータの共有が容易で、また、プロセスの生成や破棄が必要なく、プログラマの手間も少なくて済む。ただし、スレッド同士が同じデータにアクセスするときには、注意が必要で、スレッドの中で排他制御を行わなければ、データ競合が起こる可能性がある。(スレッドセーフの話)

一方、マルチプロセスとは、1つのプログラムを複数に分割し、複数のプロセスで同時に処理を進める技術のこと。つまり、別々のプログラム間(別々の実行環境)でデータをやりとりしながら処理を進めることができる。プロセスは重量で、まったく別のプロセスとして実行されるため、プロセス間でデータの共有には特殊な手法が必要であり、プロセスの生成や破棄には時間とリソースが必要。

全体像

プロセス並列処理のサンプルコードを書き起こしてみました。
ここで確認する部分は、Parallel.map内Parallel.map直後の書き方です。

※通知処理の実行やメール生成処理等とありますが、並列化の書き方という点ではあまり関係ないものなので省略してます(サンプルコードなのでどんな処理を並列化するかという点ではなんでも良かった)。

Gemfile
gem 'parallel'
Sample Code
class NotificationProcessor
  # 並列数
  PROCESS_COUNT = 4

  # 通知データを分割するサイズ
  SPLIT_SIZE = 1000

  # 処理結果
  SUCCESS = 0
  ALERT = 1
  FATAL = 2
  ERROR = 3

  # 通知タイプ
  NOTIFICATION_SALE_INFO = 1
  NOTIFICATION_SHIPPING = 2

  # システムID
  SYSTEM_ID = 1

  # ステータス: 未送信
  STATUS_UNSENT = 0

  def execute
    begin
      # 通知データを抽出
      notifications = Notification.joins(status_management: :notification_content) 
                                  .where(notification_type: [NOTIFICATION_SALE_INFO, NOTIFICATION_SHIPPING])
                                  .where(status_managements: { sms_status: STATUS_UNSENT })
    rescue => e
      # 異常終了
      @mail_result = FATAL
      generate_error_mail('通知データの抽出に失敗しました。', e)
      fatal_exit
    end

    @mail_result = SUCCESS

    regist_count = parallel_process(notifications)

    mail_body = generate_mail_body(regist_count)
    generate_mail(mail_body)
  end

  private

  # メール本文の生成
  def generate_mail_body(regist_count)
    "処理済み総件数:#{regist_count}\n並列実行数:#{PROCESS_COUNT}\n"
  end

  # 並列数(定義した並列数 or CPUコア数の小さい方)
  def process_count
    [PROCESS_COUNT, Parallel::processor_count].min
  end

  # 並列処理の実行
  def parallel_process(notifications)
    # 抽出した通知IDをSPLIT_SIZE毎に分割
    split_notification_ids = notifications.ids.in_groups_of(SPLIT_SIZE, false)

    # mapのループ回数(それぞれのプロセス内で独立して扱う)
    map_count = 0

    # 並列処理
    parallel_results =
      Parallel.map(split_notification_ids, in_processes: process_count) do |notification_ids|
        # 子プロセスの初回呼出し時のみDB再接続
        @reconnect ||= ActiveRecord::Base.connection.reconnect!

        map_count += 1
        regist_count = 0

        notifications = Notification.eager_load({ status_management: :notification_content }, { billing: :member })
                                    .where(id: notification_ids)

        notifications.each do |notification|
          begin
            # 通知登録処理
            process_notification(notification)

            regist_count += 1

            # Parallel.worker_numberを呼ぶとワーカープロセスの番号を取得できる
            log(Parallel.worker_number, map_count, regist_count, notifications.size, notification.id)
          rescue NotificationTemplateNotFound => e
            # 警告終了
            generate_error_mail("テンプレート: #{notification.notification_content.sms_template} は登録されていません。", e)
            raise Parallel::Break, { status: :ALERT_EXIT, mail_result: ALERT }
          rescue StandardError => e
            # 異常終了
            generate_error_mail("通知登録に失敗しました。通知ID:#{notification.id}", e)
            raise Parallel::Break, { status: :FATAL_EXIT, mail_result: ERROR }
          end
        end

        { regist_count: regist_count, mail_result: @mail_result }
      end

    # 親プロセスに戻ってきた直後の処理
    after_parallel_actions(parallel_results)
  end

  # 並列処理実行後の処理
  def after_parallel_actions(parallel_results)
    # 親プロセスに戻ったら、明示的にDB接続を切る/再接続
    ActiveRecord::Base.connection.close
    ActiveRecord::Base.connection.reconnect!

    # 子プロセスの例外を拾う
    handle_child_exceptions(parallel_results)

    parallel_results.sum{ |result| result[:regist_count] }
  end

  # 子プロセスの例外を処理
  def handle_child_exceptions(parallel_results)
    if parallel_results.is_a?(Hash)
      case parallel_results[:status]
      when :FATAL_EXIT
        @mail_result = parallel_results[:mail_result]
        fatal_exit
      when :ALERT_EXIT
        @mail_result = parallel_results[:mail_result]
        alert_exit
      end
    end
  end

  # 通知処理の実行
  def process_notification(notification)
    request_form = NotificationProc::RequestForm.new do |rf|
      rf.member = notification.billing.member
      rf.notification = notification
      rf.notification_content = notification.status_management.notification_content
      rf.system_id = SYSTEM_ID
    end

    NotificationProc.execute(request_form)
  end

  def alert_exit
    # ALERT時に終了させる処理
  end

  def fatal_exit
    # FATAL時に終了させる処理
  end

  def log(worker_number, map_count, regist_count, total_count, notification_id)
    # ログ出力
  end

  def generate_mail(mail_body)
    # 正常終了メールの生成処理
  end

  def generate_error_mail(message, error = nil)
    # エラーメールの生成処理
  end
end
console
NotificationProcessor.execute

親プロセスから派生させた複数の子プロセスを、同時に実行している

フォーク

コンピュータ上で実行されているプログラム(プロセス)が、自身の複製を作成して新たなプロセスとして起動することをフォークという。

Parallelのメソッド

Parallel.mapの他、#each, #each_with_index, #map_with_index, #any?, #all? 等がある

実装時の注意点

子プロセスから親プロセスに戻ったら、DBの接続を切る+再接続を行う(後述

子プロセスで例外が発生した時はParallel::BreakParallel::Killを使用する

親プロセスで定義した変数の中身を、子プロセスで書き換えない(参照のみにする)
※インクリメント用変数など、それぞれのプロセス内で独立して処理するものであればOK

ワーカープロセスのメモリ使用量が増加し、メモリ不足になる可能性に注意

Parallel.eachParallel.map第1引数to_aされてParallel内部で配列になる。 この挙動を知らないとParallelの呼び出し元で意図せずメモリ使用量が増大することも。

※ParallelにActiveRecordを渡すとメモリ不足に陥る可能性があるため、なるべくid配列などを渡す

複数のプロセスが同時に同じリソースにアクセスするような処理をしている場合、そのリソースが悲観ロックされているとプロセス間で待機状態となり、結果的に処理が直列時と変わらないような挙動になってしまう(並列化してもロック待ちで並ぶため、処理時間が直列と変わらない)

子プロセスでのreturn, break, next, exit, abortは推奨されていないため、使用する際は十分挙動に注意する
※使用できるケースもあるため、打鍵にて十分に挙動を確認する。

子プロセスではStandardError以外の例外で抜けてはいけない

Marshalでシリアライズできないオブジェクトを受け渡すことは出来ない(後述

シリアライズとは

シリアライズとは、複数の要素を一列に並べる操作や処理のこと。単にシリアライズといった場合には、プログラムの実行状態や複雑なデータ構造などを一つの文字列やバイト列で表現する「直列化」を指すことが多い。

並列数とCPUコア数

「4並列を指定しているのに、2並列でしか実行されない」という場合には、実行環境のCPUコア数を疑う

CPUコア数よりも大きな値を並列数に指定している場合、物理的な制約によりそれより大きな数での並列処理は行われない

PG::ConnectionBad: PQconsumeInput() server closed the connection unexpectedly

PG::ConnectionBad: PQconsumeInput() server closed the connection unexpectedly
 	This probably means the server terminated abnormally
 	before or while processing the request.

プロセス並列によってこのようにDBへの接続に問題が発生した場合、フォーク直後や親プロセスへ戻った直後に明示的にDB接続を切ったり、再接続を試す

Parallel.map(...) do
  @reconnect ||= ActiveRecord::Base.connection.reconnect!
  # 省略
end

ActiveRecord::Base.connection.close
ActiveRecord::Base.connection.reconnect!

Parallel::DeadWorker

ハマると厄介なやつ。デバッグ方法を記載しておく。

Parallel::DeadWorker
 .../gems/parallel-1.20.1/lib/parallel.rb:78:in `rescue in work'
 .../gems/parallel-1.20.1/lib/parallel.rb:75:in `work'

dump or loadのどちらで発生したのかを把握する

ワーカープロセスとの通信

parallelはワーカープロセスとの通信をIO.pipeで生成したパイプの入出力で行います。 ワーカープロセスとのオブジェクトの受け渡しはMarshal.dump, Marshal.loadを使います。 このためMarshalでシリアライズできないオブジェクトをワーカープロセスと受け渡すことはできません。

確認方法: Gemに直接コードを仕込んで確認

def work(data)
  begin
    Marshal.dump(data, write)
  rescue Errno::EPIPE
    puts "DeadWorker encountered during dump. data: #{data}, write: #{write}"
    raise DeadWorker
  end

  result = begin
    Marshal.load(read)
  rescue EOFError
    puts "DeadWorker encountered during load. read: #{read}"
    raise DeadWorker
  end
  raise result.exception if result.is_a?(ExceptionWrapper)
  result
end

まずはMarshal.dump(data, write) or Marshal.load(read)のどちらで発生したのかを把握。gemに直接putsだったり、他にもログを出力させるよう仕込んでも良い。

確認方法: railsコンソール

デフォルトオブジェクトがProcであるHash」はダンプ出来ないということに気がつくまでParallel::DeadWorkerに悩まされた。実は rails cで気付けた。

デフォルトオブジェクトがProcであるHashの場合

iterm2
(base) app % bundle exec rails c
Loading development environment (Rails 7.0.2.3)
irb(main):001:0> sample_hash = { "00001" => 1, "00002" => 2, "00003" => 3 }
=> {"00001"=>1, "00002"=>2, "00003"=>3}
irb(main):002:0> hash_keys = [:first, :second, :third]
=> [:first, :second, :third]
irb(main):003:0> hash = Hash.new { |hash, key| hash[key] = sample_hash.transform_values { 0 } }
=> {}
irb(main):004:0> hash_keys.each { |key| hash[key] }
=> [:first, :second, :third]
irb(main):005:0> hash
=> {:first=>{"00001"=>0, "00002"=>0, "00003"=>0}, :second=>{"00001"=>0, "00002"=>0, "00003"=>0}, :third=>{"00001"=>0, "00002"=>0, "00003"=>0}}
irb(main):006:0> hash[:aaa]
=> {"00001"=>0, "00002"=>0, "00003"=>0}
irb(main):007:0> hash
=> {:first=>{"00001"=>0, "00002"=>0, "00003"=>0}, :second=>{"00001"=>0, "00002"=>0, "00003"=>0}, :third=>{"00001"=>0, "00002"=>0, "00003"=>0}, :aaa=>{"00001"=>0, "00002"=>0, "00003"=>0}}
irb(main):008:0> hash.default_proc
=> #<Proc:0x0000000111374f80 (irb):3>
irb(main):009:0> Marshal.dump(hash)
(irb):9:in `dump': can't dump hash with default proc (TypeError)

Marshal.dumpでシリアライズできないので、ここでParallel::DeadWorkerが発生していたことが分かる

通常のHashに直した場合

iterm2
(base) app % bundle exec rails c
Loading development environment (Rails 7.0.2.3)
irb(main):001:0> sample_hash = { "00001" => 1, "00002" => 2, "00003" => 3 }
=> {"00001"=>1, "00002"=>2, "00003"=>3}
irb(main):002:0> default_value = sample_hash.transform_values { 0 }
=> {"00001"=>0, "00002"=>0, "00003"=>0}
irb(main):003:0> hash_keys = [:first, :second, :third]
=> [:first, :second, :third]
irb(main):004:0> hash = {}
=> {}
irb(main):005:0> hash_keys.each { |key| hash[key] = default_value }
=> [:first, :second, :third]
irb(main):006:0> hash
=> {:first=>{"00001"=>0, "00002"=>0, "00003"=>0}, :second=>{"00001"=>0, "00002"=>0, "00003"=>0}, :third=>{"00001"=>0, "00002"=>0, "00003"=>0}}
irb(main):007:0> hash[:aaa]
=> nil
irb(main):008:0> hash.default_proc
=> nil
irb(main):009:0> Marshal.dump(hash)
=> "\x04\b{\b:\nfirst{\bI\"\n00001\x06:\x06ETi\x00I\"\n00002\x06;\x06Ti\x00I\"\n00003\x06;\x06Ti\x00:\vsecond@\x06:\nthird@\x06"

シリアライズされ、Parallel::DeadWorkerも解消される

行き詰まった時は、小さな粒度でデバッグしていくことを忘れない

GitHub

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?