プロセス並列とは
複数のプロセスを起動して各プロセスに異なるCPUコアを割り当てて並列処理を行う方法.
プロセスとは、実行中のプログラムのことです。
1 つのプロセスには、1 つのメモリ空間 (メモリ領域) が割り当てられます。メモリ空間はプロセスから OS に要求すれば (空きがあれば) 増やしてくれます。
スレッドとは、プロセス内で命令を逐次実行する部分であり、CPU コアを利用する単位のことです。
マルチスレッドとは、1つのプログラムを複数に分割し、同時に処理を進める技術のこと。つまり、1つのプログラム内で複数の処理を同時に実行することができる。スレッドは軽量で、同じプロセス内であるため、スレッド同士でのデータの共有が容易で、また、プロセスの生成や破棄が必要なく、プログラマの手間も少なくて済む。ただし、スレッド同士が同じデータにアクセスするときには、注意が必要で、スレッドの中で排他制御を行わなければ、データ競合が起こる可能性がある。(スレッドセーフの話)
一方、マルチプロセスとは、1つのプログラムを複数に分割し、複数のプロセスで同時に処理を進める技術のこと。つまり、別々のプログラム間(別々の実行環境)でデータをやりとりしながら処理を進めることができる。プロセスは重量で、まったく別のプロセスとして実行されるため、プロセス間でデータの共有には特殊な手法が必要であり、プロセスの生成や破棄には時間とリソースが必要。
全体像
プロセス並列処理のサンプルコードを書き起こしてみました。
ここで確認する部分は、Parallel.map内
とParallel.map直後
の書き方です。
※通知処理の実行やメール生成処理等とありますが、並列化の書き方という点ではあまり関係ないものなので省略してます(サンプルコードなのでどんな処理を並列化するかという点ではなんでも良かった)。
gem 'parallel'
class NotificationProcessor
# 並列数
PROCESS_COUNT = 4
# 通知データを分割するサイズ
SPLIT_SIZE = 1000
# 処理結果
SUCCESS = 0
ALERT = 1
FATAL = 2
ERROR = 3
# 通知タイプ
NOTIFICATION_SALE_INFO = 1
NOTIFICATION_SHIPPING = 2
# システムID
SYSTEM_ID = 1
# ステータス: 未送信
STATUS_UNSENT = 0
def execute
begin
# 通知データを抽出
notifications = Notification.joins(status_management: :notification_content)
.where(notification_type: [NOTIFICATION_SALE_INFO, NOTIFICATION_SHIPPING])
.where(status_managements: { sms_status: STATUS_UNSENT })
rescue => e
# 異常終了
@mail_result = FATAL
generate_error_mail('通知データの抽出に失敗しました。', e)
fatal_exit
end
@mail_result = SUCCESS
regist_count = parallel_process(notifications)
mail_body = generate_mail_body(regist_count)
generate_mail(mail_body)
end
private
# メール本文の生成
def generate_mail_body(regist_count)
"処理済み総件数:#{regist_count}件\n並列実行数:#{PROCESS_COUNT}\n"
end
# 並列数(定義した並列数 or CPUコア数の小さい方)
def process_count
[PROCESS_COUNT, Parallel::processor_count].min
end
# 並列処理の実行
def parallel_process(notifications)
# 抽出した通知IDをSPLIT_SIZE毎に分割
split_notification_ids = notifications.ids.in_groups_of(SPLIT_SIZE, false)
# mapのループ回数(それぞれのプロセス内で独立して扱う)
map_count = 0
# 並列処理
parallel_results =
Parallel.map(split_notification_ids, in_processes: process_count) do |notification_ids|
# 子プロセスの初回呼出し時のみDB再接続
@reconnect ||= ActiveRecord::Base.connection.reconnect!
map_count += 1
regist_count = 0
notifications = Notification.eager_load({ status_management: :notification_content }, { billing: :member })
.where(id: notification_ids)
notifications.each do |notification|
begin
# 通知登録処理
process_notification(notification)
regist_count += 1
# Parallel.worker_numberを呼ぶとワーカープロセスの番号を取得できる
log(Parallel.worker_number, map_count, regist_count, notifications.size, notification.id)
rescue NotificationTemplateNotFound => e
# 警告終了
generate_error_mail("テンプレート: #{notification.notification_content.sms_template} は登録されていません。", e)
raise Parallel::Break, { status: :ALERT_EXIT, mail_result: ALERT }
rescue StandardError => e
# 異常終了
generate_error_mail("通知登録に失敗しました。通知ID:#{notification.id}", e)
raise Parallel::Break, { status: :FATAL_EXIT, mail_result: ERROR }
end
end
{ regist_count: regist_count, mail_result: @mail_result }
end
# 親プロセスに戻ってきた直後の処理
after_parallel_actions(parallel_results)
end
# 並列処理実行後の処理
def after_parallel_actions(parallel_results)
# 親プロセスに戻ったら、明示的にDB接続を切る/再接続
ActiveRecord::Base.connection.close
ActiveRecord::Base.connection.reconnect!
# 子プロセスの例外を拾う
handle_child_exceptions(parallel_results)
parallel_results.sum{ |result| result[:regist_count] }
end
# 子プロセスの例外を処理
def handle_child_exceptions(parallel_results)
if parallel_results.is_a?(Hash)
case parallel_results[:status]
when :FATAL_EXIT
@mail_result = parallel_results[:mail_result]
fatal_exit
when :ALERT_EXIT
@mail_result = parallel_results[:mail_result]
alert_exit
end
end
end
# 通知処理の実行
def process_notification(notification)
request_form = NotificationProc::RequestForm.new do |rf|
rf.member = notification.billing.member
rf.notification = notification
rf.notification_content = notification.status_management.notification_content
rf.system_id = SYSTEM_ID
end
NotificationProc.execute(request_form)
end
def alert_exit
# ALERT時に終了させる処理
end
def fatal_exit
# FATAL時に終了させる処理
end
def log(worker_number, map_count, regist_count, total_count, notification_id)
# ログ出力
end
def generate_mail(mail_body)
# 正常終了メールの生成処理
end
def generate_error_mail(message, error = nil)
# エラーメールの生成処理
end
end
NotificationProcessor.execute
親プロセスから派生させた複数の子プロセスを、同時に実行している
フォーク
コンピュータ上で実行されているプログラム(プロセス)が、自身の複製を作成して新たなプロセスとして起動することをフォークという。
Parallelのメソッド
Parallel.map
の他、#each
, #each_with_index
, #map_with_index
, #any?
, #all?
等がある
実装時の注意点
子プロセスから親プロセスに戻ったら、DBの接続を切る+再接続を行う(後述)
子プロセスで例外が発生した時はParallel::Break
やParallel::Kill
を使用する
親プロセスで定義した変数の中身を、子プロセスで書き換えない(参照のみにする)
※インクリメント用変数など、それぞれのプロセス内で独立して処理するものであればOK
ワーカープロセスのメモリ使用量が増加し、メモリ不足になる可能性に注意
Parallel.each
やParallel.map
の第1引数はto_a
されてParallel内部で配列になる。 この挙動を知らないとParallelの呼び出し元で意図せずメモリ使用量が増大することも。
※ParallelにActiveRecordを渡すとメモリ不足に陥る可能性があるため、なるべくid配列などを渡す
複数のプロセスが同時に同じリソースにアクセスするような処理をしている場合、そのリソースが悲観ロックされているとプロセス間で待機状態となり、結果的に処理が直列時と変わらないような挙動になってしまう(並列化してもロック待ちで並ぶため、処理時間が直列と変わらない)
子プロセスでのreturn
, break
, next
, exit
, abort
は推奨されていないため、使用する際は十分挙動に注意する
※使用できるケースもあるため、打鍵にて十分に挙動を確認する。
子プロセスではStandardError
以外の例外で抜けてはいけない
Marshal
でシリアライズできないオブジェクトを受け渡すことは出来ない(後述)
シリアライズとは
シリアライズとは、複数の要素を一列に並べる操作や処理のこと。単にシリアライズといった場合には、プログラムの実行状態や複雑なデータ構造などを一つの文字列やバイト列で表現する「直列化」を指すことが多い。
並列数とCPUコア数
「4並列を指定しているのに、2並列でしか実行されない」という場合には、実行環境のCPUコア数を疑う
CPUコア数よりも大きな値を並列数に指定している場合、物理的な制約によりそれより大きな数での並列処理は行われない
PG::ConnectionBad: PQconsumeInput() server closed the connection unexpectedly
PG::ConnectionBad: PQconsumeInput() server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
プロセス並列によってこのようにDBへの接続に問題が発生した場合、フォーク直後や親プロセスへ戻った直後に明示的にDB接続を切ったり、再接続を試す
Parallel.map(...) do
@reconnect ||= ActiveRecord::Base.connection.reconnect!
# 省略
end
ActiveRecord::Base.connection.close
ActiveRecord::Base.connection.reconnect!
Parallel::DeadWorker
ハマると厄介なやつ。デバッグ方法を記載しておく。
Parallel::DeadWorker
.../gems/parallel-1.20.1/lib/parallel.rb:78:in `rescue in work'
.../gems/parallel-1.20.1/lib/parallel.rb:75:in `work'
dump
or load
のどちらで発生したのかを把握する
-
Marshal.#dump
-
Marshal.#load
parallelはワーカープロセスとの通信をIO.pipeで生成したパイプの入出力で行います。 ワーカープロセスとのオブジェクトの受け渡しはMarshal.dump, Marshal.loadを使います。 このためMarshalでシリアライズできないオブジェクトをワーカープロセスと受け渡すことはできません。
確認方法: Gemに直接コードを仕込んで確認
def work(data)
begin
Marshal.dump(data, write)
rescue Errno::EPIPE
puts "DeadWorker encountered during dump. data: #{data}, write: #{write}"
raise DeadWorker
end
result = begin
Marshal.load(read)
rescue EOFError
puts "DeadWorker encountered during load. read: #{read}"
raise DeadWorker
end
raise result.exception if result.is_a?(ExceptionWrapper)
result
end
まずはMarshal.dump(data, write)
or Marshal.load(read)
のどちらで発生したのかを把握。gemに直接puts
だったり、他にもログを出力させるよう仕込んでも良い。
確認方法: railsコンソール
「デフォルトオブジェクトがProcであるHash」はダンプ出来ないということに気がつくまでParallel::DeadWorker
に悩まされた。実は rails cで気付けた。
デフォルトオブジェクトがProcであるHashの場合
(base) app % bundle exec rails c
Loading development environment (Rails 7.0.2.3)
irb(main):001:0> sample_hash = { "00001" => 1, "00002" => 2, "00003" => 3 }
=> {"00001"=>1, "00002"=>2, "00003"=>3}
irb(main):002:0> hash_keys = [:first, :second, :third]
=> [:first, :second, :third]
irb(main):003:0> hash = Hash.new { |hash, key| hash[key] = sample_hash.transform_values { 0 } }
=> {}
irb(main):004:0> hash_keys.each { |key| hash[key] }
=> [:first, :second, :third]
irb(main):005:0> hash
=> {:first=>{"00001"=>0, "00002"=>0, "00003"=>0}, :second=>{"00001"=>0, "00002"=>0, "00003"=>0}, :third=>{"00001"=>0, "00002"=>0, "00003"=>0}}
irb(main):006:0> hash[:aaa]
=> {"00001"=>0, "00002"=>0, "00003"=>0}
irb(main):007:0> hash
=> {:first=>{"00001"=>0, "00002"=>0, "00003"=>0}, :second=>{"00001"=>0, "00002"=>0, "00003"=>0}, :third=>{"00001"=>0, "00002"=>0, "00003"=>0}, :aaa=>{"00001"=>0, "00002"=>0, "00003"=>0}}
irb(main):008:0> hash.default_proc
=> #<Proc:0x0000000111374f80 (irb):3>
irb(main):009:0> Marshal.dump(hash)
(irb):9:in `dump': can't dump hash with default proc (TypeError)
Marshal.dump
でシリアライズできないので、ここでParallel::DeadWorker
が発生していたことが分かる
通常のHashに直した場合
(base) app % bundle exec rails c
Loading development environment (Rails 7.0.2.3)
irb(main):001:0> sample_hash = { "00001" => 1, "00002" => 2, "00003" => 3 }
=> {"00001"=>1, "00002"=>2, "00003"=>3}
irb(main):002:0> default_value = sample_hash.transform_values { 0 }
=> {"00001"=>0, "00002"=>0, "00003"=>0}
irb(main):003:0> hash_keys = [:first, :second, :third]
=> [:first, :second, :third]
irb(main):004:0> hash = {}
=> {}
irb(main):005:0> hash_keys.each { |key| hash[key] = default_value }
=> [:first, :second, :third]
irb(main):006:0> hash
=> {:first=>{"00001"=>0, "00002"=>0, "00003"=>0}, :second=>{"00001"=>0, "00002"=>0, "00003"=>0}, :third=>{"00001"=>0, "00002"=>0, "00003"=>0}}
irb(main):007:0> hash[:aaa]
=> nil
irb(main):008:0> hash.default_proc
=> nil
irb(main):009:0> Marshal.dump(hash)
=> "\x04\b{\b:\nfirst{\bI\"\n00001\x06:\x06ETi\x00I\"\n00002\x06;\x06Ti\x00I\"\n00003\x06;\x06Ti\x00:\vsecond@\x06:\nthird@\x06"
シリアライズされ、Parallel::DeadWorker
も解消される
行き詰まった時は、小さな粒度でデバッグしていくことを忘れない
GitHub