1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

windows環境でluigiに無理やり並列処理をさせる

Posted at

標準ライブラリの方にも手を加える改造をします
windows環境以外または並列処理をしない場合は、改造は必要ありません

はじめに

luigiとは

ジョブスケジューラーの一種です。
複数のジョブの間に依存関係がある場合、それらを正しい順序で実行してくれます。
また、ジョブの間に依存関係がない場合は並列に実行してくれます。

詳しくは公式ページを見てください。
spotify/luigi

windows環境での問題

windows環境でのみ並列化ができません。

その原因は、luigiではジョブをプロセス間でpickleを使ってシリアライズしていますが、windows環境のpickleの実装ではシリアライズできないオブジェクトがあるせいです。(多分)

解決策

無理やり改造する方法(バージョン2.2.0以上)

ライブラリを書き換えます。
書き換え対象はluigi/worker.pyと標準ライブラリのmultiprocessing/reduction.pyの二つです。

Lib/site-packages/luigi/worker.py
# importを追加
from functools import partial

# TaskProcess.__init__内部
class TaskProcess(multiprocessing.Process):
    ...
    def __init__(self, task, worker_id, result_queue, tracking_url_callback,
                 status_message_callback, use_multiprocessing=False, worker_timeout=0):
        ...
        # self.tracking_url_callback = tracking_url_callback
        self.tracking_url_callback = partial(tracking_url_callback, task)
        # self.status_message_callback = status_message_callback
        self.status_message_callback = partial(status_message_callback, task)
        ...
    ...

class worker(Config):
    ...
    # Worker._create_task_processの内部の関数を移動
    def _update_tracking_url(self, task, tracking_url):
            self._scheduler.add_task(
                task_id=task.task_id,
                worker=self._id,
                status=RUNNING,
                tracking_url=tracking_url,
            )
        
    # Worker._create_task_processの内部の関数を移動
    def _update_status_message(self, task, message):
        self._scheduler.set_task_status_message(task.task_id, message)

    def _create_task_process(self, task):
        # def update_tracking_url(tracking_url):
        #     self._scheduler.add_task(
        #         task_id=task.task_id,
        #         worker=self._id,
        #         status=RUNNING,
        #         tracking_url=tracking_url,
        #     )

        # def update_status_message(message):
        #     self._scheduler.set_task_status_message(task.task_id, message)

        return TaskProcess(
            task, self._id, self._task_result_queue, self._update_tracking_url, self._update_status_message,
            use_multiprocessing=bool(self.worker_processes > 1),
            worker_timeout=self._config.timeout
        )
    ...
Lib/multiprocessing/reduction.py
# 冒頭のインポートの部分
# import pickle
import dill as pickle

dillはpipでインストールできます。

別の解決策

古いバージョンを使います。この場合は、あまり改造は必要ありません。

こちらは標準ライブラリには手を加えません。
windowsでluigiを並列処理させたらエラーになったけど、その解決法
Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?

最後に

私が実際に使っている限りでは問題は起きていませんが、
改造は自己責任でお願いします。

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?