Workflow Engine をつくろう! Part 1(Task の依存関係の解決)

  • 83
    いいね
  • 0
    コメント

Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行

Workflow Engine って何?

Workflow Engine と言っても多機能なものから、シンプルなものまで様々なものがあります。そこで、主旨がぶれないように、この記事での Workflow Engine は、以下の要件を満たすソフトウェアとします。

  • Workflow Engine とは、依存関係のある複数の Task を、意図した順番通りに実行するもの

この記事では、この要件を満たす Workflow Engine を Ruby でつくる方法について解説します。

依存関係を記述する

依存関係を解決するコードを書く前に、依存関係を記述する方法をまず決めましょう。

依存関係を記述するには、luigi のように Task クラスに書く、ariflow のように worflow 定義時に動的に組み立てる、digdag のように外部の設定ファイルに書く、などの方法が考えられます。今回は「Task クラスに書く」方法を採用します。

この方針に則り、Task クラスは、Task で実行する内容を記述する run メソッドと、依存関係を記述する requires メソッドを持たせます。

task.rb
class Task
  def run
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end

  def requires
    [] # If you need to define task dependencies, override in subclass
  end
end

サブクラスでは、必要に応じて requires メソッドをオーバーライドします。依存する Task が無い場合は、requires メソッドをオーバーライドする必要はありません。

例えば、TaskA が TaskB と TaskC に依存する場合は、

class TaskA < Task
  def requires
    [ TaskB.new, TaskC.new ]
  end
end

# TaskB, TaskC はどの Task にも依存していない
class TaskB < Task; end
class TaskC < Task; end

のように書きます。これを図にすると以下のようになります。

Alt text

このように依存関係を記述した図を DAG (Directed Acyclic Graph) と呼びます。日本語だと、有向非巡回グラフ といいます。つまり、ループのない有向グラフです。ループがあったら、終わらない Workflow になってしまうので、通常 Workflow は DAG で表現できます。DAG という言葉は、Workflow Engine のドキュメントに出てくることも多いので、知識として覚えておきましょう。

依存関係を解決する

依存関係が記述できるようになったので、依存関係を解決する仕組みを実装してみましょう。例として、以下のように、1つの Task が2つの Task から依存されている Workflow を考えます。

Alt text

Topological Sort

先ほど Workflow が DAG で表現できると書きましたが、DAG は必ず Topological Sort できることが証明されています。Topological Sort とは、DAG 中の全ての Node をその出力辺の先の Node より前に来るように並べ替えることです。Workflow でいうと、「依存する Task が必ず前に来るように並べ替えること」であり、依存解決のアルゴリズムとして利用できます。ちなみに Make もこのアルゴリズムで依存解決をおこなっています。

Topological Sort 自体は簡単に実装できる 1 のですが、Ruby 標準モジュールとして提供される TSort がこの機能を提供しています。Ruby で実装するので、これを使うことにします。

Task

TSort を利用するために「Task が同値」であることの定義をする必要があります。今回は簡単のために、クラス名が同じ Task は同値であるとします。同値性を実装するために、eql?hash メソッドを override して、追加します。

task.rb
class Task
  def eql?(other)
    self.hash == other.hash
  end

  def hash
    self.class.name.hash
  end
end

DAG

Task を管理する DAG を表現するクラスを新しく追加します。DAG はクラスは TSort を include し、Topological Sort に関連する2つのメソッド tsort_each_nodetsort_each_child を実装します。また、Task を登録する時に、同値である Task が既に登録済みの場合は2重で登録しないようにしておきます。

dag.rb
require 'tsort'

class DAG
  include TSort

  def initialize
    @tasks = {}
  end

  def tsort_each_node(&block)
    @tasks.each_key(&block)
  end

  def tsort_each_child(node, &block)
    @tasks.fetch(node).each(&block)
  end

  def add_task(task)
    _add_task(nil, task)
  end

  private

  def _add_task(parent, task)
    unless @tasks[task]    
      @tasks[task] = task.requires
      task.requires.each do |r|
        _add_task(task, r)
      end
    end
    task
  end
end

Workflow

Workflow は DAG を利用して依存解決をします。ルートとなる Task を登録し、tsort メソッドで解決された順番に通りに run メソッドを実行します。

workflow.rb
require_relative 'dag'

class Workflow
  def run(task)
    dag = DAG.new
    dag.add_task(task)
    dag.tsort.each(&:run)
  end
end

依存関係を持った Workflow を実行してみる

コードが組み上がったので、実際に workflow を定義して実行してみましょう。

example.rb
require_relative 'workflow'
require_relative 'task'

class EchoTask < Task
  def run
    puts "#{self.class.name}#run"
  end
end

class TaskA < EchoTask
  def requires
    [ TaskB.new, TaskC.new ]
  end
end

class TaskB < EchoTask
  def requires
    [ TaskD.new ]
  end
end

class TaskC < EchoTask
  def requires
    [ TaskD.new ]
  end
end

class TaskD < EchoTask
end

Workflow.new.run(TaskA.new)

実際に実行してみると、workflow の最上流の Task(この場合、TaskA)を Workflow#run メソッドに与えるだけで、依存関係のある全ての Task が意図した順番で実行できていることがわかります。

$ ruby example.rb
TaskD#run
TaskC#run
TaskB#run
TaskA#run

全ソースコードはこちら。
https://gist.github.com/3313a3e01057b5e2906c

まとめ

この記事では Workflow Engine の機能のうち、依存解決を実現する機能を DAG と Topological Sort を利用して実装しました。2回目以降の予定は以下の通りです。順番は入れ替わる可能性があります。

  1. Workflow の冪等性を保証する方法
  2. Task 間でデータのやり取りの方法
  3. Task の並列実行制御の方法
  4. もっと Ruby らしく書けるようにする(追加)

Part 2 Workflow の冪等性


  1. Tarjan の考案したアルゴリズムで、深さ優先探索をベースにした方法で実装できます。Ruby の TSort モジュールもこの Tarjan のアルゴリズムで実装されています