Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行
Task の並列実行
今回は Workflow Engine に Task を並列実行できる機能を実装をします。Workflow Engine が利用されるのは、ある程度以上の数の Task を含む処理になるので、全体の実行時間を短縮するために並列実行の機能はとても重要なものになります。
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
Task2 -> Task1
Task3 -> Task1;
Task4 -> Task1;
Task5 -> Task1;
Task6 -> Task2;
Task6 -> Task3;
Task6 -> Task4;
Task6 -> Task5;
}
)
例えば、上図の Workflow で各 Task が1分かかる処理だとして、単純に順次実行すると6分かかります。しかし、Task2,3,4,5 はそれぞれ依存関係がなく、並列に実行できるので、並列度4で実行した場合、Task6 で1分、Task2-5 で1分、最後に Tsask1 で1分となり、3分と半分の時間で実行可能になります。実際には CPU のコア数だったり、他に動いているプロセスとの兼ね合いでそこまでは速くならないかもしれませんが、間違いなく効果はあります。
Ruby における並列処理
Ruby で並列処理を実装しようと思った場合は、言語サポートの機能内では以下の手法があります。
-
Thread
を利用した複数スレッド -
Kernel.#fork
を利用した複数プロセス -
dRuby
による分散プロセス
それぞれおおまかに特長をまとめると、以下の表のようになります。(評価は相対評価です)
Thread | Kernel.#fork | dRuby | |
---|---|---|---|
スケーラビリティ | 低 | 中 | 高 |
実装・実行環境の構築 | 簡単 | 普通 | 難しい |
これらは排他的ではないので、組み合わせて利用することも可能です。今回は、最もスケールしない1が、最も実装が簡単な Thread を利用して実装していきます。
実装方針
並列実行を処理の概要は以下のようになります。
- 依存関係に基づいて、Task ソートする。ソートした、配列を Task Queue とする
- Task 実行用の Worker スレッドを指定した数だけ用意する
- Worker スレッド内で、Task Queue から Task を取り出す
- Task が実行可能になるまで待つ
- Task を実行する
Task クラスの変更
並列実装をするにあたって、Task が実行可能状態にあるのかを判定する必要があります。これまでの説明から、「Task が実行可能である」というのは「依存する全ての Task が完了している」ことであり、そして、「Task が完了している」というのは、「出力となる Target が存在している」ということです。
これらの判定を行うメソッドを Task クラスに実装していきます。
class Task
# Task が実行可能かどうか
def ready?
requires.all? {|t| t.completed?}
end
# Task が完了しているかどうか
def completed?
_output.all? {|o| o.exists?}
end
private
def _output
o = output
o = [o] unless o.is_a?(Array)
o
end
end
Workflow クラスの拡張
Workflow クラスに Thread を利用した並列処理を追加すると以下のようになります。実装方法はいくつかあると思いますが、ここでは Thread 間で Task Queue を共有し、複数 Thread でそれを参照するようにしました。Ruby の組み込みの Queue クラスは Thread Safe であることが保証されているので、特に難しく考えなくても良いのが嬉しいですね。
require_relative 'dag'
require 'thread'
class Workflow
def initialize(conf={})
@dag = DAG.new
@concurrency = conf[:concurrency] || 1
end
def run(task)
@dag.add_task(task)
parallel(@dag.tsort, @concurrency) do |t|
until t.ready?
puts "#{Time.now} #{t.class.name} is not ready"
sleep 1
end
t.run unless t.completed?
end
end
def parallel(enumerable, concurrency)
q = Queue.new
enumerable.each {|e| q << e}
proc = lambda do
until q.empty?
t = q.pop
puts "#{Time.now} pop task: #{t.class.name}"
yield t
end
end
threads = []
concurrency.times do
threads << Thread.start(&proc)
end
threads.each {|t| t.join}
end
end
並列実行を試してみる
それでは、実際にこの実装が正しく動くのかどうか試してみましょう。本文の最初に書いた図と同じワークフローを実装してみます。ただし、各タスクにかかる時間は5秒とします。
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
Task2 -> Task1
Task3 -> Task1;
Task4 -> Task1;
Task5 -> Task1;
Task6 -> Task2;
Task6 -> Task3;
Task6 -> Task4;
Task6 -> Task5;
}
)
require_relative 'workflow'
require_relative 'task'
require_relative 'local_file_target'
class WaitTask < Task
def output
LocalFileTarget.new("/tmp/data_#{self.class.name}_#{Time.now.strftime('%Y-%m-%d')}.txt")
end
def run
puts "#{Time.now} start #{self.class.name}"
sleep(5)
path = output.path
File.write(path, "done")
puts "#{Time.now} end #{self.class.name}"
end
end
class Task1 < WaitTask
def requires; [ Task2.new, Task3.new, Task4.new, Task5.new ]; end
end
class Task2 < WaitTask
def requires; [ Task6.new ]; end
end
class Task3 < WaitTask
def requires; [ Task6.new ]; end
end
class Task4 < WaitTask
def requires; [ Task6.new ]; end
end
class Task5 < WaitTask
def requires; [ Task6.new ]; end
end
class Task6 < WaitTask
end
Workflow.new(concurrency: 4).run(Task1.new)
理論通りに行けば、並列度4で実行した場合、Task6 で5秒、Task2-5 で5秒、最後に Tsask1 で5秒となり、大体15秒で全体の処理が終わるはずです。
$ ruby example.rb
2016-03-20 22:00:33 +0900 pop task: Task6
2016-03-20 22:00:33 +0900 pop task: Task2
2016-03-20 22:00:33 +0900 pop task: Task3
2016-03-20 22:00:33 +0900 start Task6
2016-03-20 22:00:33 +0900 Task2 is not ready
2016-03-20 22:00:33 +0900 pop task: Task4
2016-03-20 22:00:33 +0900 Task3 is not ready
2016-03-20 22:00:33 +0900 Task4 is not ready
2016-03-20 22:00:34 +0900 Task3 is not ready
2016-03-20 22:00:34 +0900 Task2 is not ready
2016-03-20 22:00:34 +0900 Task4 is not ready
2016-03-20 22:00:35 +0900 Task2 is not ready
2016-03-20 22:00:35 +0900 Task3 is not ready
2016-03-20 22:00:35 +0900 Task4 is not ready
2016-03-20 22:00:36 +0900 Task3 is not ready
2016-03-20 22:00:36 +0900 Task2 is not ready
2016-03-20 22:00:36 +0900 Task4 is not ready
2016-03-20 22:00:37 +0900 Task3 is not ready
2016-03-20 22:00:37 +0900 Task4 is not ready
2016-03-20 22:00:37 +0900 Task2 is not ready
2016-03-20 22:00:38 +0900 end Task6
2016-03-20 22:00:38 +0900 pop task: Task5
2016-03-20 22:00:38 +0900 start Task5
2016-03-20 22:00:38 +0900 start Task4
2016-03-20 22:00:38 +0900 start Task3
2016-03-20 22:00:38 +0900 start Task2
2016-03-20 22:00:43 +0900 end Task5
2016-03-20 22:00:43 +0900 pop task: Task1
2016-03-20 22:00:43 +0900 Task1 is not ready
2016-03-20 22:00:43 +0900 end Task3
2016-03-20 22:00:43 +0900 end Task2
2016-03-20 22:00:43 +0900 end Task4
2016-03-20 22:00:44 +0900 start Task1
2016-03-20 22:00:49 +0900 end Task1
ということで、16秒で処理が終わりました。ほぼ理論通りの性能が出せました。
全ソースコードはこちら。
まとめ
Thread を利用することで気軽に Task の並列実行ができるようになりました。2
次回は、コードをもっと Ruby らしく書けるように、DSL の導入と、各クラスの変更をしていきます。なお、気が付いている人はいるかもしれませんが、現在の並列実行制御には1つ重大な欠点がありますが、それも改修していきます。
Part5 もっと Ruby らしく (書いたらリンクを張る)
-
なんで Ruby の Thread は Process fork に比べて、スケールしない(コアを全て使い切れない)かについては、 公式ドキュメント や http://qiita.com/motsat/items/8c9b6bc56152444f50a0 や、 http://www.jstorimer.com/blogs/workingwithcode/8100871-nobody-understands-the-gil-part-2-implementation をお読みください。GIL 奥が深い。 ↩
-
今回は解説のために、自前で実装しましたが、実際の場面では自前で実装するよりは parallel を使うことになると思います。parallel を使うことで、簡単に複数プロセスでの実行に処理を切り替えることが可能になります。 ↩