Workflow Engine をつくろう! Part 4(Task の並列実行)

  • 5
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行

Task の並列実行

今回は Workflow Engine に Task を並列実行できる機能を実装をします。Workflow Engine が利用されるのは、ある程度以上の数の Task を含む処理になるので、全体の実行時間を短縮するために並列実行の機能はとても重要なものになります。

Alt text

例えば、上図の Workflow で各 Task が1分かかる処理だとして、単純に順次実行すると6分かかります。しかし、Task2,3,4,5 はそれぞれ依存関係がなく、並列に実行できるので、並列度4で実行した場合、Task6 で1分、Task2-5 で1分、最後に Tsask1 で1分となり、3分と半分の時間で実行可能になります。実際には CPU のコア数だったり、他に動いているプロセスとの兼ね合いでそこまでは速くならないかもしれませんが、間違いなく効果はあります。

Ruby における並列処理

Ruby で並列処理を実装しようと思った場合は、言語サポートの機能内では以下の手法があります。

  • Thread を利用した複数スレッド
  • Kernel.#fork を利用した複数プロセス
  • dRuby による分散プロセス

それぞれおおまかに特長をまとめると、以下の表のようになります。(評価は相対評価です)

Thread Kernel.#fork dRuby
スケーラビリティ
実装・実行環境の構築 簡単 普通 難しい

これらは排他的ではないので、組み合わせて利用することも可能です。今回は、最もスケールしない1が、最も実装が簡単な Thread を利用して実装していきます。

実装方針

並列実行を処理の概要は以下のようになります。

  1. 依存関係に基づいて、Task ソートする。ソートした、配列を Task Queue とする
  2. Task 実行用の Worker スレッドを指定した数だけ用意する
  3. Worker スレッド内で、Task Queue から Task を取り出す
  4. Task が実行可能になるまで待つ
  5. Task を実行する

Task クラスの変更

並列実装をするにあたって、Task が実行可能状態にあるのかを判定する必要があります。これまでの説明から、「Task が実行可能である」というのは「依存する全ての Task が完了している」ことであり、そして、「Task が完了している」というのは、「出力となる Target が存在している」ということです。

これらの判定を行うメソッドを Task クラスに実装していきます。

task.rb
class Task
  # Task が実行可能かどうか
  def ready?
    requires.all? {|t| t.completed?}
  end  

  # Task が完了しているかどうか
  def completed?
    _output.all? {|o| o.exists?}
  end

  private

  def _output
    o = output
    o = [o] unless o.is_a?(Array)
    o
  end
end

Workflow クラスの拡張

Workflow クラスに Thread を利用した並列処理を追加すると以下のようになります。実装方法はいくつかあると思いますが、ここでは Thread 間で Task Queue を共有し、複数 Thread でそれを参照するようにしました。Ruby の組み込みの Queue クラスは Thread Safe であることが保証されているので、特に難しく考えなくても良いのが嬉しいですね。

workflow.rb
require_relative 'dag'
require 'thread'

class Workflow
  def initialize(conf={})
    @dag = DAG.new
    @concurrency = conf[:concurrency] || 1
  end

  def run(task)
    @dag.add_task(task)
    parallel(@dag.tsort, @concurrency) do |t|
      until t.ready?
        puts "#{Time.now} #{t.class.name} is not ready"
        sleep 1
      end
      t.run unless t.completed?
    end
  end

  def parallel(enumerable, concurrency)
    q = Queue.new
    enumerable.each {|e| q << e}

    proc = lambda do
      until q.empty?
        t = q.pop
        puts "#{Time.now} pop task: #{t.class.name}"
        yield t
      end
    end

    threads = []
    concurrency.times do
      threads << Thread.start(&proc)
    end
    threads.each {|t| t.join}
  end
end

並列実行を試してみる

それでは、実際にこの実装が正しく動くのかどうか試してみましょう。本文の最初に書いた図と同じワークフローを実装してみます。ただし、各タスクにかかる時間は5秒とします。

Alt text

example.rb
require_relative 'workflow'
require_relative 'task'
require_relative 'local_file_target'

class WaitTask < Task
  def output
    LocalFileTarget.new("/tmp/data_#{self.class.name}_#{Time.now.strftime('%Y-%m-%d')}.txt")
  end

  def run
    puts "#{Time.now} start #{self.class.name}"
    sleep(5)
    path = output.path
    File.write(path, "done")
    puts "#{Time.now} end #{self.class.name}"
  end
end

class Task1 < WaitTask
  def requires; [ Task2.new, Task3.new, Task4.new, Task5.new ]; end
end

class Task2 < WaitTask
  def requires; [ Task6.new ]; end
end

class Task3 < WaitTask
  def requires; [ Task6.new ]; end
end

class Task4 < WaitTask
  def requires; [ Task6.new ]; end
end

class Task5 < WaitTask
  def requires; [ Task6.new ]; end
end

class Task6 < WaitTask
end

Workflow.new(concurrency: 4).run(Task1.new)

理論通りに行けば、並列度4で実行した場合、Task6 で5秒、Task2-5 で5秒、最後に Tsask1 で5秒となり、大体15秒で全体の処理が終わるはずです。

$ ruby example.rb
2016-03-20 22:00:33 +0900 pop task: Task6
2016-03-20 22:00:33 +0900 pop task: Task2
2016-03-20 22:00:33 +0900 pop task: Task3
2016-03-20 22:00:33 +0900 start Task6
2016-03-20 22:00:33 +0900 Task2 is not ready
2016-03-20 22:00:33 +0900 pop task: Task4
2016-03-20 22:00:33 +0900 Task3 is not ready
2016-03-20 22:00:33 +0900 Task4 is not ready
2016-03-20 22:00:34 +0900 Task3 is not ready
2016-03-20 22:00:34 +0900 Task2 is not ready
2016-03-20 22:00:34 +0900 Task4 is not ready
2016-03-20 22:00:35 +0900 Task2 is not ready
2016-03-20 22:00:35 +0900 Task3 is not ready
2016-03-20 22:00:35 +0900 Task4 is not ready
2016-03-20 22:00:36 +0900 Task3 is not ready
2016-03-20 22:00:36 +0900 Task2 is not ready
2016-03-20 22:00:36 +0900 Task4 is not ready
2016-03-20 22:00:37 +0900 Task3 is not ready
2016-03-20 22:00:37 +0900 Task4 is not ready
2016-03-20 22:00:37 +0900 Task2 is not ready
2016-03-20 22:00:38 +0900 end Task6
2016-03-20 22:00:38 +0900 pop task: Task5
2016-03-20 22:00:38 +0900 start Task5
2016-03-20 22:00:38 +0900 start Task4
2016-03-20 22:00:38 +0900 start Task3
2016-03-20 22:00:38 +0900 start Task2
2016-03-20 22:00:43 +0900 end Task5
2016-03-20 22:00:43 +0900 pop task: Task1
2016-03-20 22:00:43 +0900 Task1 is not ready
2016-03-20 22:00:43 +0900 end Task3
2016-03-20 22:00:43 +0900 end Task2
2016-03-20 22:00:43 +0900 end Task4
2016-03-20 22:00:44 +0900 start Task1
2016-03-20 22:00:49 +0900 end Task1

ということで、16秒で処理が終わりました。ほぼ理論通りの性能が出せました。

全ソースコードはこちら。

https://gist.github.com/hakobera/fd4ff0a09c493713ce13

まとめ

Thread を利用することで気軽に Task の並列実行ができるようになりました。2

次回は、コードをもっと Ruby らしく書けるように、DSL の導入と、各クラスの変更をしていきます。なお、気が付いている人はいるかもしれませんが、現在の並列実行制御には1つ重大な欠点がありますが、それも改修していきます。

Part5 もっと Ruby らしく (書いたらリンクを張る)


  1. なんで Ruby の Thread は Process fork に比べて、スケールしない(コアを全て使い切れない)かについては、 公式ドキュメントhttp://qiita.com/motsat/items/8c9b6bc56152444f50a0 や、 http://www.jstorimer.com/blogs/workingwithcode/8100871-nobody-understands-the-gil-part-2-implementation をお読みください。GIL 奥が深い。 

  2. 今回は解説のために、自前で実装しましたが、実際の場面では自前で実装するよりは parallel を使うことになると思います。parallel を使うことで、簡単に複数プロセスでの実行に処理を切り替えることが可能になります。