http://qiita.com/pilot/items/69cbae56d85a443fb623#2%E5%8F%B0%E3%81%A8%E3%82%82%E6%8C%87%E5%AE%9A%E3%81%AA%E3%81%97%E3%81%AB%E3%81%99%E3%82%8B%E3%81%A8 のagentが複数のDigdagサーバで動いている場合の話の続き
digdag 0.8.18にて
td>でTreasureDataに数秒かかるクエリを投げた際のタスクログ
td>を使った(TreasureDataにクエリを投げる)タスクをpush・実行してみた
当該クエリの実行には数秒を要する
すると、途中まで1号機、以降は2号機にタスク動作ログが出た
digファイルにはtask1(td>)とtask2(echo>)の2タスクしか定義していないがtask1の動作ログは5回出た
1号機にて
2016-11-01 12:47:54.958 +0900 [INFO] (XNIO-1 task-8): Starting a new session project id=18 workflow name=xxx session_time=2016-11-01T12:47:54+09:00
2016-11-01 12:47:54.984 +0900 [DEBUG] (workflow-executor-0): Queuing task of attempt_id=59: id=452 +xxx+task1
- task1実行ログ (TreasureData接続?)
2016-11-01 12:47:55.939 +0900 [DEBUG] (0103@+xxx+task1): Retrying task io.digdag.spi.TaskExecutionException: Retrying this task after 0 seconds
2016-11-01 12:47:55.959 +0900 [DEBUG] (workflow-executor-0): Queuing task of attempt_id=59: id=452 +xxx+task1
- task1実行ログ (SQL発行)
2016-11-01 12:47:57.533 +0900 [DEBUG] (0103@+xxx+task1): Retrying task io.digdag.spi.TaskExecutionException: Retrying this task after 1 seconds
2号機にて
2016-11-01 12:47:58.639 +0900 [DEBUG] (workflow-executor-0): Queuing task of attempt_id=59: id=452 +xxx+task1
- task1実行ログ (ジョブステータス問い合わせ)
2016-11-01 12:48:00.379 +0900 [DEBUG] (0093@+xxx+task1): Retrying task io.digdag.spi.TaskExecutionException: Retrying this task after 1 seconds
2016-11-01 12:48:02.004 +0900 [DEBUG] (workflow-executor-0): Queuing task of attempt_id=59: id=452 +xxx+task1
- task1実行ログ (ジョブステータス問い合わせ)
2016-11-01 12:48:03.307 +0900 [DEBUG] (0093@+xxx+task1): Retrying task io.digdag.spi.TaskExecutionException: Retrying this task after 2 seconds
2016-11-01 12:48:06.552 +0900 [DEBUG] (workflow-executor-0): Queuing task of attempt_id=59: id=452 +xxx+task1
- task1実行ログ (ジョブステータス問い合わせ)
2016-11-01 12:48:07.902 +0900 [DEBUG] (workflow-executor-0): Queuing task of attempt_id=59: id=453 +xxx+task2
- task2実行ログ (td.last_job_idをecho>)
ログ内容の理解
TreasureDataのクエリ実行は非同期実行なので、SQLを投げた後に「終わった?」との問い合わせを投げる必要があり、「まだだよ」との回答であれば数秒後にまた「終わった?」を投げる(「終わったよ」との回答を得るまで繰り返す)
その「数秒後の問い合わせ」もキューにそのためのタスクを入れることで実現している(idが452のまま変わらないので新しいタスクを入れるのではなく元のタスクをまた入れているだけか)
そしてどっちの号機のagentがそのタスクをキューから取り出すのかは不定なので、結果としてtd>オペレータは複数号機の間を行ったり来たりしながら実行されることになる
(もっと長時間かかるクエリの場合、1号機→2号機→また1号機、となることもあった)
また、キューに入っていた前タスクが完了するとworkflow executorがそれを検知して次のタスクをキューに入れる仕事をしていることも見て取れた