訳あって、OSSのデータフロー制御ツールであるluigi
で書かれたジョブのログを調べていたのだが、DEBUG
とかの「そこは別にいいよ」というメッセージを結構大量に出す割に、実行したタスクがどれくらい時間がかかったかをログに出さないのである。
しかもログメッセージにデフォルトではタイムスタンプがついていない!
なんともなluigi
さんだが、その後調べた結果、タスクの実行時間が取れる方法が分かったのでメモしておく。
環境
$ cat /etc/redhat-release
CentOS Linux release 7.2.1511 (Core)
$ python -V
Python 2.7.5
インストール
$ su -
# pip install luigi
# pip list | grep luigi
luigi (2.3.3)
デフォルトのログ
以下はサンプルのtop_artists.pyを動かした時に出力されるログメッセージの抜粋である。
$ PYTHONPATH='.' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
No handlers could be found for logger "luigi-interface"
DEBUG: Checking if AggregateArtists(date_interval=2012-06) is complete
DEBUG: Checking if Streams(date=2012-06-01) is complete
DEBUG: Checking if Streams(date=2012-06-02) is complete
(略)
INFO: Informed scheduler that task AggregateArtists_2012_06_7af15faabf has status PENDING
INFO: Informed scheduler that task Streams_2012_06_30_5c89db6ebe has status PENDING
INFO: Informed scheduler that task Streams_2012_06_29_b5fcbcadc1 has status PENDING
(略)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 31
INFO: [pid 28321] Worker Worker(salt=588456947, workers=1, host=localhost.localdomain, username=hadoop, pid=28321) running Streams(date=2012-06-06)
INFO: [pid 28321] Worker Worker(salt=588456947, workers=1, host=localhost.localdomain, username=hadoop, pid=28321) done Streams(date=2012-06-06)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task Streams_2012_06_06_f7dba06bd7 has status DONE
(略)
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=588456947, workers=1, host=localhost.localdomain, username=hadoop, pid=28321) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 31 tasks of which:
* 31 ran successfully:
- 1 AggregateArtists(date_interval=2012-06)
- 30 Streams(date=2012-06-01...2012-06-30)
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
実行時間の情報がない!!
Execution Summaryにいれとけよ!!(#^ω^)ビキビキ
タスクの実行時間を採る方法
luigiには内部で定義されたイベントが発生した際に呼びだされるコールバックを登録することができる。
その中に、Event.PROCESSING_TIME
という、タスクが正常終了した際に発行されるイベントがある。このイベントは内部にタスクがかかった時間を格納しているので、それを取り出すコールバックを登録しておけばよい。
すべて以下のリンク先の受け売りである。圧倒的感謝。
How to output the execution time of tasks in the luigi workflow system
コードも上のものを使って動作させると、以下のようなログメッセージになる。
実行に当たっては、カレントディレクトリにinput.txt
を適当な内容で用意しておく。サンプルのコードはこのファイルを別名でコピーするタスクが実装されている。
$ cat input.txt
Hello Luigi!
実行。
$ python luigi_time_tasks_example.py --local-scheduler
DEBUG: Checking if TaskC() is complete
DEBUG: Checking if TaskB() is complete
INFO: Informed scheduler that task TaskC__99914b932b has status PENDING
DEBUG: Checking if TaskA() is complete
INFO: Informed scheduler that task TaskB__99914b932b has status PENDING
INFO: Informed scheduler that task TaskA__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) running TaskB()
### PROCESSING TIME ###: 3.0078060627
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) done TaskB()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task TaskB__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) running TaskC()
### PROCESSING TIME ###: 6.01015210152
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) done TaskC()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task TaskC__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 1 present dependencies were encountered:
- 1 TaskA()
* 2 ran successfully:
- 1 TaskB()
- 1 TaskC()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
### PROCESSING TIME ###: (タスク実行にかかった時間[秒])
という形でログが追加出力されるようになる。
もう少し正確に述べると、リンク先のサンプルコードで
class TimeTaskMixin(object):
'''
A mixin that when added to a luigi task, will print out
the tasks execution time to standard out, when the task is
finished
'''
@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def print_execution_time(self, processing_time):
print('### PROCESSING TIME ###: ' + str(processing_time))
というようにluigi
からPROCESSING_TIME
イベントに格納されてきた値をprint
している。
processing_time
に入っていくる値の単位が何かは上のサンプルだけだとさっぱりわからないが、luigi
本体の実装を見ると秒数であることがわかる
自分で実装する時はどこかのファイルに書き出したり、DBに書いたりすればいいと思われる。
このPROCESSING_TIME
機構だが、結構古いバージョンから実装されているようで、1.0.17あたりから入っている
以上。