はじめに
Python3.8からmultiprocessing.shared_memory.SharedMemory
をつかってプロセス間でのメモリ共有が可能になっている。メモリ共有したnumpy配列をmultiprocessing.Pool
を使って並列処理する方法を記載する。
以下記事を一部参考にした。そこではmulriprocessing.Process
を使う場合について記載しているが、ここではmultiprocessing.Pool
を使う場合についてが主な話題となる。
準備
共有するnumpy配列はnp.uint8
型で1000行1000列の行列とする。
このとき確保する共有メモリの大きさは、1000 x 1000 x np.uint8型の大きさであり次のsz
値となる。
import numpy as np
sz = np.prod ( 1000, 1000, np.dtype ( 'uint8' ).itemsize ).item()
共有メモリ名をshared00
とすると大きさsz
の共有メモリを作成する処理は以下のようになる。
import numpy as np
from multiprocessing import shared_memory
sz = np.prod ( 1000, 1000, np.dtype ( 'uint8' ).itemsize ).item()
shm = shared_memory.SharedMemory ( create=True, size=sz, name='shared00' )
この共有メモリ領域を使用するnumpy配列は次のようにして得られる。
ary = np.ndarray ( shape=(1000, 1000), dtype=np.uint8, buffer=shm.buf )
なおこの共有メモリ領域は使用が終わったら開放する必要がある。try ... finally
を使えば間違いなく開放できる。メモリの確保から開放までをまとめると次のようになる。
import numpy as np
from multiprocessing import shared_memory
sz = np.prod ( 1000, 1000, np.dtype ( 'uint8' ).itemsize ).item()
shm = shared_memory.SharedMemory ( create=True, size=sz, name='shared00' )
try :
ary = np.ndarray ( shape=(1000,1000), dtype=np.uint8, buffer=shm.buf )
# なにかすごい計算をする
finally :
shm.close()
shm.unlink()
この共有メモリ領域を別プロセスから使うことができる。別プロセスでも共有メモリ領域を使い終わったらshm.close()
でデタッチしなければならない。ここでもtry...finally
をつかえば必ずデタッチできる。
import numpy as np
from multiprocessing import shared_memory
# 別プロセスで共有メモリをアタッチ
shm = shared_memory.SharedMemory ( name='shared00' )
try :
ary = np.ndarray ( shape=(1000,1000), dtype=np.uint8, buffer=shm.buf )
# なにかすごい処理
finally :
# 使い終わったらデタッチ
shm.close()
この共有メモリのmultiprocessing.Pool
での使い方がこの記事の主題となる。
multiprocessing.PoolでSharedMemoryを並列処理(遅い)
前の章に記載したように、メインプロセスで作成した共有メモリ上のnumpy配列を別プロセス(サブプロセス)で使用するためには、共有メモリ名とnumpy配列のshapeが必要になる。
これらをglobal変数に格納し、fork()あるいはspawn()する際にサブプロセスと共有してみる。
前の章のコードをmultiprocessing.Pool
向きに整理すると、メインプロセスは次のようになる。
import numpy as np
from multiprocessing import Pool, shared_memory
if __name__ == "__main__" :
# 共有メモリの大きさと名前をグローバル変数で共有する
global gSHAPE, gNAME
gSHAPE = ( 1000, 1000 ) # 1000行1000列のnumpy配列を共有する
gNAME = 'shared00' # 共有メモリ名を設定
sz = np.prod ( gSHAPE + ( np.dtype ( "uint8" ).itemsize, ) ).item()
shm = shared_memory.SharedMemory ( create=True, size=sz, name=gNAME )
try:
# 共有メモリ領域を配列とみなし初期値をセット
ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
ary[:] = 0
# メイン処理
with Pool() as pool :
# なにかすごい並列処理
finally:
# メインプロセスの共有メモリをデタッチし開放
shm.close()
shm.unlink()
Windowsの場合はspawnとなるため、global変数を共有するための関数をPoolインスタンス作成時に指定する必要がある。
def init ( shape, name ) :
global gSHAPE, gNAME
gSHAPE = shape
gNAME = name
Windowsの場合のPool()記述は次のようになる。
with Pool ( initializer=init, initargs=(gSHAPE, gNAME) ) as pool :
さて、上述の# なにかすごい並列処理
の部分を以下のように記述する。
関数test
を処理するnumpy配列の行番号をパラメータで指定し並列で実行することにする。このとき並列実行するメモリ領域は重ならないため排他制御は考慮しない。
性能評価のため、numpy配列全体に対する並列処理をを100回繰り返すことにした。
with Pool() as pool :
for _ in range ( 100 ) : # 100回繰り返す
# 処理する行番号をパラメータで渡す
pool.map ( test, range ( gSHAPE[0] ) )
次にサブプロセスでは、以下のように共有メモリ領域をアタッチしnumpy配列を得てからtest
のメイン処理を実行する。
def getAry() :
"""共有メモリ上の"shared00"をshmにアタッチしnumpy配列とみなす。shmは別途デタッチが必要"""
global gSHAPE, gNAME
shm = shared_memory.SharedMemory ( name=gNAME )
ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
return ary, shm
そしてtest
関数では、指定された行の値を1インクリメントする。getAry()
でアタッチした共有メモリは必ずデタッチすること。
def test ( idx ) :
""" サブプロセスが実行する関数。idxは処理する行番号 """
ary, shm = getAry()
try:
ary [ idx ] += 1
finally:
# 共有メモリをデタッチ
shm.close()
return
この章のコードをまとめると以下のようになる。
from multiprocessing import Pool, shared_memory
import numpy as np
def getAry() :
"""共有メモリ上の"shared00"をshmにアタッチしnumpy配列とみなす。shmは別途デタッチが必要"""
global gSHAPE, gNAME
shm = shared_memory.SharedMemory ( name=gNAME )
ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
return ary, shm
def test ( idx ) :
""" サブプロセスが実行する関数。idxは処理する行番号 """
ary, shm = getAry()
try:
ary [ idx ] += 1
finally:
# 共有メモリをデタッチ
shm.close()
return
def init ( shape, name ) :
global gSHAPE, gNAME
gSHAPE = shape
gNAME = name
if __name__ == "__main__" :
# 共有メモリの大きさと名前をグローバル変数で共有する
global gSHAPE, gNAME
gSHAPE = ( 1000, 1000 ) # 1000行1000列のnumpy配列を共有する
gNAME = 'shared00'
sz = np.prod ( gSHAPE + ( np.dtype ( "uint8" ).itemsize, ) ).item()
shm = shared_memory.SharedMemory ( create=True, size=sz, name=gNAME )
try:
# 共有メモリ領域を配列とみなし初期値をセット
ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
ary[:] = 0
# メイン処理
with Pool ( initializer=init, initargs=(gSHAPE, gNAME) ) as pool :
for _ in range ( 100 ) : # 100回繰り返す
# 処理する行番号をパラメータで渡す
pool.map ( test, range ( gSHAPE[0] ) )
print ( '最初の行: %s' % ary [ 0 ][:10] )
print ( '最後の行: %s' % ary [ -1 ][:10] )
finally:
# メインプロセスの共有メモリをデタッチし開放
shm.close()
shm.unlink()
このコードをmp_SharedMem.py
として保存し実行すると以下のようになる。
> time python mp_SharedMem.py
最初の行: [100 100 100 100 100 100 100 100 100 100]
最後の行: [100 100 100 100 100 100 100 100 100 100]
real 0m6.418s
user 0m0.016s
sys 0m0.015s
結構遅い。。。
multiprocessing.PoolでSharedMemoryを並列処理(改良)
前章でSharedMemoryに並列アクセスしたが結構遅い。呼び出しの都度行っている共有メモリのアタッチとデタッチがオーバーヘッドになっているのかもしれない。
そもそもmultiprocessing.Pool
では、インスタンス作成時に指定の数だけプロセスを作成しそれを使いまわしている。
つまりサブプロセス作成時に共有メモリをアタッチし、全ての処理が完了したら共有メモリをデタッチする、というように変更したら効率がよいのかもしれない。
次のような関数を作成しPoolのinitializerで指定する。ここで共有メモリをアタッチし、サブプロセスのグローバル変数に保存する。
def initShm ( ary_shape, shmName ) :
"""共有メモリ上の"shared00"をgShmにアタッチしnumpy配列とみなす。shmは別途デタッチが必要"""
global gShm, gAry
gShm = shared_memory.SharedMemory ( name=shmName )
gAry = np.ndarray ( shape=ary_shape, dtype=np.uint8, buffer=gShm.buf )
return
そしてすべての処理が完了し、もうサブプロセスで共有メモリを使わなくなったタイミングで以下の関数を呼び出す。
def closeShm ( ) :
"""グローバル変数に保存したgShmをデタッチする"""
global gShm
if gShm is not None :
gShm.close()
gShm = None
return
メインプロセスは前の章とほぼ同じだが要点を抜き出すと以下のようになる。
# メイン処理 共有メモリの大きさと名前をサブプロセスに渡しそこでアタッチさせる
with Pool ( initializer=initShm, initargs=(SHAPE, NAME) ) as pool :
# なにかすごい処理
...
# すごい処理完了; サブプロセスの共有メモリをデタッチする
pool.starmap ( closeShm, [ (), ] * pool._processes )
ここでPool
インスタンス作成時のパラメータが変わっている。また、メイン処理完了後に上述のcloseShm
を呼び出す処理も追加になっている。
サブプロセスの共有メモリをデタッチする部分で使用しているpool._processes
は、Pool
クラスの内部変数であり保有しているプロセス数を格納している。サブプロセスの共有メモリをデタッチできないことがあるかもしれないが、どうせすぐ後でunlink()するし、サブプロセスもその共有メモリもすぐにdestroyされると期待して気にしないことにした。
なお、closeShm()
はパラメータがないため、Pool.map()
が使えずPool.starmap()
を採用している。以下の記事を参考にした。
ここでのコードをまとめると次のようになる。
from multiprocessing import Pool, shared_memory
import numpy as np
def closeShm ( ) :
"""グローバル変数に保存したgShmをデタッチする"""
global gShm
if gShm is not None :
gShm.close()
gShm = None
return
def initShm ( ary_shape, shmName ) :
"""共有メモリ上の"shared00"をgShmにアタッチしnumpy配列とみなす。gShmは別途デタッチが必要"""
global gShm, gAry
gShm = shared_memory.SharedMemory ( name=shmName )
gAry = np.ndarray ( shape=ary_shape, dtype=np.uint8, buffer=gShm.buf )
return
def test ( idx ) :
""" サブプロセスが実行する関数。idxは処理する行番号 """
global gAry
gAry [ idx ] += 1
return
if __name__ == "__main__" :
# 共有メモリの大きさと名前を設定する
SHAPE = ( 1000, 1000 ) # 1000行1000列のnumpy配列を共有する
NAME = 'shared00'
sz = np.prod ( SHAPE + ( np.dtype ( "uint8" ).itemsize, ) ).item()
shm = shared_memory.SharedMemory ( create=True, size=sz, name=NAME )
try:
# 共有メモリ領域を配列とみなし初期値をセット
ary = np.ndarray ( shape=SHAPE, dtype=np.uint8, buffer=shm.buf )
ary[:] = 0
# メイン処理 共有メモリの大きさと名前をサブプロセスに渡す
with Pool ( initializer=initShm, initargs=(SHAPE, NAME) ) as pool :
for _ in range ( 100 ) : # 100回繰り返す
# 処理する行番号をパラメータで渡す
pool.map ( test, range ( SHAPE[0] ) )
# サブプロセスの共有メモリをデタッチする
pool.starmap ( closeShm, [ (), ] * pool._processes )
print ( '最初の行: %s' % ary [ 0 ][:10] )
print ( '最後の行: %s' % ary [ -1 ][:10] )
finally:
# メインプロセスの共有メモリをデタッチし開放
shm.close()
shm.unlink()
このコードをmp_SharedMem2.py
として保存し実行すると以下のようになる。
> time python mp_SharedMem2.py
最初の行: [100 100 100 100 100 100 100 100 100 100]
最後の行: [100 100 100 100 100 100 100 100 100 100]
real 0m1.157s
user 0m0.015s
sys 0m0.000s
速くなった!
まとめ
multiprocessing.shared_memory.SharedMemory
をmultiprocessing.Pool
で並列処理する場合、呼び出しの都度共有メモリをアタッチするのではなく、Poolインスタンス作成時に共有メモリをアタッチする方が5.5倍(6.418 / 1.157 = 5.547..
)速かった!
ただし、ここでは触れなかったがmultiprocessing.Array
を使ってメモリを共有した場合でも同じくらいの速さが得られる。