85
91

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Workflow Engine をつくろう! Part 1(Task の依存関係の解決)

Last updated at Posted at 2015-12-27

Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行

Workflow Engine って何?

Workflow Engine と言っても多機能なものから、シンプルなものまで様々なものがあります。そこで、主旨がぶれないように、この記事での Workflow Engine は、以下の要件を満たすソフトウェアとします。

  • Workflow Engine とは、依存関係のある複数の Task を、意図した順番通りに実行するもの

この記事では、この要件を満たす Workflow Engine を Ruby でつくる方法について解説します。

依存関係を記述する

依存関係を解決するコードを書く前に、依存関係を記述する方法をまず決めましょう。

依存関係を記述するには、luigi のように Task クラスに書く、ariflow のように worflow 定義時に動的に組み立てる、digdag のように外部の設定ファイルに書く、などの方法が考えられます。今回は「Task クラスに書く」方法を採用します。

この方針に則り、Task クラスは、Task で実行する内容を記述する run メソッドと、依存関係を記述する requires メソッドを持たせます。

task.rb
class Task
  def run
    raise NotImplementedError, "You must implement #{self.class}##{__method__}"
  end

  def requires
    [] # If you need to define task dependencies, override in subclass
  end
end

サブクラスでは、必要に応じて requires メソッドをオーバーライドします。依存する Task が無い場合は、requires メソッドをオーバーライドする必要はありません。

例えば、TaskA が TaskB と TaskC に依存する場合は、

class TaskA < Task
  def requires
    [ TaskB.new, TaskC.new ]
  end
end

# TaskB, TaskC はどの Task にも依存していない
class TaskB < Task; end
class TaskC < Task; end

のように書きます。これを図にすると以下のようになります。

![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA;
TaskC -> TaskA;
}
)

このように依存関係を記述した図を DAG (Directed Acyclic Graph) と呼びます。日本語だと、有向非巡回グラフ といいます。つまり、ループのない有向グラフです。ループがあったら、終わらない Workflow になってしまうので、通常 Workflow は DAG で表現できます。DAG という言葉は、Workflow Engine のドキュメントに出てくることも多いので、知識として覚えておきましょう。

依存関係を解決する

依存関係が記述できるようになったので、依存関係を解決する仕組みを実装してみましょう。例として、以下のように、1つの Task が2つの Task から依存されている Workflow を考えます。

![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA
TaskC -> TaskA;
TaskD -> TaskB;
TaskD -> TaskC;
}
)

Topological Sort

先ほど Workflow が DAG で表現できると書きましたが、DAG は必ず Topological Sort できることが証明されています。Topological Sort とは、DAG 中の全ての Node をその出力辺の先の Node より前に来るように並べ替えることです。Workflow でいうと、「依存する Task が必ず前に来るように並べ替えること」であり、依存解決のアルゴリズムとして利用できます。ちなみに Make もこのアルゴリズムで依存解決をおこなっています。

Topological Sort 自体は簡単に実装できる 1 のですが、Ruby 標準モジュールとして提供される TSort がこの機能を提供しています。Ruby で実装するので、これを使うことにします。

Task

TSort を利用するために「Task が同値」であることの定義をする必要があります。今回は簡単のために、クラス名が同じ Task は同値であるとします。同値性を実装するために、eql?hash メソッドを override して、追加します。

task.rb
class Task
  def eql?(other)
    self.hash == other.hash
  end

  def hash
    self.class.name.hash
  end
end

DAG

Task を管理する DAG を表現するクラスを新しく追加します。DAG はクラスは TSort を include し、Topological Sort に関連する2つのメソッド tsort_each_nodetsort_each_child を実装します。また、Task を登録する時に、同値である Task が既に登録済みの場合は2重で登録しないようにしておきます。

dag.rb
require 'tsort'

class DAG
  include TSort

  def initialize
    @tasks = {}
  end
  
  def tsort_each_node(&block)
    @tasks.each_key(&block)
  end

  def tsort_each_child(node, &block)
    @tasks.fetch(node).each(&block)
  end

  def add_task(task)
    _add_task(nil, task)
  end

  private

  def _add_task(parent, task)
    unless @tasks[task]    
      @tasks[task] = task.requires
      task.requires.each do |r|
        _add_task(task, r)
      end
    end
    task
  end
end

Workflow

Workflow は DAG を利用して依存解決をします。ルートとなる Task を登録し、tsort メソッドで解決された順番に通りに run メソッドを実行します。

workflow.rb
require_relative 'dag'

class Workflow
  def run(task)
    dag = DAG.new
    dag.add_task(task)
    dag.tsort.each(&:run)
  end
end

依存関係を持った Workflow を実行してみる

コードが組み上がったので、実際に workflow を定義して実行してみましょう。

example.rb
require_relative 'workflow'
require_relative 'task'

class EchoTask < Task
  def run
    puts "#{self.class.name}#run"
  end
end

class TaskA < EchoTask
  def requires
    [ TaskB.new, TaskC.new ]
  end
end

class TaskB < EchoTask
  def requires
    [ TaskD.new ]
  end
end

class TaskC < EchoTask
  def requires
    [ TaskD.new ]
  end
end

class TaskD < EchoTask
end

Workflow.new.run(TaskA.new)

実際に実行してみると、workflow の最上流の Task(この場合、TaskA)を Workflow#run メソッドに与えるだけで、依存関係のある全ての Task が意図した順番で実行できていることがわかります。

$ ruby example.rb
TaskD#run
TaskC#run
TaskB#run
TaskA#run

全ソースコードはこちら。
https://gist.github.com/3313a3e01057b5e2906c

まとめ

この記事では Workflow Engine の機能のうち、依存解決を実現する機能を DAG と Topological Sort を利用して実装しました。2回目以降の予定は以下の通りです。順番は入れ替わる可能性があります。

  1. Workflow の冪等性を保証する方法
  2. Task 間でデータのやり取りの方法
  3. Task の並列実行制御の方法
  4. もっと Ruby らしく書けるようにする(追加)

Part 2 Workflow の冪等性

  1. Tarjan の考案したアルゴリズムで、深さ優先探索をベースにした方法で実装できます。Ruby の TSort モジュールもこの Tarjan のアルゴリズムで実装されています

85
91
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
85
91

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?