Ruby と tumugi によるデータパイプライン構築

More than 1 year has passed since last update.

データを扱う仕事をしていると、何らかの形でデータをある順序に沿って処理する Workflow = データパイプラインを構築する必要になる場面に出くわすことが多いと思います。

こうした処理に適用できるソフトウェアは既にいくつか存在するのですが、自分が仕事で使う上でマッチするものがなかったので、tumugi を Ruby のライブラリを作成し、 gem として公開しました。ロゴはこんな感じ。

logo

Workflow Engine をつくろう! シリーズ連載は、元々このライブラリの設計を整理するために書いていた記事でした。まだ、未完ですが興味のある方はそちらもお読みください。

注釈

この記事は tumugi が Python で書かれたワークフローエンジンである Luigi に強く影響されて書いたことから、対比として分かりやすいため Building Data Pilelines with Python and Luigi に似せた構成で書きました。

記事中のコードは gist にもおいてあります。

素の Ruby でデータパイプラインを構築する

Ruby で最も単純にデータパイプラインを構築するには、一連のタスクを記述したスクリプトをシェルから順次実行することです。

$ ruby get_some_data.rb
$ ruby clean_some_data.rb
$ ruby join_other_data.rb
$ ruby do_stuff_with_data.rb

これでも十分なケースもあるとは思いますが、仕事で運用していくには以下のような要望が出てくることでしょう。

  • 1つのコマンドで全体を再実行したい
  • 失敗したら、再実行したい。その際、再実行時には成功した処理はスキップしたい
  • 依存関係のない処理は並列実行して、処理時間を短縮したい
  • 前処理で作成したデータを、後段の処理で参照したい

これを実現するために、コードの書き方のルールを作り、エラー処理を実装し・・・とするのも一つの手ですが、こうした定形処理はライブラリとして実装するのに向いています。

tumugi はこうした処理を手軽に提供する gem です。

tumugi ってなに?

tumugi は Ruby 製のワークフロー管理ライブラリです。提供する主な機能は

  • ワークフローを Ruby と 言語内 DSL で記述可
  • タスクの依存性管理
  • エラー時のリトライ処理
  • 再実行時に失敗したタスクのみを実行
  • 依存関係の可視化(要 Graphviz)
  • プラグインによる機能拡張(というか、コア機能以外はすべてプラグインで提供)

tumugi の制約

tumugi は実装を小さくするために、以下の制約があります。

  • 1台のマシンで動かすことが前提で、複数台の分散環境での実行はできません
  • スケジューラ (cron) 機能は提供しません。Jenkins や Rundeck などの既存のスケジューラと組み合わせて使用することを想定しています。(自分は仕事では Jenkins 2.0 と組み合わせで使っています)

これらの制約から、ガンガン計算するような大規模バッチには向かず、小規模のバッチ、もしくは外部のデータソース/APIを呼び出すコントローラ的な位置づけで利用することを想定しています。

tumugi のコアコンセプト

tumugi でデータパイプラインを構築する上で、重要な2つのコンセプトがあります。それが、TaskTarget です。

  • Task
    • tumugi における処理の最小単位
    • ワークフローとは依存関係と実行順序が定義された Task の集合
  • Target
    • Task は処理の結果として、生成されるもの。例えば、
      • ローカルマシン上のファイル
      • Google Cloud Storage 上のファイル
      • BigQuery のテーブル
    • Task1 が Task2 に依存している場合、Task2 の出力である Target が、Task1 の入力となる

アーキテクチャについて、より詳細を知りたい人は http://tumugi.github.io/architecture/ をご覧ください。

tumugi によるワークフロー

tumugi は Ruby の言語内 DSL でワークフローを記述するので、どこでも Ruby のコードが使えます。

run_tumugi.rb
task :print_numbers do
  output target(:local_file, "numbers_up_to_10.txt")

  run {
    output.open("w") do |f|
      (1..10).each {|i| f.puts(i)}
    end
  }
end

task :squared_numbers do
  requires :print_numbers

  output target(:local_file, "squares.txt")

  run {
    output.open("w") do |fout|
      input.open("r") do |fin|
        fin.each_line do |line|
          n = line.to_i
          out = n * n
          fout.puts("#{n}:#{out}")
        end
      end
    end
  }
end

このコードは、:print_numbers:squared_numbers という2つの Task を含んでいます。

  • :print_numbers
    • 1行に1つずつ、1から10までの数字を含む10行の numbers_up_to_10.txt という名前のファイルを出力する Task
  • :squared_numbers
    • :print_numbers の出力したファイルを読み、各行の2乗の数と元の数のペアを squares.txt という名前のファイルに出力する Task

これを実行してみましょう。
まずは、Bundler を使って、tumugi をインストールしてみましょう

Gemfile
source "https://rubygems.org"
gem "tumugi", "~> 0.6.3"

上記の内容を含む Gemfile を作成したら、bundle install でインストールします。インストール完了したら、早速実行してみましょう。

$ bundle exec tumugi run -f run_tumugi.rb squared_numbers

tumugi run -f ワークフローファイル名.rb 実行したいタスク名 という風に指定します。
結果はこんな感じになります。

2016-08-30 16:56:10 +0900 [INFO] (tumugi-main) tumugi v0.6.3
2016-08-30 16:56:10 +0900 [INFO] (tumugi-workflow) Load workflow from run_tumugi.rb
2016-08-30 16:56:10 +0900 [INFO] (tumugi-run) workflow_start: e10133c9-3a7e-4ab7-9f6f-8b62b17b9ad4
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_start: print_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_completed: print_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_start: squared_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-executor) task_completed: squared_numbers
2016-08-30 16:56:10 +0900 [INFO] (tumugi-run) workflow_end: e10133c9-3a7e-4ab7-9f6f-8b62b17b9ad4
2016-08-30 16:56:10 +0900 [INFO] (tumugi-run) Result report:
+-----------------+---------------+------------+-----------+
|                     Workflow Result                      |
+-----------------+---------------+------------+-----------+
| Task            | Requires      | Parameters | State     |
+-----------------+---------------+------------+-----------+
| print_numbers   |               |            | completed |
+-----------------+---------------+------------+-----------+
| squared_numbers | print_numbers |            | completed |
+-----------------+---------------+------------+-----------+
2016-08-30 16:56:10 +0900 [INFO] (tumugi-main) status: success, command: run, args: {:task=>"squared_numbers", :options=>{"quiet"=>false, "verbose"=>false, "log_format"=>"text", "file"=>"run_tumugi.rb"}}

それぞれのタスクが指定したファイルが2つ作られていることも確認できます。

$ cat numbers_up_to_10.txt
1
2
3
4
5
6
7
8
9
10

$ cat squares.txt
1:1
2:4
3:9
4:16
5:25
6:36
7:49
8:64
9:81
10:100

コマンドには squared_numbers しか指定していないのに、依存関係のある print_numbers タスクも同時に実行されていることがわかります。

ワークフローの可視化

タスクが増えてくると、コードだけだと依存関係がわかりにくい場合があります。その場合、ワークフローの依存関係(DAG)を可視化する tumugi show コマンドを使うと便利です。内部的に Graphviz 使っているので、別途インストールする必要があります。

$ bundle exec tumugi show -f run_tumugi.rb -o dag.png squared_numbers

これで dag.png に次のような画像が出力されます。

dag.png

DSL

上記の例のように tumugi は DSL でタスクを定義します。基本形は次の通りです。

task "タスク名(Symbol も可)" do
  requires "依存するタスク名(複数の依存がある場合、Array, Hashも可)"
  output target(:target_type, target_params) # 省略可
  run {
    puts "ここにタスクの実体を書きます"
    puts "output を省略しなかった場合、ここで output に対応する何かを出力します"
  }
end

CLI 経由でパラメータを渡す

CLI 経由でパラメータを渡すことができると、タスクの汎用性が上がります。よく使われる例としては、「日次バッチに対して、今日の日付を渡す」とか、「繰り返しの回数を渡す」などがあります。

このパターンはよく使われるため、tumugi にはパラメータをタスクに渡す機能が実装してあります。それでは、前述したワークフローを、パラメータを利用するように改造してみましょう。

run_tumugi2.rb
task :print_numbers do
  # max_num という名前のパラメータを使うことを宣言
  param :max_num, type: :integer, auto_bind: true, required: true

  output target(:local_file, "numbers_up_to_10.txt")

  run {
    output.open("w") do |f|
      (1..max_num).each {|i| f.puts(i)}
    end
  }
end

# squared_numbers は一緒なので省略

パラメータは CLI 実行時に -p パラメータ名:値 というオプションで渡します。

$ bundle exec tumugi run -f run_tumugi2.rb squared_numbers -p max_num:5

で、実際やってみると、

2016-08-30 19:38:45 +0900 [INFO] (tumugi-run) Result report:
+-----------------+---------------+------------+---------+
|                    Workflow Result                     |
+-----------------+---------------+------------+---------+
| Task            | Requires      | Parameters | State   |
+-----------------+---------------+------------+---------+
| print_numbers   |               | max_num=5  | skipped |
+-----------------+---------------+------------+---------+
| squared_numbers | print_numbers |            | skipped |
+-----------------+---------------+------------+---------+

どちらのタスクも skipped と言われてしまいました。tumugi は既に成功したタスクはスキップする仕様になっているので、こうなるのです。同じタスクを再実行するためには、Target に対応するファイルを消しておく必要があります。

$ rm *.txt # 前回の実行結果を消しておかないと全部スキップされる
$ bundle exec tumugi run -f run_tumugi2.rb squared_numbers -p max_num:5
2016-08-30 19:43:19 +0900 [INFO] (tumugi-main) tumugi v0.6.3
2016-08-30 19:43:19 +0900 [INFO] (tumugi-workflow) Parameters: {"max_num"=>"5"}
2016-08-30 19:43:19 +0900 [INFO] (tumugi-workflow) Load workflow from run_tumugi2.rb
2016-08-30 19:43:19 +0900 [INFO] (tumugi-run) workflow_start: 993c33f1-8583-4a13-b6dd-f7d6063754ae
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_start: print_numbers
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_completed: print_numbers
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_start: squared_numbers
2016-08-30 19:43:19 +0900 [INFO] (tumugi-executor) task_completed: squared_numbers
2016-08-30 19:43:20 +0900 [INFO] (tumugi-run) workflow_end: 993c33f1-8583-4a13-b6dd-f7d6063754ae
2016-08-30 19:43:20 +0900 [INFO] (tumugi-run) Result report:
+-----------------+---------------+------------+-----------+
|                     Workflow Result                      |
+-----------------+---------------+------------+-----------+
| Task            | Requires      | Parameters | State     |
+-----------------+---------------+------------+-----------+
| print_numbers   |               | max_num=5  | completed |
+-----------------+---------------+------------+-----------+
| squared_numbers | print_numbers |            | completed |
+-----------------+---------------+------------+-----------+
2016-08-30 19:43:20 +0900 [INFO] (tumugi-main) status: success, command: run, args: {:task=>"squared_numbers", :options=>{"quiet"=>false, "verbose"=>false, "log_format"=>"text", "file"=>"run_tumugi2.rb", "params"=>{"max_num"=>"5"}}}

今度は無事に実行されたようです。

$ cat numbers_up_to_10.txt
1
2
3
4
5

$ cat squares.txt
1:1
2:4
3:9
4:16
5:25

結果も指定した通り、最大値が5に変わっていることがわかります。

プラグイン

tumugi はプラグインにより Task, Target の種類を増やすことができます。実は tumugi の Task, Target は組み込みのもの(:local_fileなど)も全てプラグインとして実装されています。

Task プラグインは type オプションで指定します。例えば、シェルのコマンドを実行する tumugi-plugin-command の場合、以下のようになります。

task :download_log, type: :command do
  param :log_filename, type: :string
  param :day, auto_bind: true, type: :time, required: true

  log_filename {
    "access_#{day.strftime('%Y%m%d')}.log.zip"
  }
  command {
    url = "https://tumugi.github.io/data/#{log_filename}"
    "wget #{url} -O #{output.path}"
  }

  output {
    target(:local_file, "tmp/#{log_filename}")
  }
end

公式プラグイン一覧は => http://tumugi.github.io/plugins/
tumugi プラグインの作り方は別の記事で書きたいと思います。

その他

  • タスクが多くなってきたので、ファイルを分割したい
    • ワークフロー自体が Ruby で記述されているので、ファイル分割して require すれば OK
  • その他のプラグインの使い方がわからない

まとめ

tumugi を使うとデータパイプライン構築で考えることが減り、各タスクをどう実装するかに注力することができます。ワークフローエンジンとか、データパイプライン構築とか難しく考えず、ちょっと高機能な Rake くらいな感じで使ってもらえればなと思います。

tumugi 本体、および公式プラグインすべて Github で公開しているので、コントリビュートもお待ちしております。

https://github.com/tumugi

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.