Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行
Task 間でのデータのやり取り
今回は Task 間でのデータのやり取りを実装していきます。次回以降が少し長くなるのと、実装が簡単なので、今回は短めにまとめます。
Task 間のデータのやり取りとは?
「Task 間のデータのやり取り」とは何かを、いつもの例で説明します。
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA
TaskC -> TaskA;
TaskD -> TaskB;
TaskD -> TaskC;
}
)
「Task 間のデータのやり取り」というのは、正確には「下流(Downstream)Task の output メソッドの戻り値を、上流(Upstream)Task が参照できる」機能のことです。上の図でいうと、TaskB, TaskC は TaskD の、TaskA は TaskB, TaskC の output メソッドの戻り値である Target クラスのインスタンスを参照できます。この機能を持つメソッドを、input メソッドとして追加します。
すると
- TaskD#output == TaskB#input
- TaskD#output == TaskC#input
- TaskB#output == TaskA#input
- TaskC#output == TaskA#input
こんな感じの関係になります。具体例としては、以下のように、下流 Task の出力内容の和を出力する Task なんてものが書けます。
class GenerateDataTask < Task
def output
LocalFileTarget.new("/tmp/data#{Time.now.strftime('%Y-%m-%d')}.txt")
end
def run
# 1から10までの数字をファイルに出力
File.open(output.path, "w") do |f|
(1..10).each do |i|
f.puts i
end
end
end
end
class SumTask < Task
def requires
[ GenerateDataTask.new ]
end
def output
LocalFileTarget.new("/tmp/output_#{Time.now.strftime('%Y-%m-%d')}.txt")
end
def run
sum = 0
# GenerateDataTask#output の戻り値から、ファイル名を取得
File.foreach(input[0].path) do |line|
sum += line.to_i
end
File.write(output.path, sum) #=> 1から10までの数字の和である55が出力される
end
end
Workflow.new.run(SumTask.new)
実際の Workflow でよくある日次処理でファイル名に日時が含まれているような動的にファイル名が変わる場合でも、それを意識することなく受け渡すことができます。このようにデータのやり取りを Target クラスで抽象化することで、SumTask
の再利用性が上がります。
実装してみよう
今回の実装は Task クラスに以下の input メソッドを足すだけです。簡単ですね。
class Task
def input
@input ||= requires.map { |task| task.output }
end
#(省略)
end
では、実際に前述のサンプルコードを実行してみましょう。
$ ruby example.rb
$ cat /tmp/output_2016-01-06.txt
55
できていますね。
全ソースコードはこちら。
https://gist.github.com/hakobera/03bcc4576b7bacab419f
まとめ
この記事では、Workflow Engine の機能のうち、関連する Task 間でのデータを Target クラスを経由してやり取りする方法について解説しました。
次回は、Task の並列実行について解説します。