multiprocessingパッケージはプロセスを利用して並行処理を行うためのライブラリです。で、タイトルのmultiprocessing.pool.Pool.map
はmapの並行処理版です。
以下の様な使い方になります。
from multiprocessing import Pool
import time
def iter():
for i in range(100):
print("{0} : iter {1}".format(time.time(), i))
yield i
time.sleep(2)
print("{0} : iter finished".format(time.time()))
def fun(n):
print("{0} : {1}".format(time.time(), n))
with Pool(4) as p: #4プロセスでmapを行う
p.map(fun, iter())
さて、これを実行するとどうなるでしょうか。結果は以下のように、iter関数で生成されるイテレータがすべてのイテレーションを終了し終えてから、マルチプロセスで関数を適用しています。
$ python mp.py | sort -n
1424411166.882628 : iter 0
1424411166.882708 : iter 1
1424411166.882714 : iter 2
1424411166.882725 : iter 3
1424411166.88273 : iter 4
1424411166.882734 : iter 5
1424411166.882738 : iter 6
1424411166.882741 : iter 7
1424411166.882745 : iter 8
1424411166.882748 : iter 9
1424411166.882752 : iter 10
1424411166.882755 : iter 11
1424411166.882758 : iter 12
1424411166.882763 : iter 13
1424411166.882766 : iter 14
1424411166.88277 : iter 15
1424411166.882773 : iter 16
1424411166.882776 : iter 17
1424411166.88278 : iter 18
1424411166.882784 : iter 19
1424411168.884807 : iter finished
1424411168.890891 : 0
1424411168.891006 : 2
1424411168.891053 : 1
1424411168.891174 : 3
1424411168.891351 : 4
1424411168.891527 : 5
1424411168.891707 : 8
1424411168.89173 : 9
1424411168.89206 : 10
1424411168.892085 : 11
1424411168.892139 : 12
1424411168.892162 : 13
1424411168.892473 : 14
1424411168.892483 : 16
1424411168.892495 : 15
1424411168.892506 : 17
1424411168.892599 : 18
1424411168.892619 : 19
実装を見てみると以下のようになっていました。
multiprocessing/pool.py
def map(self, func, iterable, chunksize=None):
'''
Apply `func` to each element in `iterable`, collecting the results
in a list that is returned.
'''
return self._map_async(func, iterable, mapstar, chunksize).get()
:
:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
'''
Helper function to implement map, starmap and their async counterparts.
'''
if self._state != RUN:
raise ValueError("Pool not running")
if not hasattr(iterable, '__len__'):
iterable = list(iterable)
if chunksize is None:
chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
chunksize += 1
if len(iterable) == 0:
chunksize = 0
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback)
self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None))
return result
if not hasattr(iterable, '__len__')
で__len__プロパティがなければ、list(iterable)
によってイテレータをリストに変換しているようですね。なので、一旦関数を適用する前に、イテレータがすべて終了しているのですね。