4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

ProcessPoolExector.mapのfuncにlambda式を渡せない

Last updated at Posted at 2018-10-14

はじめに

Python3系に並列実行を抽象化したconcurrent.futuresモジュールがあります。

このモジュールにあるクラスExecutorにはmapメソッドが生えていて、イメージとしては処理系組み込みの高階関数mapが並列実行される感じ。

スレッド単位で並列実行するThreadPoolExecutorと、プロセス単位で並列実行するProcessPoolExecutorがそれぞれExecutorクラスの実装クラスとして実装されていて、どっちのクラスを使うか変えるだけでスレッド単位の並列実行とプロセス単位の並列実行を切り替えられるので試すのも楽。

便利だなぁと思って試していると、タイトルにある「ProcessPoolExector.mapのfuncにlambda式を渡せない」という点で転んだ。ので、ここにメモを書いておきます。

説明

Executor.mapに渡すfuncは引数一つである必要があるので、例えば引数3つ取って、最後の一つだけiterableにしたい場合はlambdaでくるむのが楽。

例えば、「水平分割したデータベースに対してクエリを流すときに並列実行を簡単に書けて便利じゃん」という感じで関数を作ると、関数の引数にはクエリ文字列とDB指定の2つが入って、クエリは全DBに対して一緒だけどDB指定だけばらばら、みたいなことが生じる。

まずはスレッド単位の並列実行で書いてみた。

from concurrent.futures import ThreadPoolExecutor

def func(a,b,i):
    print('a:{0} b:{1} i:{2}'.format(a,b,i))
    return a*b+i

def test(p,q):
    with ThreadPoolExecutor() as executor:
        for it in executor.map(lambda _ : func(p,q,_) , range(10)):
            print("return:{}".format(it))

if __name__ == '__main__':
   test(2,4)

実行すると、こんな感じ。

a:2 b:4 i:0a:2 b:4 i:4a:2 b:4 i:2a:2 b:4 i:1a:2 b:4 i:5a:2 b:4 i:6a:2 b:4 i:3a:2 b:4 i:7a:2 b:4 i:8a:2 b:4 i:9
return:8
return:9
return:10
return:11
return:12
return:13
return:14
return:15
return:16
return:17

なるほどな、という感じで、ここでThreadPoolExecutorProcessPoolExecutorに変えてみる。

from concurrent.futures import ProcessPoolExecutor

def func(a,b,i):
    print('a:{0} b:{1} i:{2}'.format(a,b,i))
    return a*b+i

def test(p,q):
    with ProcessPoolExecutor() as executor:
        for it in executor.map(lambda _ : func(p,q,_) , range(10)):
            print("return:{0}".format(it))

if __name__ == '__main__':
   test(2,4)

ThreadをProcessに変えるだけ。これだけでスレッド単位の並列実行からプロセス単位の並列実行に変わる!

で、実行すると

Traceback (most recent call last):
  File "E:\Python35\lib\multiprocessing\queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "E:\Python35\lib\multiprocessing\reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'test.<locals>.<lambda>'

死にました。理由は簡単で、pickleがlambda式に対応していないから(pickleのドキュメントにも書いてある)。

で、対応策を考えてみた。

ちなみに、ThreadPoolExecutorの場合はpickle出る幕ないのでちゃんと動くんですね。なるほどな。

ProcessPoolExector.mapに複数の引数を持つfuncを渡したい

pickleの代わりにdillを使う

lambda式にも対応したpickleの拡張であるdillを使えばいいんじゃね、という発想。dillをpip installした上でmultiprocessing\reduction.pyimport pickleimport dill as pickleに変えてみた。

これで動いた。

でも__動いたはいいんだけど標準ライブラリいじるのいかんでしょ__という気持ちで以下続行。

lambda式でなく通常の関数として書く

こんな感じ

from concurrent.futures import ProcessPoolExecutor

def func(a,b,i):
    print('a:{0} b:{1} i:{2}'.format(a,b,i))
    return a*b+i

def func2(a):
    return func(*a)

def test(p,q):
    with ProcessPoolExecutor() as executor:
        for it in executor.map(func2 , [(p,q,i) for i in range(10)]):
            print("return:{0}".format(it))

if __name__ == '__main__':
   test(2,4)

これもちゃんと動く。動くけど、なんかfunc2邪魔くさい感じも?

functools.partial を使う

今回は前半の引数を固定した関数を生成すればいいので、functools.partialを使ってもいけそう。

import functools
from concurrent.futures import ProcessPoolExecutor

def func(a,b,i):
    print('a:{0} b:{1} i:{2}'.format(a,b,i))
    return a*b+i

def test(p,q):
    with ProcessPoolExecutor() as executor:
        for it in executor.map(functools.partial(func,p,q) , range(10)):
            print("return:{0}".format(it))

if __name__ == '__main__':
   test(2,4)

これも期待の動作をする。

けど、制限があって、functools.partialで生成したpartialオブジェクトを呼ぶ際に引数をつけると、生成時に渡された引数に__続いて__渡される。

なので、関数定義時にdef func(a,b,i)だったらiにmapで渡したiterableの展開された結果が入るけど、def func(i,a,b)と定義していたらbにiterableの展開結果が入る。

おわりに

そもそもスレッド単位の並列実行とプロセス単位の並列実行どっちがいいか?という話は既存のドキュメントぐぐったらいろいろ出てきたので、ここでは述べていません。

concurrent.futures便利!!!!

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?