Edited at

Workflow Engine をつくろう! Part 3(Task 間でのデータのやり取り)

More than 3 years have passed since last update.

Part 1 Task の依存関係の解決

Part 2 Workflow の冪等性

Part 3 Task 間でのデータのやり取り

Part 4 Task の並列実行


Task 間でのデータのやり取り

今回は Task 間でのデータのやり取りを実装していきます。次回以降が少し長くなるのと、実装が簡単なので、今回は短めにまとめます。


Task 間のデータのやり取りとは?

「Task 間のデータのやり取り」とは何かを、いつもの例で説明します。

「Task 間のデータのやり取り」というのは、正確には「下流(Downstream)Task の output メソッドの戻り値を、上流(Upstream)Task が参照できる」機能のことです。上の図でいうと、TaskB, TaskC は TaskD の、TaskA は TaskB, TaskC の output メソッドの戻り値である Target クラスのインスタンスを参照できます。この機能を持つメソッドを、input メソッドとして追加します。

すると


  • TaskD#output == TaskB#input

  • TaskD#output == TaskC#input

  • TaskB#output == TaskA#input

  • TaskC#output == TaskA#input

こんな感じの関係になります。具体例としては、以下のように、下流 Task の出力内容の和を出力する Task なんてものが書けます。

class GenerateDataTask < Task

def output
LocalFileTarget.new("/tmp/data#{Time.now.strftime('%Y-%m-%d')}.txt")
end

def run
# 1から10までの数字をファイルに出力
File.open(output.path, "w") do |f|
(1..10).each do |i|
f.puts i
end
end
end
end

class SumTask < Task
def requires
[ GenerateDataTask.new ]
end

def output
LocalFileTarget.new("/tmp/output_#{Time.now.strftime('%Y-%m-%d')}.txt")
end

def run
sum = 0
# GenerateDataTask#output の戻り値から、ファイル名を取得
File.foreach(input[0].path) do |line|
sum += line.to_i
end
File.write(output.path, sum) #=> 1から10までの数字の和である55が出力される
end
end

Workflow.new.run(SumTask.new)

実際の Workflow でよくある日次処理でファイル名に日時が含まれているような動的にファイル名が変わる場合でも、それを意識することなく受け渡すことができます。このようにデータのやり取りを Target クラスで抽象化することで、SumTask の再利用性が上がります。


実装してみよう

今回の実装は Task クラスに以下の input メソッドを足すだけです。簡単ですね。

class Task

def input
@input ||= requires.map { |task| task.output }
end
#(省略)
end

では、実際に前述のサンプルコードを実行してみましょう。

$ ruby example.rb

$ cat /tmp/output_2016-01-06.txt
55

できていますね。

全ソースコードはこちら。

https://gist.github.com/hakobera/03bcc4576b7bacab419f


まとめ

この記事では、Workflow Engine の機能のうち、関連する Task 間でのデータを Target クラスを経由してやり取りする方法について解説しました。

次回は、Task の並列実行について解説します。

Part4へ