データを扱う仕事をしていると、何らかの形でデータをある順序に沿って処理する Workflow = データパイプラインを構築する必要になる場面に出くわすことが多いと思います。
こうした処理に適用できるソフトウェアは既にいくつか存在するのですが、自分が仕事で使う上でマッチするものがなかったので、tumugi を Ruby のライブラリを作成し、 gem として公開しました。ロゴはこんな感じ。
Workflow Engine をつくろう! シリーズ連載は、元々このライブラリの設計を整理するために書いていた記事でした。まだ、未完ですが興味のある方はそちらもお読みください。
注釈
この記事は tumugi が Python で書かれたワークフローエンジンである Luigi に強く影響されて書いたことから、対比として分かりやすいため Building Data Pilelines with Python and Luigi に似せた構成で書きました。
記事中のコードは gist にもおいてあります。
素の Ruby でデータパイプラインを構築する
Ruby で最も単純にデータパイプラインを構築するには、一連のタスクを記述したスクリプトをシェルから順次実行することです。
$ ruby get_some_data.rb
$ ruby clean_some_data.rb
$ ruby join_other_data.rb
$ ruby do_stuff_with_data.rb
これでも十分なケースもあるとは思いますが、仕事で運用していくには以下のような要望が出てくることでしょう。
- 1つのコマンドで全体を再実行したい
- 失敗したら、再実行したい。その際、再実行時には成功した処理はスキップしたい
- 依存関係のない処理は並列実行して、処理時間を短縮したい
- 前処理で作成したデータを、後段の処理で参照したい
これを実現するために、コードの書き方のルールを作り、エラー処理を実装し・・・とするのも一つの手ですが、こうした定形処理はライブラリとして実装するのに向いています。
tumugi はこうした処理を手軽に提供する gem です。
tumugi ってなに?
tumugi は Ruby 製のワークフロー管理ライブラリです。提供する主な機能は
- ワークフローを Ruby と 言語内 DSL で記述可
- タスクの依存性管理
- エラー時のリトライ処理
- 再実行時に失敗したタスクのみを実行
- 依存関係の可視化(要 Graphviz)
- プラグインによる機能拡張(というか、コア機能以外はすべてプラグインで提供)
tumugi の制約
tumugi は実装を小さくするために、以下の制約があります。
- 1台のマシンで動かすことが前提で、複数台の分散環境での実行はできません
- スケジューラ (cron) 機能は提供しません。Jenkins や Rundeck などの既存のスケジューラと組み合わせて使用することを想定しています。(自分は仕事では Jenkins 2.0 と組み合わせで使っています)
これらの制約から、ガンガン計算するような大規模バッチには向かず、小規模のバッチ、もしくは外部のデータソース/APIを呼び出すコントローラ的な位置づけで利用することを想定しています。
tumugi のコアコンセプト
tumugi でデータパイプラインを構築する上で、重要な2つのコンセプトがあります。それが、Task
と Target
です。
- Task
- tumugi における処理の最小単位
- ワークフローとは依存関係と実行順序が定義された
Task
の集合
- Target
-
Task
は処理の結果として、生成されるもの。例えば、- ローカルマシン上のファイル
- Google Cloud Storage 上のファイル
- BigQuery のテーブル
- Task1 が Task2 に依存している場合、Task2 の出力である
Target
が、Task1 の入力となる
-
アーキテクチャについて、より詳細を知りたい人は http://tumugi.github.io/architecture/ をご覧ください。
tumugi によるワークフロー
tumugi は Ruby の言語内 DSL でワークフローを記述するので、どこでも Ruby のコードが使えます。
task :print_numbers do
output target(:local_file, "numbers_up_to_10.txt")
run {
output.open("w") do |f|
(1..10).each {|i| f.puts(i)}
end
}
end
task :squared_numbers do
requires :print_numbers
output target(:local_file, "squares.txt")
run {
output.open("w") do |fout|
input.open("r") do |fin|
fin.each_line do |line|
n = line.to_i
out = n * n
fout.puts("#{n}:#{out}")
end
end
end
}
end
このコードは、:print_numbers
と :squared_numbers
という2つの Task
を含んでいます。
-
:print_numbers
- 1行に1つずつ、1から10までの数字を含む10行の
numbers_up_to_10.txt
という名前のファイルを出力するTask
- 1行に1つずつ、1から10までの数字を含む10行の
-
:squared_numbers
-
:print_numbers
の出力したファイルを読み、各行の2乗の数と元の数のペアをsquares.txt
という名前のファイルに出力するTask
-
これを実行してみましょう。
まずは、Bundler を使って、tumugi をインストールしてみましょう
source "https://rubygems.org"
gem "tumugi", "~> 0.6.3"
上記の内容を含む Gemfile
を作成したら、bundle install
でインストールします。インストール完了したら、早速実行してみましょう。
$ bundle exec tumugi run -f run_tumugi.rb squared_numbers
tumugi run -f ワークフローファイル名.rb 実行したいタスク名
という風に指定します。
結果はこんな感じになります。
2016-08-30 16:56:10 +0900 [INFO] (tumugi-main) tumugi v0.6.3
2016-08-30 16:56:10 +0900 [INFO] (tumugi-workflow) Load workflow from run_tumugi.rb
2016-08-30 16:56:10 +0900 [INFO] (tumugi-run) workflow_start: e10133c9-3a7e-4ab7-9f6f-8b62b17b9ad4
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_start: print_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_completed: print_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_start: squared_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_completed: squared_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-run) workflow_end: e10133c9-3a7e-4ab7-9f6f-8b62b17b9ad4
2016-08-30 16:56:10 +0900 [INFO] (tumugi-run) Result report:
+-----------------+---------------+------------+-----------+
| Workflow Result |
+-----------------+---------------+------------+-----------+
| Task | Requires | Parameters | State |
+-----------------+---------------+------------+-----------+
| print_numbers | | | completed |
+-----------------+---------------+------------+-----------+
| squared_numbers | print_numbers | | completed |
+-----------------+---------------+------------+-----------+
2016-08-30 16:56:10 +0900 [INFO] (tumugi-main) status: success, command: run, args: {:task=>"squared_numbers", :options=>{"quiet"=>false, "verbose"=>false, "log_format"=>"text", "file"=>"run_tumugi.rb"}}
それぞれのタスクが指定したファイルが2つ作られていることも確認できます。
$ cat numbers_up_to_10.txt
1
2
3
4
5
6
7
8
9
10
$ cat squares.txt
1:1
2:4
3:9
4:16
5:25
6:36
7:49
8:64
9:81
10:100
コマンドには squared_numbers
しか指定していないのに、依存関係のある print_numbers
タスクも同時に実行されていることがわかります。
ワークフローの可視化
タスクが増えてくると、コードだけだと依存関係がわかりにくい場合があります。その場合、ワークフローの依存関係(DAG)を可視化する tumugi show
コマンドを使うと便利です。内部的に Graphviz 使っているので、別途インストールする必要があります。
$ bundle exec tumugi show -f run_tumugi.rb -o dag.png squared_numbers
これで dag.png
に次のような画像が出力されます。
DSL
上記の例のように tumugi は DSL でタスクを定義します。基本形は次の通りです。
task "タスク名(Symbol も可)" do
requires "依存するタスク名(複数の依存がある場合、Array, Hashも可)"
output target(:target_type, target_params) # 省略可
run {
puts "ここにタスクの実体を書きます"
puts "output を省略しなかった場合、ここで output に対応する何かを出力します"
}
end
CLI 経由でパラメータを渡す
CLI 経由でパラメータを渡すことができると、タスクの汎用性が上がります。よく使われる例としては、「日次バッチに対して、今日の日付を渡す」とか、「繰り返しの回数を渡す」などがあります。
このパターンはよく使われるため、tumugi にはパラメータをタスクに渡す機能が実装してあります。それでは、前述したワークフローを、パラメータを利用するように改造してみましょう。
task :print_numbers do
# max_num という名前のパラメータを使うことを宣言
param :max_num, type: :integer, auto_bind: true, required: true
output target(:local_file, "numbers_up_to_10.txt")
run {
output.open("w") do |f|
(1..max_num).each {|i| f.puts(i)}
end
}
end
# squared_numbers は一緒なので省略
パラメータは CLI 実行時に -p パラメータ名:値
というオプションで渡します。
$ bundle exec tumugi run -f run_tumugi2.rb squared_numbers -p max_num:5
で、実際やってみると、
2016-08-30 19:38:45 +0900 [INFO] (tumugi-run) Result report:
+-----------------+---------------+------------+---------+
| Workflow Result |
+-----------------+---------------+------------+---------+
| Task | Requires | Parameters | State |
+-----------------+---------------+------------+---------+
| print_numbers | | max_num=5 | skipped |
+-----------------+---------------+------------+---------+
| squared_numbers | print_numbers | | skipped |
+-----------------+---------------+------------+---------+
どちらのタスクも skipped
と言われてしまいました。tumugi は既に成功したタスクはスキップする仕様になっているので、こうなるのです。同じタスクを再実行するためには、Target
に対応するファイルを消しておく必要があります。
$ rm *.txt # 前回の実行結果を消しておかないと全部スキップされる
$ bundle exec tumugi run -f run_tumugi2.rb squared_numbers -p max_num:5
2016-08-30 19:43:19 +0900 [INFO] (tumugi-main) tumugi v0.6.3
2016-08-30 19:43:19 +0900 [INFO] (tumugi-workflow) Parameters: {"max_num"=>"5"}
2016-08-30 19:43:19 +0900 [INFO] (tumugi-workflow) Load workflow from run_tumugi2.rb
2016-08-30 19:43:19 +0900 [INFO] (tumugi-run) workflow_start: 993c33f1-8583-4a13-b6dd-f7d6063754ae
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_start: print_numbers
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_completed: print_numbers
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_start: squared_numbers
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_completed: squared_numbers
2016-08-30 19:43:20 +0900 [INFO] (tumugi-run) workflow_end: 993c33f1-8583-4a13-b6dd-f7d6063754ae
2016-08-30 19:43:20 +0900 [INFO] (tumugi-run) Result report:
+-----------------+---------------+------------+-----------+
| Workflow Result |
+-----------------+---------------+------------+-----------+
| Task | Requires | Parameters | State |
+-----------------+---------------+------------+-----------+
| print_numbers | | max_num=5 | completed |
+-----------------+---------------+------------+-----------+
| squared_numbers | print_numbers | | completed |
+-----------------+---------------+------------+-----------+
2016-08-30 19:43:20 +0900 [INFO] (tumugi-main) status: success, command: run, args: {:task=>"squared_numbers", :options=>{"quiet"=>false, "verbose"=>false, "log_format"=>"text", "file"=>"run_tumugi2.rb", "params"=>{"max_num"=>"5"}}}
今度は無事に実行されたようです。
$ cat numbers_up_to_10.txt
1
2
3
4
5
$ cat squares.txt
1:1
2:4
3:9
4:16
5:25
結果も指定した通り、最大値が5に変わっていることがわかります。
プラグイン
tumugi はプラグインにより Task
, Target
の種類を増やすことができます。実は tumugi の Task
, Target
は組み込みのもの(:local_file
など)も全てプラグインとして実装されています。
Task
プラグインは type
オプションで指定します。例えば、シェルのコマンドを実行する tumugi-plugin-command
の場合、以下のようになります。
task :download_log, type: :command do
param :log_filename, type: :string
param :day, auto_bind: true, type: :time, required: true
log_filename {
"access_#{day.strftime('%Y%m%d')}.log.zip"
}
command {
url = "https://tumugi.github.io/data/#{log_filename}"
"wget #{url} -O #{output.path}"
}
output {
target(:local_file, "tmp/#{log_filename}")
}
end
公式プラグイン一覧は => http://tumugi.github.io/plugins/
tumugi プラグインの作り方は別の記事で書きたいと思います。
その他
- タスクが多くなってきたので、ファイルを分割したい
- ワークフロー自体が Ruby で記述されているので、ファイル分割して
require
すれば OK
- ワークフロー自体が Ruby で記述されているので、ファイル分割して
- その他のプラグインの使い方がわからない
- 公式ドキュメントにプラグインを使ったコードのサンプルがあります
- 各種プラグインについては別途記事を書く予定です(TODO: 後でリンクを追加)
まとめ
tumugi を使うとデータパイプライン構築で考えることが減り、各タスクをどう実装するかに注力することができます。ワークフローエンジンとか、データパイプライン構築とか難しく考えず、ちょっと高機能な Rake くらいな感じで使ってもらえればなと思います。
tumugi 本体、および公式プラグインすべて Github で公開しているので、コントリビュートもお待ちしております。