pythonでmultiprocessingを使って並列処理する方法を調べたまとめです。
公式ドキュメント
https://docs.python.org/ja/3/library/multiprocessing.html
並列処理と平行処理
並列処理: 別のCPUコアの別のpythonプロセスで複数の処理を同時にやる
平行処理: 同じCPUコアの同じpythonプロセスで複数の処理を同時にやる
待機が多いような楽な処理は平行処理で、負荷が重い処理は並列処理でやるのが良いでしょう。今回は並列処理をmultiprocessing.Pool()でやる話です。
平行処理については別に書いたのでそちらを見てください
https://qiita.com/studio_haneya/items/a3485ea837e17e37bae9
試行環境
Windows10
python 3.6
一気にまとめて処理する (Pool.map)
こちらのコードが分かりやすかったのでちょっとだけ書き換えてやってみました
http://iatlex.com/python/parallel_first
import time
from multiprocessing import Pool
# 並列処理させる関数
def nijou(x):
print('input: %d' % x)
time.sleep(2)
retValue = x * x
print('double: %d' % (retValue))
return(retValue)
if __name__ == "__main__":
p = Pool(4) # プロセス数を4に設定
result = p.map(nijou, range(10)) # nijou()に0,1,..,9を与えて並列演算
print(result)
上記コードを実行すると下の結果が返ってきます。p = multiprocessing.Pool(4)で同時実行するプロセス数を指定しておいてp.map()で実行するという使い方です。p.map()の第1引数に使う関数を渡し第2引数が関数に渡す引数になります。この書き方だと渡せる引数は1つだけです。
input: 0
input: 1
input: 2
input: 3
double: 0
input: 4
double: 1
input: 5
double: 4
input: 6
double: 9
input: 7
double: 16
input: 8
double: 25
input: 9
double: 36
double: 49
double: 64
double: 81
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4つずつ実行していって、プロセスが終了するたびに次のプロセスが始まっています。返り値はlistとして返してくれます。
Pool.mapで複数引数を渡したい
p.map()が渡してくれる引数は1個だけですが、listとかでまとめちゃえば複数の値を渡すことは普通にできます。
import time
from multiprocessing import Pool
def nijou(inputs):
x, y = inputs
print('input: %d, %d' % (x, y))
time.sleep(2)
retValue = [x * x, y * y]
print('double: %d, %d' % (retValue[0], retValue[1]))
return(retValue)
if __name__ == "__main__":
p = Pool(4)
values = [(x, y) for x in range(4) for y in range(4)]
print(values)
result = p.map(nijou, values)
print(result)
p.map()がvaluesの中の値を1個ずつ渡してくれるので、(0, 0) → (0, 1) → (0, 2)の順で渡していきます。
[(0, 0), (0, 1), (0, 2), (0, 3), (1, 0), (1, 1), (1, 2), (1, 3), (2, 0), (2, 1), (2, 2), (2, 3), (3, 0), (3, 1), (3, 2), (3, 3)]
input: 0, 0
input: 0, 1
input: 0, 2
input: 0, 3
double: 0, 0
input: 1, 0
double: 0, 1
input: 1, 1
double: 0, 4
input: 1, 2
double: 0, 9
input: 1, 3
double: 1, 0
(略)
double: 9, 4
double: 9, 9
[[0, 0], [0, 1], [0, 4], [0, 9], [1, 0], [1, 1], [1, 4], [1, 9], [4, 0], [4, 1], [4, 4], [4, 9], [9, 0], [9, 1], [9, 4], [9, 9]]
Pool.mapで複数引数を渡す (wrapper経由)
関数が複数引数を受け取るような書き方になってる場合は、複数引数をまとめるwrapper関数をつくります。既にある関数を利用する場合はこの書き方の方がやりやすいと思います。
import time
from multiprocessing import Pool
def nijou(x, y):
print('input: %d %d' % (x, y))
time.sleep(2)
print('double: %d %d' % ((x * x), (y * y)))
def nijou_wrapper(args):
return nijou(*args)
if __name__ == "__main__":
p = Pool(4)
values = [(x, y) for x in range(4) for y in range(4)]
print(values)
p.map(nijou_wrapper, values)
Pool.applyで1つずつバラバラに使う
まとめてドカっと処理したいときにはPool.map()が便利ですが、様子を見ながら適宜実行したい場合などはバラバラに実行したくなると思います。その場合はPool.apply()またはPool.apply_async()を使います。Pool.apply()の場合は終わるまで待つので並列処理じゃなくなります。
import time
from multiprocessing import Pool, Process
def nijou(inputs):
print(inputs)
x = inputs
print('input: %d' % x)
time.sleep(2)
retValue = x * x
print('double: %d' % retValue)
return(retValue)
if __name__ == "__main__":
p = Pool(4)
values = [x for x in range(10)]
print(values)
result = p.apply(nijou, args=[values[0]])
print(result)
p.close()
上記のような使い方は関数を普通に呼ぶのと変わらないように思えますが、p.close()すると普通に関数を呼んだときよりも上手くメモリクリアされるようですのでそういう使い方もあるのかもしれません。
Pool.apply_asyncで1つずつ並列に実行
apply_async()すると終了待ちをしないので並列処理での実行が出来るようになります。get()すると結果を受け取れるまで待機してくれる筈ですが、使い方によってはエラーが出たりするようなので、ready()を使って終わるまで監視して、ready()がTrueになったらget()して受け取るという使い方が良いと思います。この書き方ならメモリ容量を監視して様子をみながらプロセスを足していくような使い方ができて便利です。
import time
from multiprocessing import Pool, Process
def nijou(inputs):
x = inputs
print('input: %d' % x)
time.sleep(2)
retValue = x * x
print('double: %d' % retValue)
return(retValue)
if __name__ == "__main__":
# Pool()を定義
p = Pool()
# プロセスを2つ非同期で実行
result = p.apply_async(nijou, args=[3])
result2 = p.apply_async(nijou, args=[5])
# 1秒間隔で終了チェックして終了したら結果を表示
for k in range(5):
if result.ready():
break
print(result.get())
print(result2.get())
p.close()
簡単ですね。
レッツトライ!
更新履歴
20190907: apply(), apply_async()の使い方を追記しました