Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
67
Help us understand the problem. What is going on with this article?

More than 1 year has passed since last update.

@studio_haneya

pythonで並列化入門 (multiprocessing.Pool)

pythonでmultiprocessingを使って並列処理する方法を調べたまとめです。

公式ドキュメント
https://docs.python.org/ja/3/library/multiprocessing.html

並列処理と平行処理

並列処理: 別のCPUコアの別のpythonプロセスで複数の処理を同時にやる
平行処理: 同じCPUコアの同じpythonプロセスで複数の処理を同時にやる

待機が多いような楽な処理は平行処理で、負荷が重い処理は並列処理でやるのが良いでしょう。今回は並列処理をmultiprocessing.Pool()でやる話です。

平行処理については別に書いたのでそちらを見てください
https://qiita.com/studio_haneya/items/a3485ea837e17e37bae9

試行環境

Windows10
python 3.6

一気にまとめて処理する (Pool.map)

こちらのコードが分かりやすかったのでちょっとだけ書き換えてやってみました
http://iatlex.com/python/parallel_first

python
import time
from multiprocessing import Pool

# 並列処理させる関数
def nijou(x):
    print('input: %d' % x)
    time.sleep(2)
    retValue = x * x
    print('double: %d' % (retValue))
    return(retValue)

if __name__ == "__main__":
    p = Pool(4) # プロセス数を4に設定
    result = p.map(nijou, range(10))  # nijou()に0,1,..,9を与えて並列演算
    print(result)

上記コードを実行すると下の結果が返ってきます。p = multiprocessing.Pool(4)で同時実行するプロセス数を指定しておいてp.map()で実行するという使い方です。p.map()の第1引数に使う関数を渡し第2引数が関数に渡す引数になります。この書き方だと渡せる引数は1つだけです。

結果
input: 0
input: 1
input: 2
input: 3
double: 0
input: 4
double: 1
input: 5
double: 4
input: 6
double: 9
input: 7
double: 16
input: 8
double: 25
input: 9
double: 36
double: 49
double: 64
double: 81
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

4つずつ実行していって、プロセスが終了するたびに次のプロセスが始まっています。返り値はlistとして返してくれます。

Pool.mapで複数引数を渡したい

p.map()が渡してくれる引数は1個だけですが、listとかでまとめちゃえば複数の値を渡すことは普通にできます。

python
import time
from multiprocessing import Pool

def nijou(inputs):
    x, y = inputs
    print('input: %d, %d' % (x, y))
    time.sleep(2)
    retValue = [x * x, y * y]
    print('double: %d, %d' % (retValue[0], retValue[1]))
    return(retValue)

if __name__ == "__main__":
    p = Pool(4)
    values = [(x, y) for x in range(4) for y in range(4)]
    print(values)
    result = p.map(nijou, values)
    print(result)

p.map()がvaluesの中の値を1個ずつ渡してくれるので、(0, 0) → (0, 1) → (0, 2)の順で渡していきます。

結果
[(0, 0), (0, 1), (0, 2), (0, 3), (1, 0), (1, 1), (1, 2), (1, 3), (2, 0), (2, 1), (2, 2), (2, 3), (3, 0), (3, 1), (3, 2), (3, 3)]
input: 0, 0
input: 0, 1
input: 0, 2
input: 0, 3
double: 0, 0
input: 1, 0
double: 0, 1
input: 1, 1
double: 0, 4
input: 1, 2
double: 0, 9
input: 1, 3
double: 1, 0
()
double: 9, 4
double: 9, 9
[[0, 0], [0, 1], [0, 4], [0, 9], [1, 0], [1, 1], [1, 4], [1, 9], [4, 0], [4, 1], [4, 4], [4, 9], [9, 0], [9, 1], [9, 4], [9, 9]]

Pool.mapで複数引数を渡す (wrapper経由)

関数が複数引数を受け取るような書き方になってる場合は、複数引数をまとめるwrapper関数をつくります。既にある関数を利用する場合はこの書き方の方がやりやすいと思います。

python
import time
from multiprocessing import Pool

def nijou(x, y):
    print('input: %d %d' % (x, y))
    time.sleep(2)
    print('double: %d %d' % ((x * x), (y * y)))

def nijou_wrapper(args):
    return nijou(*args)

if __name__ == "__main__":
    p = Pool(4)
    values = [(x, y) for x in range(4) for y in range(4)]
    print(values)
    p.map(nijou_wrapper, values)

Pool.applyで1つずつバラバラに使う

まとめてドカっと処理したいときにはPool.map()が便利ですが、様子を見ながら適宜実行したい場合などはバラバラに実行したくなると思います。その場合はPool.apply()またはPool.apply_async()を使います。Pool.apply()の場合は終わるまで待つので並列処理じゃなくなります。

python
import time
from multiprocessing import Pool, Process

def nijou(inputs):
    print(inputs)
    x = inputs
    print('input: %d' % x)
    time.sleep(2)
    retValue = x * x
    print('double: %d' % retValue)
    return(retValue)

if __name__ == "__main__":

    p = Pool(4)
    values = [x for x in range(10)]
    print(values)
    result = p.apply(nijou, args=[values[0]])
    print(result)

    p.close()

上記のような使い方は関数を普通に呼ぶのと変わらないように思えますが、p.close()すると普通に関数を呼んだときよりも上手くメモリクリアされるようですのでそういう使い方もあるのかもしれません。

Pool.apply_asyncで1つずつ並列に実行

apply_async()すると終了待ちをしないので並列処理での実行が出来るようになります。get()すると結果を受け取れるまで待機してくれる筈ですが、使い方によってはエラーが出たりするようなので、ready()を使って終わるまで監視して、ready()がTrueになったらget()して受け取るという使い方が良いと思います。この書き方ならメモリ容量を監視して様子をみながらプロセスを足していくような使い方ができて便利です。

python
import time
from multiprocessing import Pool, Process

def nijou(inputs):
    x = inputs
    print('input: %d' % x)
    time.sleep(2)
    retValue = x * x
    print('double: %d' % retValue)
    return(retValue)

if __name__ == "__main__":

    # Pool()を定義
    p = Pool()

    # プロセスを2つ非同期で実行
    result = p.apply_async(nijou, args=[3])
    result2 = p.apply_async(nijou, args=[5])

    # 1秒間隔で終了チェックして終了したら結果を表示
    for k in range(5):
        if result.ready():
            break
    print(result.get())
    print(result2.get())

    p.close()

簡単ですね。
レッツトライ!

更新履歴

20190907: apply(), apply_async()の使い方を追記しました

67
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
67
Help us understand the problem. What is going on with this article?