はじめに
Python3系に並列実行を抽象化したconcurrent.futuresモジュールがあります。
このモジュールにあるクラスExecutorにはmapメソッドが生えていて、イメージとしては処理系組み込みの高階関数mapが並列実行される感じ。
スレッド単位で並列実行するThreadPoolExecutorと、プロセス単位で並列実行するProcessPoolExecutorがそれぞれExecutorクラスの実装クラスとして実装されていて、どっちのクラスを使うか変えるだけでスレッド単位の並列実行とプロセス単位の並列実行を切り替えられるので試すのも楽。
便利だなぁと思って試していると、タイトルにある「ProcessPoolExector.mapのfuncにlambda式を渡せない」という点で転んだ。ので、ここにメモを書いておきます。
説明
Executor.mapに渡すfuncは引数一つである必要があるので、例えば引数3つ取って、最後の一つだけiterableにしたい場合はlambdaでくるむのが楽。
例えば、「水平分割したデータベースに対してクエリを流すときに並列実行を簡単に書けて便利じゃん」という感じで関数を作ると、関数の引数にはクエリ文字列とDB指定の2つが入って、クエリは全DBに対して一緒だけどDB指定だけばらばら、みたいなことが生じる。
まずはスレッド単位の並列実行で書いてみた。
from concurrent.futures import ThreadPoolExecutor
def func(a,b,i):
print('a:{0} b:{1} i:{2}'.format(a,b,i))
return a*b+i
def test(p,q):
with ThreadPoolExecutor() as executor:
for it in executor.map(lambda _ : func(p,q,_) , range(10)):
print("return:{}".format(it))
if __name__ == '__main__':
test(2,4)
実行すると、こんな感じ。
a:2 b:4 i:0a:2 b:4 i:4a:2 b:4 i:2a:2 b:4 i:1a:2 b:4 i:5a:2 b:4 i:6a:2 b:4 i:3a:2 b:4 i:7a:2 b:4 i:8a:2 b:4 i:9
return:8
return:9
return:10
return:11
return:12
return:13
return:14
return:15
return:16
return:17
なるほどな、という感じで、ここでThreadPoolExecutor
をProcessPoolExecutor
に変えてみる。
from concurrent.futures import ProcessPoolExecutor
def func(a,b,i):
print('a:{0} b:{1} i:{2}'.format(a,b,i))
return a*b+i
def test(p,q):
with ProcessPoolExecutor() as executor:
for it in executor.map(lambda _ : func(p,q,_) , range(10)):
print("return:{0}".format(it))
if __name__ == '__main__':
test(2,4)
ThreadをProcessに変えるだけ。これだけでスレッド単位の並列実行からプロセス単位の並列実行に変わる!
で、実行すると
Traceback (most recent call last):
File "E:\Python35\lib\multiprocessing\queues.py", line 241, in _feed
obj = ForkingPickler.dumps(obj)
File "E:\Python35\lib\multiprocessing\reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'test.<locals>.<lambda>'
死にました。理由は簡単で、pickleがlambda式に対応していないから(pickleのドキュメントにも書いてある)。
で、対応策を考えてみた。
ちなみに、ThreadPoolExecutorの場合はpickle出る幕ないのでちゃんと動くんですね。なるほどな。
ProcessPoolExector.mapに複数の引数を持つfuncを渡したい
pickleの代わりにdillを使う
lambda式にも対応したpickleの拡張であるdillを使えばいいんじゃね、という発想。dillをpip installした上でmultiprocessing\reduction.pyのimport pickle
をimport dill as pickle
に変えてみた。
これで動いた。
でも__動いたはいいんだけど標準ライブラリいじるのいかんでしょ__という気持ちで以下続行。
lambda式でなく通常の関数として書く
こんな感じ
from concurrent.futures import ProcessPoolExecutor
def func(a,b,i):
print('a:{0} b:{1} i:{2}'.format(a,b,i))
return a*b+i
def func2(a):
return func(*a)
def test(p,q):
with ProcessPoolExecutor() as executor:
for it in executor.map(func2 , [(p,q,i) for i in range(10)]):
print("return:{0}".format(it))
if __name__ == '__main__':
test(2,4)
これもちゃんと動く。動くけど、なんかfunc2
邪魔くさい感じも?
functools.partial を使う
今回は前半の引数を固定した関数を生成すればいいので、functools.partialを使ってもいけそう。
import functools
from concurrent.futures import ProcessPoolExecutor
def func(a,b,i):
print('a:{0} b:{1} i:{2}'.format(a,b,i))
return a*b+i
def test(p,q):
with ProcessPoolExecutor() as executor:
for it in executor.map(functools.partial(func,p,q) , range(10)):
print("return:{0}".format(it))
if __name__ == '__main__':
test(2,4)
これも期待の動作をする。
けど、制限があって、functools.partialで生成したpartialオブジェクトを呼ぶ際に引数をつけると、生成時に渡された引数に__続いて__渡される。
なので、関数定義時にdef func(a,b,i)
だったらiにmapで渡したiterableの展開された結果が入るけど、def func(i,a,b)
と定義していたらbにiterableの展開結果が入る。
おわりに
そもそもスレッド単位の並列実行とプロセス単位の並列実行どっちがいいか?という話は既存のドキュメントぐぐったらいろいろ出てきたので、ここでは述べていません。
concurrent.futures便利!!!!