Edited at

PythonとLuigiによるデータパイプライン構築

More than 3 years have passed since last update.


はじめに

この記事は Building Data Pipelines with Python and Luigi の和訳です。

元記事がよくできていたので、自分の理解も兼ねて、拙いながらも訳してみました。

誤り等ありましたらぜひコメントでご指摘おねがいいたします。


PythonとLuigiによるデータパイプライン構築

データサイエンティストにとって、日々の業務はエンジニアリングというよりも研究開発の色が濃いことがしばしばある。にもかかわらず、プロトタイプから製品までのプロセスには、素早く泥臭い決定が次善の策1であり、けっこうな数のリエンジニアリングの努力が必要となる。これはいつも革新を遅らせ、一般的に言えば、プロジェクト全体を遅らせる。

この記事はデータパイプライン構築の経験について議論する:データの抽出、洗浄、結合、前処理などデータ駆動製品のためのデータを準備するのに必要となる一般的なすべてのステップである。特に焦点を当てるのはデータの配管、そしてLuigiのようなワークフローマネージャがいかにあなたを邪魔せずに救世主となるのかである。最小限の努力で、プロトタイプから製品への移行がスムーズになる。

コードのサンプルは GitHub Gist で公開している。


これまでのプロトタイプ

過去のプロトタイプでは、データパイプラインはおおむね次のような感じだった:

$ python get_some_data.py

$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py

データプロジェクトの予備実験段階では、次に挙げるようなことはごく普通のことだ:前処理が必要であり、それは素早いハック2へとなりそうであり、だからエンジニアリングのベストプラクティスに煩わされ、そしてスクリプトの数は膨れ上がりデータパイプラインが牙をむく。

このアプローチは素早くそしてハックであるという利点だけを持つ。欠点は、退屈であることだ:毎回パイプラインを再実行したくなり、スクリプトの束を次々に手動で呼び出す必要がある。さらに、このプロトタイプを同僚に共有するとき、誤解の余地が大いに存在する (「なぜdo_stuff_with_dataが動かないの?」「最初にclean_some_dataをした?」などなど)。

明らかにハックな解は、すべてをひとつのスクリプトに押し込むことにみえる。いくらかの素早いリファクタリングの後、do_everything.py スクリプトは次のようになるだろう:

if __name__ == '__main__':

get_some_data()
clean_some_data()
join_other_data()
do_stuff_with_data()

実行するのは簡単だ:

$ python do_everything.py

(注意:スクリプトの束を順番に呼び出すようなbashスクリプトにすべてをまとめることもできるが、欠点は変わらず同じである)


コードのテンプレ (鋳型)

製品への準備段階となるパイプラインへと移った時、例の全てを実行するコードの側面について少し考える必要がある。特に、エラー処理が考慮されるべきだ:

try:

get_some_data()
except GetSomeDataError as e:
# エラー処理

しかし全てのタスクを一緒にすると、try/exceptのクリスマスツリーと化す:

try:

get_some_data()
try:
clean_some_data()
try:
# ここでなにかをする...
except EvenMoreErrors:
# ...
except CleanSomeDataError as e:
# CleanSomeDataError を処理
except GetSomeDataError as e:
# GetSomeDataError を処理

別の重要な考えるべき点はどうやってパイプラインを復帰させるかだ。たとえば、最初のいくつかのタスクは完了したが、その途中でエラーが起きたら、どうやって最初の成功したステップを再実行せずにそのパイプラインを再実行できるだろうか?

# タスクがすでに成功しているかチェック

if not i_got_the_data_already():
# していないなら、それを実行
try:
get_some_date()
except GetSomeDataError as e:
# エラー処理


Luigi へ

Luigi は、バッチジョブの複雑なデータパイプライン構築を助けるためにSpotifyで開発された、ワークフロー管理のためのPythonのツールである。Luigi のインストールは:

pip install luigi

Luigi の便利な特徴は:


  • 依存管理

  • チェックポイント / 障害復帰

  • CLIインテグレーション / パラメータ化

  • 依存グラフ可視化

データパイプラインにどうやって Luigi を適用できるかを理解するためのキーコンセプトがふたつある:タスクとターゲットだ。タスクは仕事の集まりであり、luigi.Task クラスを継承、いくつかの基礎的なメソッドをオーバーライドすることで表現する。タスクの出力がターゲットであり、それはローカルファイルシステムかもしれないし、Amazon S3 かもしれないし、データベースにあるかもしれない。

依存関係は入力と出力で定義できる。例えば、もしタスクBがタスクAに依存している場合、それはタスクAの出力がタスクBの入力である、ということを意味する。

いくつかの典型的なタスクを見てみよう:

# Filename: run_luigi.py

import luigi

class PrintNumbers(luigi.Task):

def requires(self):
return []

def output(self):
return luigi.LocalTarget("numbers_up_to_10.txt")

def run(self):
with self.output().open('w') as f:
for i in range(1, 11):
f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):

def requires(self):
return [PrintNumbers()]

def output(self):
return luigi.LocalTarget("squares.txt")

def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write("{}:{}\n".format(n, out))

if __name__ == '__main__':
luigi.run()

このコードはふたつのタスクを提示している:1から10までの数を一行ずつnumber_up_to_10.txtというファイルに書き出すPrintNumbersと、そのファイルを読んで平方数とのペアを一行ずつsquares.txtというファイルに書き出すSquaredNumbersだ。

このタスクを実行するには:

$ python run_luigi.py SquaredNumbers --local-scheduler

Luigi はタスク間の依存関係チェックを考慮し、SquaredNumbersの入力がないことを発見するので、最初にPrintNumbersタスクを走らせてから、SquaredNumbersを実行に移す。

Luigi に渡した最初の引数は実行したいパイプラインの最後のタスクの名前だ。二番目の引数は、単に Luigi にローカルスケジューラを使うように伝えているだけだ (詳細は後述)。

luigi コマンドを用いることもできる:

$ luigi -m run_luigi.py SquaredNumbers --local-scheduler


タスクの骨組み

Luigi のタスクを作るためには、単にluigi.Taskを親としてクラスを作り、いくつかのメソッドをオーバーライドすればよい。特に:



  • requires()は依存しているタスクのリスト


  • output()はタスクのターゲット (例えば LocalTarget, S3Targetなど)


  • run()は実行のロジック

である。Luigi はrequires()output()の返り値をチェックし、それに応じて依存関係グラフを構築する。


パラメータを渡す

ハードコーディングされたファイル名や設定値は一般的に言ってアンチパターンだ。一度タスクの構造とダイナミクスを理解したら、同じスクリプトを引数を変えて動的に呼び出すことが出来るように設定をパラメータ化すべきだ。

luigi.Parameter()クラスがそれだ。それぞれの Luigi のタスクはいくつかのパラメータを持ちうる。たとえば、前の例で、数を変えられるようにするとしよう。range()関数のパラメータとして使用しているのは整数なので、デフォルトのパラメータクラスではなくluigi.IntParameterを使うことができる。変更したタスクは次のようになる:

class PrintNumbers(luigi.Task):

n = luigi.IntParameter()

def requires(self):
return []

def output(self):
return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))

def run(self):
with self.output().open('w') as f:
for i in range(1, self.n+1):
f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):
n = luigi.IntParameter()

def requires(self):
return [PrintNumbers(n=self.n)]

def output(self):
return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))

def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write("{}:{}\n".format(n, out))

SquaredNumbersタスクを20まで上げて呼び出すには:

$ python run_luigi.py SquaredNumbers --local-scheduler --n 20

パラメータにはデフォルト値を持たせることもできる。例えば:

n = luigi.IntParameter(default=10)

この場合、--n引数を指定しなければ、10が使われる。

サンプルはGitHub Gistへ


ローカル vs グローバルスケジューラ

前に、Luigi のタスクをローカルスケジューラで実行するときに--local-schedulerオプションを使った。これは開発には便利だが、製品環境の場合、centralised scheduler を使うべきだ (scheduler のドキュメントを見よ)。

これにはいくつかの利点がある:


  • 同じタスクのふたつのインスタンスを同時に実行することを回避

  • ナイスなWebベースでの可視化

Luigi スケジューラのデーモンをフォアグラウンドで走らせるには:

$ luigid

バックグラウンドの場合:

$ luigid --background

これはデフォルトで8082番ポートを使うので、ブラウザで http://localhost:8082/ へアクセスすれば可視化を見ることができる。

グローバル Luigi スケジューラが走っている時は、ローカルスケジューラのためのオプションなしで再実行できる:

$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]

サンプルコードはミリ秒単位で終了するが、もしブラウザに切り替えてタスクがまだ実行中のときの依存関係グラフを見たいなら、おそらく大きな数たとえば10,000,000とかそれ以上の数を--nオプションに与えれば良いだろう。

依存関係グラフのスクリーンショットは:

dependency-graph-screenshot.png


まとめ

Pythonで書かれたワークフローマネージャ、Luigi を使ったデータパイプラインの定義について議論した。Luigi はタスクとターゲットによるパイプラインのナイスな抽象化を提供し、あなたのために依存関係も考慮してくれる。

コード再利用、そしてプロトタイプから製品へ移行するマインドセットの観点から、私はビジネスロジックのタスクを個別のPythonパッケージとして定義することが便利だと思う (つまり、setup.pyファイルとともに)。このやり方だと、Luigi のスクリプトからは単に import your_package と宣言してそこから呼び出せば良い。

ひとつのタスクが複数のファイルを出力として生成することは可能だが、もしそうなった場合、おそらくだが、そのタスクが小さな単位 (つまり複数のタスク) に分割できないかを考えるべきだ。それらの出力は論理的に同じか?依存関係はないか?もしタスクを分割できない場合、ただoutput()をタスクそれ自体の名前やらタイムスタンプやらを組み合わせたログファイルとするのが単純かつ便利だと思う。そのログファイルの名前はTaskName_timestamp_param1value_param2value_etcのようになるだろう。

Luigi のようなワークフローマネージャは、データパイプラインを開発するときに、依存関係を処理し、パラメータやエラー処理のためのコードの鋳型の量を減らし、障害復帰を管理し、明快なパターンに従わせてくれるので、一般的に言って有用だ。

限界も考慮することが重要である:


  • Luigi はバッチジョブのために開発されているので、リアルタイムに近い処理にはたぶん役に立たない

  • 実行をトリガーしてはくれない。あなたはデータパイプラインを実行する必要がある (例えば cronjob を通して)






  1. 原文:"some of the early quick-and-dirty decisions turn out to be sub-optimal" 



  2. 原文:"quick hack"