LoginSignup
6
6

More than 5 years have passed since last update.

luigiでタスク実行にかかった時間を取得する

Posted at

訳あって、OSSのデータフロー制御ツールであるluigiで書かれたジョブのログを調べていたのだが、DEBUGとかの「そこは別にいいよ」というメッセージを結構大量に出す割に、実行したタスクがどれくらい時間がかかったかをログに出さないのである。

しかもログメッセージにデフォルトではタイムスタンプがついていない!

なんともなluigiさんだが、その後調べた結果、タスクの実行時間が取れる方法が分かったのでメモしておく。

環境

$ cat /etc/redhat-release 
CentOS Linux release 7.2.1511 (Core) 
$ python -V
Python 2.7.5

インストール

$ su -
# pip install luigi
# pip list | grep luigi
luigi (2.3.3)

デフォルトのログ

以下はサンプルのtop_artists.pyを動かした時に出力されるログメッセージの抜粋である。

$ PYTHONPATH='.' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06
No handlers could be found for logger "luigi-interface"
DEBUG: Checking if AggregateArtists(date_interval=2012-06) is complete
DEBUG: Checking if Streams(date=2012-06-01) is complete
DEBUG: Checking if Streams(date=2012-06-02) is complete
(略)
INFO: Informed scheduler that task   AggregateArtists_2012_06_7af15faabf   has status   PENDING
INFO: Informed scheduler that task   Streams_2012_06_30_5c89db6ebe   has status   PENDING
INFO: Informed scheduler that task   Streams_2012_06_29_b5fcbcadc1   has status   PENDING
(略)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 31
INFO: [pid 28321] Worker Worker(salt=588456947, workers=1, host=localhost.localdomain, username=hadoop, pid=28321) running   Streams(date=2012-06-06)
INFO: [pid 28321] Worker Worker(salt=588456947, workers=1, host=localhost.localdomain, username=hadoop, pid=28321) done      Streams(date=2012-06-06)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   Streams_2012_06_06_f7dba06bd7   has status   DONE
(略)
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=588456947, workers=1, host=localhost.localdomain, username=hadoop, pid=28321) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 31 tasks of which:
* 31 ran successfully:
    - 1 AggregateArtists(date_interval=2012-06)
    - 30 Streams(date=2012-06-01...2012-06-30)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

実行時間の情報がない!!

Execution Summaryにいれとけよ!!(#^ω^)ビキビキ

タスクの実行時間を採る方法

luigiには内部で定義されたイベントが発生した際に呼びだされるコールバックを登録することができる。
その中に、Event.PROCESSING_TIMEという、タスクが正常終了した際に発行されるイベントがある。このイベントは内部にタスクがかかった時間を格納しているので、それを取り出すコールバックを登録しておけばよい。

すべて以下のリンク先の受け売りである。圧倒的感謝。

How to output the execution time of tasks in the luigi workflow system

コードも上のものを使って動作させると、以下のようなログメッセージになる。

実行に当たっては、カレントディレクトリにinput.txtを適当な内容で用意しておく。サンプルのコードはこのファイルを別名でコピーするタスクが実装されている。

$ cat input.txt
Hello Luigi!

実行。

$ python luigi_time_tasks_example.py --local-scheduler
DEBUG: Checking if TaskC() is complete
DEBUG: Checking if TaskB() is complete
INFO: Informed scheduler that task   TaskC__99914b932b   has status   PENDING
DEBUG: Checking if TaskA() is complete
INFO: Informed scheduler that task   TaskB__99914b932b   has status   PENDING
INFO: Informed scheduler that task   TaskA__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) running   TaskB()
### PROCESSING TIME ###: 3.0078060627
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) done      TaskB()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TaskB__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) running   TaskC()
### PROCESSING TIME ###: 6.01015210152
INFO: [pid 28797] Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) done      TaskC()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TaskC__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=876088965, workers=1, host=localhost.localdomain, username=hadoop, pid=28797) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 present dependencies were encountered:
    - 1 TaskA()
* 2 ran successfully:
    - 1 TaskB()
    - 1 TaskC()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

### PROCESSING TIME ###: (タスク実行にかかった時間[秒])という形でログが追加出力されるようになる。

もう少し正確に述べると、リンク先のサンプルコードで

class TimeTaskMixin(object):
    '''
    A mixin that when added to a luigi task, will print out
    the tasks execution time to standard out, when the task is
    finished
    '''
    @luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
    def print_execution_time(self, processing_time):
        print('### PROCESSING TIME ###: ' + str(processing_time))

というようにluigiからPROCESSING_TIMEイベントに格納されてきた値をprintしている。

processing_timeに入っていくる値の単位が何かは上のサンプルだけだとさっぱりわからないが、luigi本体の実装を見ると秒数であることがわかる

自分で実装する時はどこかのファイルに書き出したり、DBに書いたりすればいいと思われる。

このPROCESSING_TIME機構だが、結構古いバージョンから実装されているようで、1.0.17あたりから入っている

以上。

6
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
6