標準ライブラリの方にも手を加える改造をします
windows環境以外または並列処理をしない場合は、改造は必要ありません
はじめに
luigiとは
ジョブスケジューラーの一種です。
複数のジョブの間に依存関係がある場合、それらを正しい順序で実行してくれます。
また、ジョブの間に依存関係がない場合は並列に実行してくれます。
詳しくは公式ページを見てください。
spotify/luigi
windows環境での問題
windows環境でのみ並列化ができません。
その原因は、luigiではジョブをプロセス間でpickleを使ってシリアライズしていますが、windows環境のpickleの実装ではシリアライズできないオブジェクトがあるせいです。(多分)
解決策
無理やり改造する方法(バージョン2.2.0以上)
ライブラリを書き換えます。
書き換え対象はluigi/worker.pyと標準ライブラリのmultiprocessing/reduction.pyの二つです。
# importを追加
from functools import partial
# TaskProcess.__init__内部
class TaskProcess(multiprocessing.Process):
...
def __init__(self, task, worker_id, result_queue, tracking_url_callback,
status_message_callback, use_multiprocessing=False, worker_timeout=0):
...
# self.tracking_url_callback = tracking_url_callback
self.tracking_url_callback = partial(tracking_url_callback, task)
# self.status_message_callback = status_message_callback
self.status_message_callback = partial(status_message_callback, task)
...
...
class worker(Config):
...
# Worker._create_task_processの内部の関数を移動
def _update_tracking_url(self, task, tracking_url):
self._scheduler.add_task(
task_id=task.task_id,
worker=self._id,
status=RUNNING,
tracking_url=tracking_url,
)
# Worker._create_task_processの内部の関数を移動
def _update_status_message(self, task, message):
self._scheduler.set_task_status_message(task.task_id, message)
def _create_task_process(self, task):
# def update_tracking_url(tracking_url):
# self._scheduler.add_task(
# task_id=task.task_id,
# worker=self._id,
# status=RUNNING,
# tracking_url=tracking_url,
# )
# def update_status_message(message):
# self._scheduler.set_task_status_message(task.task_id, message)
return TaskProcess(
task, self._id, self._task_result_queue, self._update_tracking_url, self._update_status_message,
use_multiprocessing=bool(self.worker_processes > 1),
worker_timeout=self._config.timeout
)
...
# 冒頭のインポートの部分
# import pickle
import dill as pickle
dillはpipでインストールできます。
別の解決策
古いバージョンを使います。この場合は、あまり改造は必要ありません。
こちらは標準ライブラリには手を加えません。
windowsでluigiを並列処理させたらエラーになったけど、その解決法
Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?
最後に
私が実際に使っている限りでは問題は起きていませんが、
改造は自己責任でお願いします。