15
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Fringe81Advent Calendar 2019

Day 9

thunderbolt⚡ on gokartでパイプラインの出力結果をサクサク確認する

Posted at

これなに?

F81アドベントカレンダー9日目の長谷川です。明日の担当は本日が誕生日🎉の pastelIncさんです。おめでとうございます。
この記事では、パイプラインフレームワークのLuigiの出力結果を、Jupyterなどで簡単に確認するためのライブラリ、thunderboltの紹介していきます。
なお、この記事ではLuigiの基礎的な内容については、一切解説しません。

パイプライン処理の課題

パイプラインフレームワークを使ってデータ処理を開発、運用しているときに、よく中間生成物を簡単に確認したい! と思うこと多々あります。
有向非巡回グラフの特性上、依存している処理の出力結果が期待している物とズレていると、そのズレが連鎖的に伝搬し、ゴミを入れてもゴミしか出ない、汚染されたパイプラインになりかねないからです。
Taskを追加しまくって、どんどん開発が進むのは気分が良いですが、出力物の確認をせずにガンガン処理を増やしていくと、パイプラインが複雑化し、どこでどのような処理が行われているのかが徐々に不透明になり、期待した結果とのズレが生じた際に、そのズレがどこで生じているのかを特定するのが、より困難になっていきます。

thunderbolt⚡とは?

そこでthunderbolt(とgokart)の出番です。
thunderboltは、Luigiをより使いやすくしたラッパーライブラリであるgokartで出力されるlogをパースし、マネージドできるライブラリです。名前の由来はマリオカートのアイテム、サンダー⚡からですね。
名の通り、あくまでgokartをサポートするツールなので、gokartでluigiを構築していることが前提となっています。 gokartの使い方は、M3さんのブログか、前に書いた記事をお読みください。

gokartが出力するlog

gokartは以下のように.pkl形式でlogを出力します。

 resources
 ├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
 └── log
     ├── module_versions
     │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
     ├── processing_time
     │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
     ├── task_log
     │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
     └── task_params
         └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl

logには、

  • 利用したモジュールのバージョン
  • 処理にかかった時間
  • loggerで出力されたlog
  • 適用されたパラメータ

が保存されています。

gokartの場合、デフォルトだと指定したファイル名にhash値がついて保存されてます。hash値はTaskのパラメータによって決まるため頻繁に変わり、しかもlogはpickleで保存されるので、出力物をloadしたり、ログを確認したりが、いささか面倒ですが、このあたりをサポートしてくれるのがthunderboltです。

使い方

公式のexampleに載ってます、おわり。
だと、ちょっと寂しいので、基本的な使い方をここでも紹介します

なおライブラリのバージョンは

  • luigi==2.8.10
  • gokart==0.3.7
  • thunderbolt==0.0.1

です

初期化

from thunderbolt import Thunderbolt
tb=Thunderbolt(workspace_directory='[goartの保存先のディレクトリ]')

で、マネージャのインスタンスを作成します。

なお、ディレクトリのパラメータが指定されていない場合、環境変数TASK_WORKSPACE_DIRECTORYで指定されたpathが参照されます。

出力物の確認

以下のようにget_task_dfで読み込むと、出力物の情報がDataFrameとして確認できます。

tb.get_task_df()
task_id task_name last_modified task_params
0 0 LoadTargetData 2019-11-20 11:04:22.727454 {'target_id': '0'}
1 1 LoadData 2019-11-20 11:04:22.794075 {}
2 2 LoadTargetData 2019-11-20 11:04:22.774771 {'target_id': '2'}
3 3 TargetId 2019-11-20 11:04:22.669280 {}

出力物をload

以下のようにload(task_id=[確認したい出力物のtask_id])で出力物の中身を確認できます。

tb.load(task_id=1)[0].head()
0 1 2 3 target
100 6.3 3.3 6.0 2.5 2
101 5.8 2.7 5.1 1.9 2
102 7.1 3.0 5.9 2.1 2
103 6.3 2.9 5.6 1.8 2
104 6.5 3.0 5.8 2.2 2

その他

今回は試してないですが、S3とGCSもサポートしているので、ディレクトリのpathをs3://xxxあるいはgs://xxxとバケットを指定すると、クラウド上に出力物が保存されていても問題なく実行できるはずです。

終わりに

パイプラインジャングルを彷徨った際に、⚡の光が一縷の望みにならんことを!

15
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?