これなに?
F81アドベントカレンダー9日目の長谷川です。明日の担当は本日が誕生日🎉の pastelIncさんです。おめでとうございます。
この記事では、パイプラインフレームワークのLuigiの出力結果を、Jupyterなどで簡単に確認するためのライブラリ、thunderboltの紹介していきます。
なお、この記事ではLuigiの基礎的な内容については、一切解説しません。
パイプライン処理の課題
パイプラインフレームワークを使ってデータ処理を開発、運用しているときに、よく中間生成物を簡単に確認したい! と思うこと多々あります。
有向非巡回グラフの特性上、依存している処理の出力結果が期待している物とズレていると、そのズレが連鎖的に伝搬し、ゴミを入れてもゴミしか出ない、汚染されたパイプラインになりかねないからです。
Taskを追加しまくって、どんどん開発が進むのは気分が良いですが、出力物の確認をせずにガンガン処理を増やしていくと、パイプラインが複雑化し、どこでどのような処理が行われているのかが徐々に不透明になり、期待した結果とのズレが生じた際に、そのズレがどこで生じているのかを特定するのが、より困難になっていきます。
thunderbolt⚡とは?
そこでthunderbolt(とgokart)の出番です。
thunderboltは、Luigiをより使いやすくしたラッパーライブラリであるgokartで出力されるlogをパースし、マネージドできるライブラリです。名前の由来はマリオカートのアイテム、サンダー⚡からですね。
名の通り、あくまでgokartをサポートするツールなので、gokartでluigiを構築していることが前提となっています。 gokartの使い方は、M3さんのブログか、前に書いた記事をお読みください。
gokartが出力するlog
gokartは以下のように.pkl
形式でlogを出力します。
resources
├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
└── log
├── module_versions
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
├── processing_time
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
├── task_log
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
└── task_params
└── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
logには、
- 利用したモジュールのバージョン
- 処理にかかった時間
- loggerで出力されたlog
- 適用されたパラメータ
が保存されています。
gokartの場合、デフォルトだと指定したファイル名にhash値がついて保存されてます。hash値はTaskのパラメータによって決まるため頻繁に変わり、しかもlogはpickle
で保存されるので、出力物をloadしたり、ログを確認したりが、いささか面倒ですが、このあたりをサポートしてくれるのがthunderboltです。
使い方
公式のexampleに載ってます、おわり。
だと、ちょっと寂しいので、基本的な使い方をここでも紹介します
なおライブラリのバージョンは
- luigi==2.8.10
- gokart==0.3.7
- thunderbolt==0.0.1
です
初期化
from thunderbolt import Thunderbolt
tb=Thunderbolt(workspace_directory='[goartの保存先のディレクトリ]')
で、マネージャのインスタンスを作成します。
なお、ディレクトリのパラメータが指定されていない場合、環境変数TASK_WORKSPACE_DIRECTORY
で指定されたpathが参照されます。
出力物の確認
以下のようにget_task_df
で読み込むと、出力物の情報がDataFrame
として確認できます。
tb.get_task_df()
task_id | task_name | last_modified | task_params | |
---|---|---|---|---|
0 | 0 | LoadTargetData | 2019-11-20 11:04:22.727454 | {'target_id': '0'} |
1 | 1 | LoadData | 2019-11-20 11:04:22.794075 | {} |
2 | 2 | LoadTargetData | 2019-11-20 11:04:22.774771 | {'target_id': '2'} |
3 | 3 | TargetId | 2019-11-20 11:04:22.669280 | {} |
出力物をload
以下のようにload(task_id=[確認したい出力物のtask_id])
で出力物の中身を確認できます。
tb.load(task_id=1)[0].head()
0 | 1 | 2 | 3 | target | |
---|---|---|---|---|---|
100 | 6.3 | 3.3 | 6.0 | 2.5 | 2 |
101 | 5.8 | 2.7 | 5.1 | 1.9 | 2 |
102 | 7.1 | 3.0 | 5.9 | 2.1 | 2 |
103 | 6.3 | 2.9 | 5.6 | 1.8 | 2 |
104 | 6.5 | 3.0 | 5.8 | 2.2 | 2 |
その他
今回は試してないですが、S3とGCSもサポートしているので、ディレクトリのpathをs3://xxx
あるいはgs://xxx
とバケットを指定すると、クラウド上に出力物が保存されていても問題なく実行できるはずです。
終わりに
パイプラインジャングルを彷徨った際に、⚡の光が一縷の望みにならんことを!