Part 1 Task の依存関係の解決
Part 2 Workflow の冪等性
Part 3 Task 間でのデータのやり取り
Part 4 Task の並列実行
まえおき
この連載記事で作成する Workflow Engine は、Luigi の設計思想に大きく影響を受けています。なので、 @k24d さんの Luigi によるワークフロー管理 を先に読んでおくと、理解が深まると思います。
前回は Task の依存関係の解決方法を実装しましたが、Part 2の今回は Workflow の冪等性について実装していきます。
Workflow の冪等性
Workflow を実行、制御する上で、冪等性(何回実行しても結果が同じであること)は大変重要です。なぜ重要かというと、現実問題として Workflow は途中で失敗する可能性があり、いかに確実に簡単にリトライ処理ができるかが、業務システムを運用の手間に直結するからです。
リトライと冪等性のデザインパターンについては、@frsyuki さんのブログの記事がとても良いので、読んでみると良いと思います。
例をあげて考える
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA
TaskC -> TaskA;
TaskD -> TaskB;
TaskD -> TaskC;
TaskB[style=filled, color=red];
TaskC[style=filled, color=green];
TaskD[style=filled, color=green];
}
)
上記のような Workflow で、4つの Task のうち、2つが成功し、1つが失敗、1つが未実行のケース考えてみます。このケースでは、TaskB をリトライし、次に TaskA を実行して Workflow のリトライに成功することがわかります。このように Workflow のリトライとは、Workflow の依存関係を考慮しながら、失敗した Task とその上流の Task を実行することで実現できます。
ただし、この程度の Workflow なら手作業でのリトライもできますが、以下ように Workflow が複雑になってくると、どの順番で再実行すれば良いのかをすぐに整理、理解するのはとても困難です。(ちなみに TaskL -> B -> K -> C -> I -> A と実行すれば良いです)
![Alt text](http://g.gravizo.com/g?
digraph G {
rankdir=LR;
TaskB -> TaskA
TaskC -> TaskA;
TaskD -> TaskC;
TaskE -> TaskB;
TaskF -> TaskK;
TaskG -> TaskB;
TaskH -> TaskB;
TaskI -> TaskA;
TaskJ -> TaskI;
TaskK -> TaskI;
TaskL -> TaskI;
TaskL -> TaskC;
TaskF -> TaskL;
TaskE -> TaskD;
TaskD[style=filled, color=green];
TaskE[style=filled, color=green];
TaskF[style=filled, color=green];
TaskG[style=filled, color=green];
TaskH[style=filled, color=green];
TaskJ[style=filled, color=green];
TaskL[style=filled, color=red];
}
)
もっと複雑になったらと考えると、頭が痛くなってきますね。しかし、これをシンプルに解決する方法があります。それは Task 自体に冪等性を保証する仕組みを導入することで、Workflow Engine が状態の管理をしなくても良くすることです。これは、前述の記事の中でも取り上げられている「パターン4:操作を細かくして信頼性を高める」で説明されている「個々の操作を冪等にし、細かい粒度でリトライできるようにする」方法です。
Task が冪等性を持っていれば、Workflow Engine のリトライ処理は、全ての Task をまとめて再実行するだけで済むようになります。ただし、そのまま再実行してしまうと、既に完了済みの Task も再実行されてしまい、出力結果が2倍になったり、時間がかかりすぎるという問題があります。全体で8時間かかる Workflow が7時間経過した所で失敗した場合、再度8時間かかるというのでは実運用では大問題です。この問題の解決方法は完了済みの Task は再実行時に run メソッドの処理がスキップされるようにすれば良いです。
整理すると、
- Workflow 内の個々の Task が冪等性を持てば、リトライ処理は全 Task の再実行というシンプルな処理になる
- Task は自身が完了済みかどうかを把握する仕組みを持っている
- Task が完了済みの場合、再実行時に run メソッドの処理をスキップする
となります。それではこれを実現するコードを実装していきましょう。
Target クラスの導入
「Task は自身が完了済みかどうかを把握する仕組み」を表現するクラスとして、Target クラスを導入します。Target クラスは抽象クラスで、自身が存在するかどうかを判定する exists? メソッドを持ちます。
class Target
def exists?
raise NotImplementedError, "You must implement #{self.class}##{__method__}"
end
end
それでは Target クラスの具象クラスとして、File と対応する FileTarget クラスを定義してみましょう。
class FileTarget < Target
attr_reader :path
def initialize(path)
@path = path
end
def exists?
File.exist?(path)
end
end
インスタンス作成時に渡した path に対応するファイルが存在する場合は、true を返します。Target クラスはこれだけです。Workflow Engine では FileTarget のようによく使うものを組み込みクラスとして提供することで、利用者は Target クラスを定義することなく使えるようにします。次に、この Target クラスを Task クラスでどのように利用するのかを解説します。
Task クラスの拡張
「Task は自身が完了済みかどうかを把握する仕組み」を実現するために、「Task クラスは run メソッドの実行を完了した時、Target クラスに対応する何かを出力するクラスである」という定義を追加し、自身の完了を表現する Target クラス(のサブクラス)を返す Task#output メソッドを実装します。例えば、FileTarget を使って、次のように実装します。
class Task
def output
raise NotImplementedError, "You must implement #{self.class}##{__method__}"
end
end
class TaskA < Task
def output
FileTarget.new("TaskA.txt")
end
def run
puts "TaskA#run"
File.write(output.path, "done")
end
end
これで「Task が完了しているかどう」は「task.output.exists? が true を返すかどうか」と同義になります。
Workflow クラスの変更
ここまでくれば、あとは Workflow#run メソッドを少し変更し、task.output.exists? が true の場合に、run メソッドの実行をスキップするようにすれば完成です。
class Workflow
def run(task)
dag = DAG.new
dag.add_task(task)
dag.tsort.each do |t|
t.run unless t.output.exists?
end
end
end
全ての Task を巡回することで、状態管理の必要性をなくしつつ、完了している場合は run メソッドが実行されないので、リトライにかかる時間を最小限にすることができます。
ただし、この方法では Task#run メソッドが複雑になると「完了」の定義も難しくなり、Target クラスの実装が難しくなります。Workflow Engine のサポートを最大限に活かすためにも、1つの Task の処理はなるべく小さくするように心がけるようにしましょう。
Workflow のリトライを試してみる
require_relative 'workflow'
require_relative 'task'
require_relative 'file_target'
class EchoTask < Task
def output
FileTarget.new("/tmp/workflow/#{self.class.name}.txt")
end
def run
puts "#{self.class.name}#run"
File.write(output.path, "done")
end
end
class TaskA < EchoTask
def requires
[ TaskB.new, TaskC.new ]
end
end
class TaskB < EchoTask
def requires
[ TaskD.new ]
end
end
class TaskC < EchoTask
def requires
[ TaskD.new ]
end
end
class TaskD < EchoTask
end
Workflow.new.run(TaskA.new)
これを example.rb という名前のファイル名で保存し、2回実行してみてください。
$ ruby example.rb
TaskD#run
TaskB#run
TaskC#run
TaskA#run
# もう一度実行すると既に完了済みなので何も表示されない
$ ruby example.rb
# TaskA, TaskB の FileTarget に対応するファイルを消して再実行すると、
# TaskB と TaskA のみが実行される
$ rm /tmp/workflow/Task(A|B).txt
$ ruby example.rb
TaskB#run
TaskA#run
全ソースコードはこちら。
https://gist.github.com/hakobera/f1166b697e0e3f5f37f8
まとめ
この記事では Workflow Engine の機能のうち、リトライ機能を実現する Workflow の冪等性を実装する方法について解説しました。Workflow の冪等性は Task の冪等性を保証することにより、状態を持たずに全 Task の再実行により実現できることがわかりました。
次回は、Task 間でのデータのやり取りについて解説します。