LoginSignup
3
1

More than 5 years have passed since last update.

multiprocessing.pool.Pool.mapの挙動

Last updated at Posted at 2017-08-31

multiprocessingパッケージはプロセスを利用して並行処理を行うためのライブラリです。で、タイトルのmultiprocessing.pool.Pool.mapはmapの並行処理版です。

以下の様な使い方になります。

from multiprocessing import Pool
import time

def iter():
    for i in range(100):
        print("{0} : iter {1}".format(time.time(), i))
        yield i
    time.sleep(2)
    print("{0} : iter finished".format(time.time()))

def fun(n):
    print("{0} : {1}".format(time.time(), n))

with Pool(4) as p: #4プロセスでmapを行う
    p.map(fun, iter())

さて、これを実行するとどうなるでしょうか。結果は以下のように、iter関数で生成されるイテレータがすべてのイテレーションを終了し終えてから、マルチプロセスで関数を適用しています。

$ python mp.py  | sort -n
1424411166.882628 : iter 0
1424411166.882708 : iter 1
1424411166.882714 : iter 2
1424411166.882725 : iter 3
1424411166.88273 : iter 4
1424411166.882734 : iter 5
1424411166.882738 : iter 6
1424411166.882741 : iter 7
1424411166.882745 : iter 8
1424411166.882748 : iter 9
1424411166.882752 : iter 10
1424411166.882755 : iter 11
1424411166.882758 : iter 12
1424411166.882763 : iter 13
1424411166.882766 : iter 14
1424411166.88277 : iter 15
1424411166.882773 : iter 16
1424411166.882776 : iter 17
1424411166.88278 : iter 18
1424411166.882784 : iter 19
1424411168.884807 : iter finished
1424411168.890891 : 0
1424411168.891006 : 2
1424411168.891053 : 1
1424411168.891174 : 3
1424411168.891351 : 4
1424411168.891527 : 5
1424411168.891707 : 8
1424411168.89173 : 9
1424411168.89206 : 10
1424411168.892085 : 11
1424411168.892139 : 12
1424411168.892162 : 13
1424411168.892473 : 14
1424411168.892483 : 16
1424411168.892495 : 15
1424411168.892506 : 17
1424411168.892599 : 18
1424411168.892619 : 19

実装を見てみると以下のようになっていました。

multiprocessing/pool.py
    def map(self, func, iterable, chunksize=None):
        '''
        Apply `func` to each element in `iterable`, collecting the results
        in a list that is returned.
        '''
        return self._map_async(func, iterable, mapstar, chunksize).get()
        :
        :
    def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
            error_callback=None):
        '''
        Helper function to implement map, starmap and their async counterparts.
        '''
        if self._state != RUN:
            raise ValueError("Pool not running")
        if not hasattr(iterable, '__len__'):
            iterable = list(iterable)

        if chunksize is None:
            chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
            if extra:
                chunksize += 1
        if len(iterable) == 0:
            chunksize = 0

        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = MapResult(self._cache, chunksize, len(iterable), callback,
                           error_callback=error_callback)
        self._taskqueue.put((((result._job, i, mapper, (x,), {})
                              for i, x in enumerate(task_batches)), None))
        return result


if not hasattr(iterable, '__len__')lenプロパティがなければ、list(iterable)によってイテレータをリストに変換しているようですね。なので、一旦関数を適用する前に、イテレータがすべて終了しているのですね。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1