Bigqueryを使ったバッチジョブを色々と実行しているのですが、Rakeで複雑な依存関係を管理したり、並列実行させたりするのが辛くなってきたのでRukawaというワークフローエンジンを自作しました。
自作したのは、RailsプロダクトにAirflowとかLuigiとかAzkabanとか入れるにはちょっと重厚過ぎる感じだったのと、Rubyで書ける方が楽で良いやという理由からです。
RukawaとはRUby KAntan Workflow Assistantの略です(後付け)
(本当はミッチーとか水戸の方が好きなんだけど良い名前が浮かばなかった)
実際は、並列実行を可能にして書き方を変えてみたRakeとそんなに大差無い。
Rukawaの機能
ジョブの定義
まず実行したい処理をジョブクラスに記述します。
module ExecuteLog
def self.store
@store ||= {}
end
end
class SampleJob < Rukawa::Job
def run
sleep rand(5)
ExecuteLog.store[self.class] = Time.now
end
end
class Job1 < SampleJob
end
class Job2 < SampleJob
end
class Job3 < SampleJob
end
class Job4 < SampleJob
end
class Job5 < SampleJob
def run
raise "job5 error"
end
end
class Job6 < SampleJob
add_skip_rule ->(job) { job.is_a?(Job6) }
end
class Job7 < SampleJob
end
こんな感じでRukawa::Job
を継承したクラスを作ります。
run
というメソッドの中身が実行されます。
ここは同期処理で記述します。
add_skip_rule
というクラスメソッドに、Procかインスタントメソッド名のSymbolを渡すと条件に合致する場合は、そのジョブをスキップします。
ジョブネットの定義
ジョブクラスができたら、各ジョブ間の依存関係を定義します。
class SampleJobNet < Rukawa::JobNet
class << self
def dependencies
{
Job1 => [],
Job2 => [Job1], Job3 => [Job1],
Job4 => [Job2, Job3],
Job5 => [Job3],
Job6 => [Job4, Job5],
Job7 => [Job6],
}
end
end
end
Rukawa::JobNet
というクラスを継承してself.dependencies
メソッドにHash形式で依存関係を記述します。
例えばJob1
は何にも依存していないため単独で実行可能、Job4
はJob2
とJob3
が完了しないと実行できない処理を意味します。
またジョブネットは入れ子にすることができます。
ジョブクラスと同じ様にジョブネットのクラス名を書けば、そのジョブネットの実行が全て終わってから続きを実行したり、あるジョブが終わってから動き出すジョブネットを定義できます。
add_skip_rule
はジョブネットレベルでも利用可能です。その場合、下位のジョブ全てが同じルールでスキップされます。
実行
rukawaコマンドを利用します。
runサブコマンドの引数としてジョブネットのクラス名を渡します。
% bundle exec rukawa help run
Usage:
rukawa run JOB_NET_NAME
Options:
-c, [--concurrency=N] # Default: cpu count
[--variables=key:value]
[--config=CONFIG] # If this options is not set, try to load ./rukawa.rb
[--job-dirs=one two three] # Load job directories
-b, [--batch], [--no-batch] # If batch mode, not display running status
-l, [--log=LOG]
# Default: ./rukawa.log
[--stdout], [--no-stdout] # Output log to stdout
-d, [--dot=DOT] # Output job status by dot format
-r, [--refresh-interval=N] # Refresh interval for running status information
# Default: 3
% bundle exec rukawa run SampleJobNet
+--------------+---------+
| Job | Status |
+--------------+---------+
| Job1 | waiting |
| Job2 | waiting |
| Job3 | waiting |
| Job4 | waiting |
| InnerJobNet | waiting |
| InnerJob3 | waiting |
| InnerJob1 | waiting |
| InnerJob2 | waiting |
| Job8 | waiting |
| Job5 | waiting |
| Job6 | waiting |
| Job7 | waiting |
| InnerJobNet2 | waiting |
| InnerJob4 | waiting |
| InnerJob5 | waiting |
| InnerJob6 | waiting |
+--------------+---------+
+--------------+----------+
| Job | Status |
+--------------+----------+
| Job1 | finished |
| Job2 | finished |
| Job3 | finished |
| Job4 | finished |
| InnerJobNet | running |
| InnerJob3 | running |
| InnerJob1 | running |
| InnerJob2 | waiting |
| Job8 | waiting |
| Job5 | error |
| Job6 | error |
| Job7 | error |
| InnerJobNet2 | running |
| InnerJob4 | running |
| InnerJob5 | skipped |
| InnerJob6 | waiting |
+--------------+----------+
実行時にカレントディレクトリのjob_nets
とjobs
以下にある.rb
ファイルを自動的にロードします
実行時に、あるジョブがエラー、またはスキップされた場合、そのジョブに依存するジョブは全てエラー(スキップ)となります。
後続のジョブの実行を継続するルールの設定については、今後実装予定です。
ジョブ依存関係の可視化
Hashだけだとジョブの全体図が分かり辛いのでgraphviz形式でジョブの全体図を出力できます。
graphサブコマンドにジョブネットのクラス名を渡します
% bundle exec rukawa graph -o SampleJobNet.dot SampleJobNet
% dot -Tpng -o SampleJobNet.png SampleJobNet.dot
直接pngやらsvgを出力する機能は現時点で未実装なので、今はdotの呼び出しが必要です。
出力するとこんな感じになります。
実行結果の可視化
実行結果もgraphviz形式で出力できます。
% bundle exec rukawa run SampleJobNet -r 1 -d result.dot
% dot -Tpng -o result.png result.dot
コンフィグファイル
configオプションを指定することでGraphvizのスタイルを変えたり、デフォルトのconcurrencyを設定したりできます。
制約
並列実行制御やジョブのキューイングを全てconcurrent-rubyに丸投げしているので、コードベースはDAGの構築とそれの畳み込みがほとんどで実装が簡易化されてます。
また、DSLも用意してないのでRubyのクラスを普通に定義することでジョブを定義します。
そのため、以下の制約があります。
- concurrencyと依存関係の定義上では同時に動いても良いはずのジョブがキュー待ちになる場合がある
- 一回の実行の中で同じクラスのジョブは一度しか実行できない
- もし複数回実行したい場合は、メタプログラミング的に定義するなどの工夫が必要になる
その他の現時点における仕様上の割り切りとしては以下のようなものがあります。
- cronの様なスケジューラーは無し、今のところ組み込む予定もない
- デーモン化しない、コマンドから起動するだけ
- 複数ノードに渡って実行を分散する機能はない
一方で、基本的にRubyのクラス定義とloadで動作するので、Railsアプリから必要なファイルをrequireしてくるだけで処理をそのまま利用できるといったメリットも存在します。
ToDo
- コマンドラインオプションから変数を渡してジョブで利用する
- 特定のジョブからResumeする
- 一部のジョブが成功、失敗、スキップした時の後続ジョブの扱いを定義可能にする
- ジョブ間でのデータの受け渡し(一応実装しかけのものはあるが、まだ動かない)
- Rubyのコードさえ書いてしまえば、何とでもなるといえばなる。
- コールバック(要らんかも)
- プラガブルな実行状況の通知
色々考えると、activesupportがあった方が楽なんだけど依存gem増やすのはどうだろうなあ。activesupportぐらい良いかなあとか葛藤がある……。
なんにせよ、今後もしばらく機能追加していきます。