Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行
Workflow Engine って何?
Workflow Engine と言っても多機能なものから、シンプルなものまで様々なものがあります。そこで、主旨がぶれないように、この記事での Workflow Engine は、以下の要件を満たすソフトウェアとします。
- Workflow Engine とは、依存関係のある複数の Task を、意図した順番通りに実行するもの
この記事では、この要件を満たす Workflow Engine を Ruby でつくる方法について解説します。
依存関係を記述する
依存関係を解決するコードを書く前に、依存関係を記述する方法をまず決めましょう。
依存関係を記述するには、luigi のように Task クラスに書く、ariflow のように worflow 定義時に動的に組み立てる、digdag のように外部の設定ファイルに書く、などの方法が考えられます。今回は「Task クラスに書く」方法を採用します。
この方針に則り、Task クラスは、Task で実行する内容を記述する run メソッドと、依存関係を記述する requires メソッドを持たせます。
class Task
def run
raise NotImplementedError, "You must implement #{self.class}##{__method__}"
end
def requires
[] # If you need to define task dependencies, override in subclass
end
end
サブクラスでは、必要に応じて requires メソッドをオーバーライドします。依存する Task が無い場合は、requires メソッドをオーバーライドする必要はありません。
例えば、TaskA が TaskB と TaskC に依存する場合は、
class TaskA < Task
def requires
[ TaskB.new, TaskC.new ]
end
end
# TaskB, TaskC はどの Task にも依存していない
class TaskB < Task; end
class TaskC < Task; end
のように書きます。これを図にすると以下のようになります。
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA;
TaskC -> TaskA;
}
)
このように依存関係を記述した図を DAG (Directed Acyclic Graph) と呼びます。日本語だと、有向非巡回グラフ といいます。つまり、ループのない有向グラフです。ループがあったら、終わらない Workflow になってしまうので、通常 Workflow は DAG で表現できます。DAG という言葉は、Workflow Engine のドキュメントに出てくることも多いので、知識として覚えておきましょう。
依存関係を解決する
依存関係が記述できるようになったので、依存関係を解決する仕組みを実装してみましょう。例として、以下のように、1つの Task が2つの Task から依存されている Workflow を考えます。
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA
TaskC -> TaskA;
TaskD -> TaskB;
TaskD -> TaskC;
}
)
Topological Sort
先ほど Workflow が DAG で表現できると書きましたが、DAG は必ず Topological Sort できることが証明されています。Topological Sort とは、DAG 中の全ての Node をその出力辺の先の Node より前に来るように並べ替えることです。Workflow でいうと、「依存する Task が必ず前に来るように並べ替えること」であり、依存解決のアルゴリズムとして利用できます。ちなみに Make もこのアルゴリズムで依存解決をおこなっています。
Topological Sort 自体は簡単に実装できる 1 のですが、Ruby 標準モジュールとして提供される TSort がこの機能を提供しています。Ruby で実装するので、これを使うことにします。
Task
TSort を利用するために「Task が同値」であることの定義をする必要があります。今回は簡単のために、クラス名が同じ Task は同値であるとします。同値性を実装するために、eql?
と hash
メソッドを override して、追加します。
class Task
def eql?(other)
self.hash == other.hash
end
def hash
self.class.name.hash
end
end
DAG
Task を管理する DAG を表現するクラスを新しく追加します。DAG はクラスは TSort を include し、Topological Sort に関連する2つのメソッド tsort_each_node
と tsort_each_child
を実装します。また、Task を登録する時に、同値である Task が既に登録済みの場合は2重で登録しないようにしておきます。
require 'tsort'
class DAG
include TSort
def initialize
@tasks = {}
end
def tsort_each_node(&block)
@tasks.each_key(&block)
end
def tsort_each_child(node, &block)
@tasks.fetch(node).each(&block)
end
def add_task(task)
_add_task(nil, task)
end
private
def _add_task(parent, task)
unless @tasks[task]
@tasks[task] = task.requires
task.requires.each do |r|
_add_task(task, r)
end
end
task
end
end
Workflow
Workflow は DAG を利用して依存解決をします。ルートとなる Task を登録し、tsort メソッドで解決された順番に通りに run メソッドを実行します。
require_relative 'dag'
class Workflow
def run(task)
dag = DAG.new
dag.add_task(task)
dag.tsort.each(&:run)
end
end
依存関係を持った Workflow を実行してみる
コードが組み上がったので、実際に workflow を定義して実行してみましょう。
require_relative 'workflow'
require_relative 'task'
class EchoTask < Task
def run
puts "#{self.class.name}#run"
end
end
class TaskA < EchoTask
def requires
[ TaskB.new, TaskC.new ]
end
end
class TaskB < EchoTask
def requires
[ TaskD.new ]
end
end
class TaskC < EchoTask
def requires
[ TaskD.new ]
end
end
class TaskD < EchoTask
end
Workflow.new.run(TaskA.new)
実際に実行してみると、workflow の最上流の Task(この場合、TaskA)を Workflow#run
メソッドに与えるだけで、依存関係のある全ての Task が意図した順番で実行できていることがわかります。
$ ruby example.rb
TaskD#run
TaskC#run
TaskB#run
TaskA#run
全ソースコードはこちら。
https://gist.github.com/3313a3e01057b5e2906c
まとめ
この記事では Workflow Engine の機能のうち、依存解決を実現する機能を DAG と Topological Sort を利用して実装しました。2回目以降の予定は以下の通りです。順番は入れ替わる可能性があります。
- Workflow の冪等性を保証する方法
- Task 間でデータのやり取りの方法
- Task の並列実行制御の方法
- もっと Ruby らしく書けるようにする(追加)
-
Tarjan の考案したアルゴリズムで、深さ優先探索をベースにした方法で実装できます。Ruby の TSort モジュールもこの Tarjan のアルゴリズムで実装されています ↩