169
165

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

並列に実行するんだけど並列度も制限したい時

Posted at

まあ例を挙げて話したほうがわかりやすい気がするので例をあげると、たとえばあなたはオンプレミスからコンテナまで総計数千台のホスト的な何かで構成されたシステムを管理する必要があります、と。この規模になってくると当然日々何かが壊れては何かが直っていくのはそういうものですね。で、じゃあ、刻一刻と変わる管理台帳と実際の環境の整合性をチェックすることを考える、と。すると要件としては、

  1. 管理台帳から存在するはずのホスト名とそのIPアドレスの一覧を引っ張ってくる
  2. そこからまずホスト名をババババと名前解決して管理台帳どおりのIPアドレスが解決されるか確認する
  3. 次にそいつらが生きてると限らんので、解決されたIPアドレスに向かってICMP Echoパケットをババババと打ちまくって反応を見る
  4. ping応答あった「なにか」が、本当に管理台帳どおりのホストとは限らないので、とりあえずsshで入れるか確認する
    • 入れたらgetentなどを見て本当にそのホストか確認する
    • /etc/hostsにへんな行が書かれてると困るのでdiffを見る
    • 等等。serverspecとか。夢が膨らむ箇所。
  5. 以上の処理を(一個ずつやってたのでは間尺に合わんので)一気にやる
  6. 以上の処理を(一気にやると名前解決だけでも負荷で死ねるので)徐々にやる
  7. どれかのホストがどれかのフェーズで失敗したら全体を停止してメールかなにかでアラートをあげる

こんなもん? まあ5と6が問題なわけです。どうするか、と。


上記、単に並列度を制限したいだけならスレッドプールでやればいいですね。でも処理同士に依存関係がある。ちょっと一般化して考えてみます。とある処理はとある処理に依存してる。でも依存してない処理もある。依存してない同士は並列に動いていい。けど依存してる同士は動いちゃいかん。

依存というワードが出てきたので言語開発者としてはdependency analysisとかautomatic parallelizmとかそういう方向に頭が行きがちなのだけれど。でも今回は単にping打ちすぎて通信が死ぬのを回避したいだけなわけで、それはちょっとoverkillの気がする。

あと実用上の類似例としてはmake -j4とかそういう系のやつがありますね。Rakeにも実は-jがあるのでこれを実は真剣にコード読んだりしてみたのだけど。たしかにRakeで俺がやりたいことはできる。それは確認した。だけどやっぱ、依存関係を記述するのが面倒なのですね(もっというと正しい依存関係を持ったタスク群をホストごとに動的に構成するのが面倒)。今回やりたいことは一本道の処理が横にものすごく沢山並んでるという状況なのであって、依存の分岐とかそういうの、ないわけです。なので面倒やだ。Rakefileですらこのニーズにはoverkill。


というわけでしょうがないので自分で書くわけですが。

  • タスク依存関係があるのでTaskクラスは必要
  • 並列度の制限があるのでThreadPoolクラスは必要
  • クラス依存関係はただのMutexではダメ、前後関係なので生産者消費者問題が解決できる何かが必要

くらいの要件ですかねえ。こんなもん?

pool.rb
require 'etc'
require 'thread'

class Pool
  def initialize n = Etc.nprocessors
    @n = n
    @q = Queue.new
  end

  def process job
    @q.enq job
  end

  def wait
    Array.new @n do
      Thread.start do
        begin
          while j = @q.deq(true) do
            j.call
          end
        rescue ThreadError
          Thread.exit # OK, end of jobs
        end
      end
    end.each(&:join)
    self
  end
end

上記はすごく基本的なスレッドプール実装ね、これは機能的にはミニマルだけどそのぶん短くてバグもない。スレッドプールがバグってるとか悪夢でしかないので。このくらいシンプルなほうが安心。

task.rb
require 'thread'
require_relative 'pool'

class Task
  def initialize pool, cond = nil
    unless cond then
      cond = Queue.new
      cond.enq true
    end
    @pool = pool
    @job  = Proc.new
    @rx   = cond
    @tx   = Queue.new
  end

  def call
    return unless @rx.deq # wait
    val = @job.call
  ensure
    @tx.enq val
  end

  def then
    Thread.pass # hack
    t = self.class.new @pool, @tx do yield end
    @pool.process t
    return t
  end
end

これは前が詰まると後ろが詰まるの実装。#thenというメソッドはコンストラクタになっていて、これ経由で呼べば前後関係ができる。ブロック評価結果が真だと後ろに続く感じ。で、使うほうとしては

main.rb
require_relative 'task'
require_relative 'pool'
require 'resolv'
require 'net-ping'
require 'net-ssh'

p = Pool.new

while true do
  hosts = SELECT * FROM `hosts` WHERE `installed_at` IS NOT NULL; # etc. like that.
  hosts.each do |x|
    Thread.start x do |y|
      hostname  = y['hostname']  # or something.
      ipaddress = y['ipaddress'] # or something.

      Task.new(p) {

        Resolv.getaddress(hostname) == ipaddress

      }.then {

        Net::Ping::ICMP.new(ipaddress).ping?

      }.then {

        result = nil
        Net::SSH.start(ipaddress, Etc.getlogin) do |ssh|
          result = ssh.exec!("hostname")
        end
        result == hostname

      }.then {

        result = nil
        Net::SSH.start(ipaddress, Etc.getlogin) do |ssh|
          result = ssh.exec!("getent hosts #{hostname}")
        end
        result.split.first == ipaddress

      }.then {
        ...
      }
    end
  end

  p.wait
end

こんな感じじゃないですかね。これでたぶん今のところ動いてそうだ。

169
165
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
169
165

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?