環境
Python 3.7.4
Windows 10
ProcessPoolExecutorが止まる
並列処理のためにProcessPoolExecutorを使用した際にエラーが発生しました。
以下がそのコードです。
import concurrent
import concurrent.futures
class Process:
def __init__(self):
self.process_list = []
self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4) # スレッド処理なら動く
def _process(self, n):
# 処理部
return 1
def start_process(self, n):
# 実行部
self.process_list.append(self.executor.submit(self._process, n))
def get_result(self):
# 結果取得
print("wait...")
concurrent.futures.wait(self.process_list, return_when=concurrent.futures.FIRST_EXCEPTION)
print("all processes were finished")
res_list = [res.result() for res in self.process_list]
print("got result")
self.executor.shutdown(wait=True)
print("shutdown")
self.process_list = []
return res_list
if __name__ == "__main__":
process = Process()
for i in range(10):
process.start_process(i)
result = process.get_result()
print(result)
このコードを実行すると、res_list = [res.result() for res in self.process_list]
の部分で、
TypeError: can't pickle _thread.RLock objects
というエラーが発生します。
ちなみに、ThreadPoolExecutorなら動きます。
解決方法
処理部を
def _process(self, n):
# 処理部
return 1
から
@staticmethod
def _process(n):
# 処理部
return 1
に変更します。処理部で自身(self)の持つインスタンス変数を使用していた場合は引数に加えます。
また、自身のインスタンスメソッドを使用していた場合は、そのメソッドもselfを引数に取らないもの(staticmethod, classmethod等)に変更する必要があります。
ProcessPoolExecutorに渡すメソッドの引数にpickle化できないオブジェクト(ProcessPoolExecutor, queue.Queue, threading.Lock, threading.RLock等)を含むオブジェクトを渡さなければ大丈夫だと思います。
原因
この問題は、クラスのインスタンスにProcessPoolExecutor、つまりpickle化できないオブジェクトを含んでおり、それをインスタンスメソッドのself引数によってマルチプロセスに渡していることが原因です。
ProcessPoolExecutorのドキュメントによると、
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
(引用元: https://docs.python.org/ja/3/library/concurrent.futures.html#processpoolexecutor)
と書いてあるように、pickle化可能なオブジェクトのみ実行&return可能だそうです。
そのため、インスタンスメソッドを用いない(引数にselfを取らないメソッドを用いる)ことで回避できます。
修正例
インスタンス変数を使っていた場合の修正例です。self.calc
, self.hoge
がインスタンス変数です。
"""
インスタンス変数を使用していた場合
"""
import concurrent
import concurrent.futures
class Calc:
def __init__(self, a):
self.a = a
def calc(self, n):
return self.a + n
class Process:
def __init__(self):
self.process_list = []
self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
# self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
self.calc = Calc(10) # 処理を行うクラスのインスタンス
self.hoge = 3 # インスタンス変数
def _process_bad(self, n):
res = self.calc.calc(n) * self.hoge
return res
@staticmethod
def _process(calc, n, hoge):
res = calc.calc(n) * hoge
return res
def start_process(self, n):
# 実行部
# self.process_list.append(self.executor.submit(self._process_bad, n)) # NG
self.process_list.append(self.executor.submit(self._process, self.calc, n, self.hoge)) # OK
def get_result(self):
# 省略
if __name__ == "__main__":
process = Process()
for i in range(10):
process.start_process(i)
result = process.get_result()
print(result)
_process_bad()
のように書いていた場合は、_process()
のようにメソッド内で使用する変数をすべて引数に渡す必要があります。
また、Calc
クラスにもpickle化できないオブジェクトを含まないようにしなければなりません。
ちなみにstaticmethodにする以外に、classmethodにしたりクラス外のメソッドを呼び出したりしても動きます。
# クラスメソッドにした例
@classmethod
def _process(cls, calc, n, hoge):
res = calc.calc(n) * hoge
return res
おわりに
私がこの問題に遭遇したときは以下のように、結果取得前にshutdownしていました。
def get_result(self):
# 結果取得
self.executor.shutdown(wait=True) # 取得前にshutdown
res_list = [res.result() for res in self.process_list]
self.process_list = []
return res_list
こうするとエラーメッセージが表示されないまま止まってしまいます。
また、pickle化できないものは戻り値にさえ入れなければ問題ないという勘違いもしており、しばらく原因がわかりませんでした。
エラーに苦しんで書いたマルチプロセス処理ですが、私がやりたかった処理ではマルチスレッドと大差ない実行時間となりました。
外部ライブラリを使う場合はマルチスレッド処理でも複数のCPUを使ってくれることがあるようです。
参考
[1] https://bugs.python.org/issue29423 (using concurrent.futures.ProcessPoolExecutor in class giving 'TypeError: can't pickle _thread.RLock objects' in 3.6, but not 3.5)
[2] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (ProcessPoolExector.mapのfuncにlambda式を渡せない)
[3] https://qiita.com/kokumura/items/2e3afc1034d5aa7c6012 (concurrent.futures使い方メモ)
[4] https://docs.python.org/ja/3.6/library/concurrent.futures.html (17.4. concurrent.futures -- 並列タスク実行)
[5] https://qiita.com/walkure/items/2751b5b8932873e7a5bf (ProcessPoolExector.mapのfuncにlambda式を渡せない)
[6] https://qiita.com/kaitolucifer/items/e4ace07bd8e112388c75#4-concurrentfutures (Pythonのthreadingとmultiprocessingを完全理解)