Edited at

Workflow Engine をつくろう! Part 1(Task の依存関係の解決)

More than 3 years have passed since last update.

Part 1 Task の依存関係の解決

Part 2 Workflow の冪等性

Part 3 Task 間でのデータのやり取り

Part 4 Task の並列実行


Workflow Engine って何?

Workflow Engine と言っても多機能なものから、シンプルなものまで様々なものがあります。そこで、主旨がぶれないように、この記事での Workflow Engine は、以下の要件を満たすソフトウェアとします。


  • Workflow Engine とは、依存関係のある複数の Task を、意図した順番通りに実行するもの

この記事では、この要件を満たす Workflow Engine を Ruby でつくる方法について解説します。


依存関係を記述する

依存関係を解決するコードを書く前に、依存関係を記述する方法をまず決めましょう。

依存関係を記述するには、luigi のように Task クラスに書く、ariflow のように worflow 定義時に動的に組み立てる、digdag のように外部の設定ファイルに書く、などの方法が考えられます。今回は「Task クラスに書く」方法を採用します。

この方針に則り、Task クラスは、Task で実行する内容を記述する run メソッドと、依存関係を記述する requires メソッドを持たせます。


task.rb

class Task

def run
raise NotImplementedError, "You must implement #{self.class}##{__method__}"
end

def requires
[] # If you need to define task dependencies, override in subclass
end
end


サブクラスでは、必要に応じて requires メソッドをオーバーライドします。依存する Task が無い場合は、requires メソッドをオーバーライドする必要はありません。

例えば、TaskA が TaskB と TaskC に依存する場合は、

class TaskA < Task

def requires
[ TaskB.new, TaskC.new ]
end
end

# TaskB, TaskC はどの Task にも依存していない
class TaskB < Task; end
class TaskC < Task; end

のように書きます。これを図にすると以下のようになります。

Alt text

このように依存関係を記述した図を DAG (Directed Acyclic Graph) と呼びます。日本語だと、有向非巡回グラフ といいます。つまり、ループのない有向グラフです。ループがあったら、終わらない Workflow になってしまうので、通常 Workflow は DAG で表現できます。DAG という言葉は、Workflow Engine のドキュメントに出てくることも多いので、知識として覚えておきましょう。


依存関係を解決する

依存関係が記述できるようになったので、依存関係を解決する仕組みを実装してみましょう。例として、以下のように、1つの Task が2つの Task から依存されている Workflow を考えます。

Alt text


Topological Sort

先ほど Workflow が DAG で表現できると書きましたが、DAG は必ず Topological Sort できることが証明されています。Topological Sort とは、DAG 中の全ての Node をその出力辺の先の Node より前に来るように並べ替えることです。Workflow でいうと、「依存する Task が必ず前に来るように並べ替えること」であり、依存解決のアルゴリズムとして利用できます。ちなみに Make もこのアルゴリズムで依存解決をおこなっています。

Topological Sort 自体は簡単に実装できる 1 のですが、Ruby 標準モジュールとして提供される TSort がこの機能を提供しています。Ruby で実装するので、これを使うことにします。


Task

TSort を利用するために「Task が同値」であることの定義をする必要があります。今回は簡単のために、クラス名が同じ Task は同値であるとします。同値性を実装するために、eql?hash メソッドを override して、追加します。


task.rb

class Task

def eql?(other)
self.hash == other.hash
end

def hash
self.class.name.hash
end
end



DAG

Task を管理する DAG を表現するクラスを新しく追加します。DAG はクラスは TSort を include し、Topological Sort に関連する2つのメソッド tsort_each_nodetsort_each_child を実装します。また、Task を登録する時に、同値である Task が既に登録済みの場合は2重で登録しないようにしておきます。


dag.rb

require 'tsort'

class DAG
include TSort

def initialize
@tasks = {}
end

def tsort_each_node(&block)
@tasks.each_key(&block)
end

def tsort_each_child(node, &block)
@tasks.fetch(node).each(&block)
end

def add_task(task)
_add_task(nil, task)
end

private

def _add_task(parent, task)
unless @tasks[task]
@tasks[task] = task.requires
task.requires.each do |r|
_add_task(task, r)
end
end
task
end
end



Workflow

Workflow は DAG を利用して依存解決をします。ルートとなる Task を登録し、tsort メソッドで解決された順番に通りに run メソッドを実行します。


workflow.rb

require_relative 'dag'

class Workflow
def run(task)
dag = DAG.new
dag.add_task(task)
dag.tsort.each(&:run)
end
end



依存関係を持った Workflow を実行してみる

コードが組み上がったので、実際に workflow を定義して実行してみましょう。


example.rb

require_relative 'workflow'

require_relative 'task'

class EchoTask < Task
def run
puts "#{self.class.name}#run"
end
end

class TaskA < EchoTask
def requires
[ TaskB.new, TaskC.new ]
end
end

class TaskB < EchoTask
def requires
[ TaskD.new ]
end
end

class TaskC < EchoTask
def requires
[ TaskD.new ]
end
end

class TaskD < EchoTask
end

Workflow.new.run(TaskA.new)


実際に実行してみると、workflow の最上流の Task(この場合、TaskA)を Workflow#run メソッドに与えるだけで、依存関係のある全ての Task が意図した順番で実行できていることがわかります。

$ ruby example.rb

TaskD#run
TaskC#run
TaskB#run
TaskA#run

全ソースコードはこちら。

https://gist.github.com/3313a3e01057b5e2906c


まとめ

この記事では Workflow Engine の機能のうち、依存解決を実現する機能を DAG と Topological Sort を利用して実装しました。2回目以降の予定は以下の通りです。順番は入れ替わる可能性があります。


  1. Workflow の冪等性を保証する方法

  2. Task 間でデータのやり取りの方法

  3. Task の並列実行制御の方法

  4. もっと Ruby らしく書けるようにする(追加)

Part 2 Workflow の冪等性





  1. Tarjan の考案したアルゴリズムで、深さ優先探索をベースにした方法で実装できます。Ruby の TSort モジュールもこの Tarjan のアルゴリズムで実装されています