10
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Workflow Engine をつくろう! Part 3(Task 間でのデータのやり取り)

Last updated at Posted at 2016-01-06

Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行

Task 間でのデータのやり取り

今回は Task 間でのデータのやり取りを実装していきます。次回以降が少し長くなるのと、実装が簡単なので、今回は短めにまとめます。

Task 間のデータのやり取りとは?

「Task 間のデータのやり取り」とは何かを、いつもの例で説明します。

![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA
TaskC -> TaskA;
TaskD -> TaskB;
TaskD -> TaskC;
}
)

「Task 間のデータのやり取り」というのは、正確には「下流(Downstream)Task の output メソッドの戻り値を、上流(Upstream)Task が参照できる」機能のことです。上の図でいうと、TaskB, TaskC は TaskD の、TaskA は TaskB, TaskC の output メソッドの戻り値である Target クラスのインスタンスを参照できます。この機能を持つメソッドを、input メソッドとして追加します。

すると

  • TaskD#output == TaskB#input
  • TaskD#output == TaskC#input
  • TaskB#output == TaskA#input
  • TaskC#output == TaskA#input

こんな感じの関係になります。具体例としては、以下のように、下流 Task の出力内容の和を出力する Task なんてものが書けます。

class GenerateDataTask < Task
  def output
    LocalFileTarget.new("/tmp/data#{Time.now.strftime('%Y-%m-%d')}.txt")
  end

  def run
    # 1から10までの数字をファイルに出力
    File.open(output.path, "w") do |f|
      (1..10).each do |i|
        f.puts i
      end
    end
  end
end

class SumTask < Task
  def requires
    [ GenerateDataTask.new ]
  end

  def output
    LocalFileTarget.new("/tmp/output_#{Time.now.strftime('%Y-%m-%d')}.txt")
  end

  def run
    sum = 0
    # GenerateDataTask#output の戻り値から、ファイル名を取得
    File.foreach(input[0].path) do |line|
      sum += line.to_i
    end
    File.write(output.path, sum) #=> 1から10までの数字の和である55が出力される
  end
end

Workflow.new.run(SumTask.new)

実際の Workflow でよくある日次処理でファイル名に日時が含まれているような動的にファイル名が変わる場合でも、それを意識することなく受け渡すことができます。このようにデータのやり取りを Target クラスで抽象化することで、SumTask の再利用性が上がります。

実装してみよう

今回の実装は Task クラスに以下の input メソッドを足すだけです。簡単ですね。

class Task
  def input
    @input ||= requires.map { |task| task.output }
  end
#(省略)
end

では、実際に前述のサンプルコードを実行してみましょう。

$ ruby example.rb
$ cat /tmp/output_2016-01-06.txt
55

できていますね。

全ソースコードはこちら。
https://gist.github.com/hakobera/03bcc4576b7bacab419f

まとめ

この記事では、Workflow Engine の機能のうち、関連する Task 間でのデータを Target クラスを経由してやり取りする方法について解説しました。

次回は、Task の並列実行について解説します。

Part4へ

10
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?