8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

クラス内のProcessPoolExecutorにselfを渡してはいけない

Posted at

環境

Python 3.7.4
Windows 10

ProcessPoolExecutorが止まる

並列処理のためにProcessPoolExecutorを使用した際にエラーが発生しました。
以下がそのコードです。

import concurrent
import concurrent.futures

class Process:
    def __init__(self):
        self.process_list = []
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
        # self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)  # スレッド処理なら動く
    
    def _process(self, n):
        # 処理部
        return 1
    
    def start_process(self, n):
        # 実行部
        self.process_list.append(self.executor.submit(self._process, n)) 
    
    def get_result(self):
        # 結果取得
        print("wait...")
        concurrent.futures.wait(self.process_list, return_when=concurrent.futures.FIRST_EXCEPTION)
        print("all processes were finished")
        res_list = [res.result() for res in self.process_list]
        print("got result")
        self.executor.shutdown(wait=True)
        print("shutdown")
        self.process_list = []
        return res_list

if __name__ == "__main__":
    process = Process()
    for i in range(10):
        process.start_process(i)
    result = process.get_result()
    print(result)

このコードを実行すると、res_list = [res.result() for res in self.process_list]の部分で、

TypeError: can't pickle _thread.RLock objects

というエラーが発生します。

ちなみに、ThreadPoolExecutorなら動きます。

解決方法

処理部を

def _process(self, n):
    # 処理部
    return 1

から

@staticmethod
def _process(n):
    # 処理部
    return 1

に変更します。処理部で自身(self)の持つインスタンス変数を使用していた場合は引数に加えます。

また、自身のインスタンスメソッドを使用していた場合は、そのメソッドもselfを引数に取らないもの(staticmethod, classmethod等)に変更する必要があります。

ProcessPoolExecutorに渡すメソッドの引数にpickle化できないオブジェクト(ProcessPoolExecutor, queue.Queue, threading.Lock, threading.RLock等)を含むオブジェクトを渡さなければ大丈夫だと思います。

原因

この問題は、クラスのインスタンスにProcessPoolExecutor、つまりpickle化できないオブジェクトを含んでおり、それをインスタンスメソッドのself引数によってマルチプロセスに渡していることが原因です。

ProcessPoolExecutorのドキュメントによると、

ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
(引用元: https://docs.python.org/ja/3/library/concurrent.futures.html#processpoolexecutor)

と書いてあるように、pickle化可能なオブジェクトのみ実行&return可能だそうです。

そのため、インスタンスメソッドを用いない(引数にselfを取らないメソッドを用いる)ことで回避できます。

修正例

インスタンス変数を使っていた場合の修正例です。self.calc, self.hogeがインスタンス変数です。

"""
インスタンス変数を使用していた場合
"""
import concurrent
import concurrent.futures

class Calc:
    def __init__(self, a):
        self.a = a
    
    def calc(self, n):
        return self.a + n

class Process:
    def __init__(self):
        self.process_list = []
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
        # self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
        self.calc = Calc(10)  # 処理を行うクラスのインスタンス
        self.hoge = 3  # インスタンス変数
    
    def _process_bad(self, n):
        res = self.calc.calc(n) * self.hoge
        return res
    
    @staticmethod
    def _process(calc, n, hoge):
        res = calc.calc(n) * hoge
        return res
    
    def start_process(self, n):
        # 実行部
        # self.process_list.append(self.executor.submit(self._process_bad, n))  # NG
        self.process_list.append(self.executor.submit(self._process, self.calc, n, self.hoge))  # OK
    
    def get_result(self):
       # 省略

if __name__ == "__main__":
    process = Process()
    for i in range(10):
        process.start_process(i)
    result = process.get_result()
    print(result)

_process_bad()のように書いていた場合は、_process()のようにメソッド内で使用する変数をすべて引数に渡す必要があります。

また、Calcクラスにもpickle化できないオブジェクトを含まないようにしなければなりません。

ちなみにstaticmethodにする以外に、classmethodにしたりクラス外のメソッドを呼び出したりしても動きます。

# クラスメソッドにした例
@classmethod
def _process(cls, calc, n, hoge):
    res = calc.calc(n) * hoge
    return res

おわりに

私がこの問題に遭遇したときは以下のように、結果取得前にshutdownしていました。

def get_result(self):
    # 結果取得
    self.executor.shutdown(wait=True)  # 取得前にshutdown
    res_list = [res.result() for res in self.process_list]
    self.process_list = []
    return res_list

こうするとエラーメッセージが表示されないまま止まってしまいます。

また、pickle化できないものは戻り値にさえ入れなければ問題ないという勘違いもしており、しばらく原因がわかりませんでした。

エラーに苦しんで書いたマルチプロセス処理ですが、私がやりたかった処理ではマルチスレッドと大差ない実行時間となりました。

外部ライブラリを使う場合はマルチスレッド処理でも複数のCPUを使ってくれることがあるようです。

参考

[1] https://bugs.python.org/issue29423 (using concurrent.futures.ProcessPoolExecutor in class giving 'TypeError: can't pickle _thread.RLock objects' in 3.6, but not 3.5)
[2] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (ProcessPoolExector.mapのfuncにlambda式を渡せない)
[3] https://qiita.com/kokumura/items/2e3afc1034d5aa7c6012 (concurrent.futures使い方メモ)
[4] https://docs.python.org/ja/3.6/library/concurrent.futures.html (17.4. concurrent.futures -- 並列タスク実行)
[5] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (ProcessPoolExector.mapのfuncにlambda式を渡せない)
[6] https://qiita.com/kaitolucifer/items/e4ace07bd8e112388c75#4-concurrentfutures (Pythonのthreadingとmultiprocessingを完全理解)

8
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?