Posted at

並列に実行するんだけど並列度も制限したい時

More than 3 years have passed since last update.

まあ例を挙げて話したほうがわかりやすい気がするので例をあげると、たとえばあなたはオンプレミスからコンテナまで総計数千台のホスト的な何かで構成されたシステムを管理する必要があります、と。この規模になってくると当然日々何かが壊れては何かが直っていくのはそういうものですね。で、じゃあ、刻一刻と変わる管理台帳と実際の環境の整合性をチェックすることを考える、と。すると要件としては、


  1. 管理台帳から存在するはずのホスト名とそのIPアドレスの一覧を引っ張ってくる

  2. そこからまずホスト名をババババと名前解決して管理台帳どおりのIPアドレスが解決されるか確認する

  3. 次にそいつらが生きてると限らんので、解決されたIPアドレスに向かってICMP Echoパケットをババババと打ちまくって反応を見る

  4. ping応答あった「なにか」が、本当に管理台帳どおりのホストとは限らないので、とりあえずsshで入れるか確認する


    • 入れたらgetentなどを見て本当にそのホストか確認する

    • /etc/hostsにへんな行が書かれてると困るのでdiffを見る

    • 等等。serverspecとか。夢が膨らむ箇所。



  5. 以上の処理を(一個ずつやってたのでは間尺に合わんので)一気にやる

  6. 以上の処理を(一気にやると名前解決だけでも負荷で死ねるので)徐々にやる

  7. どれかのホストがどれかのフェーズで失敗したら全体を停止してメールかなにかでアラートをあげる

こんなもん? まあ5と6が問題なわけです。どうするか、と。


上記、単に並列度を制限したいだけならスレッドプールでやればいいですね。でも処理同士に依存関係がある。ちょっと一般化して考えてみます。とある処理はとある処理に依存してる。でも依存してない処理もある。依存してない同士は並列に動いていい。けど依存してる同士は動いちゃいかん。

依存というワードが出てきたので言語開発者としてはdependency analysisとかautomatic parallelizmとかそういう方向に頭が行きがちなのだけれど。でも今回は単にping打ちすぎて通信が死ぬのを回避したいだけなわけで、それはちょっとoverkillの気がする。

あと実用上の類似例としてはmake -j4とかそういう系のやつがありますね。Rakeにも実は-jがあるのでこれを実は真剣にコード読んだりしてみたのだけど。たしかにRakeで俺がやりたいことはできる。それは確認した。だけどやっぱ、依存関係を記述するのが面倒なのですね(もっというと正しい依存関係を持ったタスク群をホストごとに動的に構成するのが面倒)。今回やりたいことは一本道の処理が横にものすごく沢山並んでるという状況なのであって、依存の分岐とかそういうの、ないわけです。なので面倒やだ。Rakefileですらこのニーズにはoverkill。


というわけでしょうがないので自分で書くわけですが。


  • タスク依存関係があるのでTaskクラスは必要

  • 並列度の制限があるのでThreadPoolクラスは必要

  • クラス依存関係はただのMutexではダメ、前後関係なので生産者消費者問題が解決できる何かが必要

くらいの要件ですかねえ。こんなもん?


pool.rb

require 'etc'

require 'thread'

class Pool
def initialize n = Etc.nprocessors
@n = n
@q = Queue.new
end

def process job
@q.enq job
end

def wait
Array.new @n do
Thread.start do
begin
while j = @q.deq(true) do
j.call
end
rescue ThreadError
Thread.exit # OK, end of jobs
end
end
end.each(&:join)
self
end
end


上記はすごく基本的なスレッドプール実装ね、これは機能的にはミニマルだけどそのぶん短くてバグもない。スレッドプールがバグってるとか悪夢でしかないので。このくらいシンプルなほうが安心。


task.rb

require 'thread'

require_relative 'pool'

class Task
def initialize pool, cond = nil
unless cond then
cond = Queue.new
cond.enq true
end
@pool = pool
@job = Proc.new
@rx = cond
@tx = Queue.new
end

def call
return unless @rx.deq # wait
val = @job.call
ensure
@tx.enq val
end

def then
Thread.pass # hack
t = self.class.new @pool, @tx do yield end
@pool.process t
return t
end
end


これは前が詰まると後ろが詰まるの実装。#thenというメソッドはコンストラクタになっていて、これ経由で呼べば前後関係ができる。ブロック評価結果が真だと後ろに続く感じ。で、使うほうとしては


main.rb

require_relative 'task'

require_relative 'pool'
require 'resolv'
require 'net-ping'
require 'net-ssh'

p = Pool.new

while true do
hosts = SELECT * FROM `hosts` WHERE `installed_at` IS NOT NULL; # etc. like that.
hosts.each do |x|
Thread.start x do |y|
hostname = y['hostname'] # or something.
ipaddress = y['ipaddress'] # or something.

Task.new(p) {

Resolv.getaddress(hostname) == ipaddress

}.then {

Net::Ping::ICMP.new(ipaddress).ping?

}.then {

result = nil
Net::SSH.start(ipaddress, Etc.getlogin) do |ssh|
result = ssh.exec!("hostname")
end
result == hostname

}.then {

result = nil
Net::SSH.start(ipaddress, Etc.getlogin) do |ssh|
result = ssh.exec!("getent hosts #{hostname}")
end
result.split.first == ipaddress

}.then {
...
}
end
end

p.wait
end


こんな感じじゃないですかね。これでたぶん今のところ動いてそうだ。