4
2

More than 1 year has passed since last update.

SharedMemoryを使ってnumpy配列を共有しmultiprocessing.Poolで並列処理する

Last updated at Posted at 2021-10-29

はじめに

Python3.8からmultiprocessing.shared_memory.SharedMemoryをつかってプロセス間でのメモリ共有が可能になっている。メモリ共有したnumpy配列をmultiprocessing.Poolを使って並列処理する方法を記載する。

以下記事を一部参考にした。そこではmulriprocessing.Processを使う場合について記載しているが、ここではmultiprocessing.Poolを使う場合についてが主な話題となる。

準備

共有するnumpy配列はnp.uint8型で1000行1000列の行列とする。
このとき確保する共有メモリの大きさは、1000 x 1000 x np.uint8型の大きさであり次のsz値となる。

import numpy as np

sz = np.prod ( 1000, 1000, np.dtype ( 'uint8' ).itemsize ).item()

共有メモリ名をshared00とすると大きさszの共有メモリを作成する処理は以下のようになる。

import numpy as np
from multiprocessing import shared_memory

sz = np.prod ( 1000, 1000, np.dtype ( 'uint8' ).itemsize ).item()
shm = shared_memory.SharedMemory ( create=True, size=sz, name='shared00' )

この共有メモリ領域を使用するnumpy配列は次のようにして得られる。

ary = np.ndarray ( shape=(1000, 1000), dtype=np.uint8, buffer=shm.buf )

なおこの共有メモリ領域は使用が終わったら開放する必要がある。try ... finallyを使えば間違いなく開放できる。メモリの確保から開放までをまとめると次のようになる。

import numpy as np
from multiprocessing import shared_memory

sz = np.prod ( 1000, 1000, np.dtype ( 'uint8' ).itemsize ).item()
shm = shared_memory.SharedMemory ( create=True, size=sz, name='shared00' )
try :
    ary = np.ndarray ( shape=(1000,1000), dtype=np.uint8, buffer=shm.buf )
    # なにかすごい計算をする
finally :
    shm.close()
    shm.unlink()

この共有メモリ領域を別プロセスから使うことができる。別プロセスでも共有メモリ領域を使い終わったらshm.close()でデタッチしなければならない。ここでもtry...finallyをつかえば必ずデタッチできる。

import numpy as np
from multiprocessing import shared_memory

# 別プロセスで共有メモリをアタッチ
shm = shared_memory.SharedMemory ( name='shared00' )
try :
    ary = np.ndarray ( shape=(1000,1000), dtype=np.uint8, buffer=shm.buf )
    # なにかすごい処理
finally :
    # 使い終わったらデタッチ
    shm.close()

この共有メモリのmultiprocessing.Poolでの使い方がこの記事の主題となる。

multiprocessing.PoolでSharedMemoryを並列処理(遅い)

前の章に記載したように、メインプロセスで作成した共有メモリ上のnumpy配列を別プロセス(サブプロセス)で使用するためには、共有メモリ名とnumpy配列のshapeが必要になる。

これらをglobal変数に格納し、fork()あるいはspawn()する際にサブプロセスと共有してみる。
前の章のコードをmultiprocessing.Pool向きに整理すると、メインプロセスは次のようになる。

import numpy as np
from multiprocessing import Pool, shared_memory

if __name__ == "__main__" : 
    # 共有メモリの大きさと名前をグローバル変数で共有する
    global gSHAPE, gNAME
    gSHAPE = ( 1000, 1000 ) # 1000行1000列のnumpy配列を共有する
    gNAME = 'shared00'  # 共有メモリ名を設定
    sz = np.prod ( gSHAPE + ( np.dtype ( "uint8" ).itemsize, ) ).item()
    shm = shared_memory.SharedMemory ( create=True, size=sz, name=gNAME )
    try:
        # 共有メモリ領域を配列とみなし初期値をセット
        ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
        ary[:] = 0
        # メイン処理
        with Pool() as pool :
            # なにかすごい並列処理
    finally:
        # メインプロセスの共有メモリをデタッチし開放
        shm.close()
        shm.unlink()

Windowsの場合はspawnとなるため、global変数を共有するための関数をPoolインスタンス作成時に指定する必要がある。

def init ( shape, name ) :
    global gSHAPE, gNAME
    gSHAPE = shape
    gNAME = name

Windowsの場合のPool()記述は次のようになる。

        with Pool ( initializer=init, initargs=(gSHAPE, gNAME) ) as pool :

さて、上述の# なにかすごい並列処理 の部分を以下のように記述する。
関数testを処理するnumpy配列の行番号をパラメータで指定し並列で実行することにする。このとき並列実行するメモリ領域は重ならないため排他制御は考慮しない。
性能評価のため、numpy配列全体に対する並列処理をを100回繰り返すことにした。

        with Pool() as pool :
            for _ in range ( 100 ) :    # 100回繰り返す
                # 処理する行番号をパラメータで渡す
                pool.map ( test, range ( gSHAPE[0] ) )

次にサブプロセスでは、以下のように共有メモリ領域をアタッチしnumpy配列を得てからtestのメイン処理を実行する。

def getAry() :
    """共有メモリ上の"shared00"をshmにアタッチしnumpy配列とみなす。shmは別途デタッチが必要"""
    global gSHAPE, gNAME
    shm = shared_memory.SharedMemory ( name=gNAME )
    ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
    return ary, shm

そしてtest関数では、指定された行の値を1インクリメントする。getAry()でアタッチした共有メモリは必ずデタッチすること。

def test ( idx ) :
    """ サブプロセスが実行する関数。idxは処理する行番号 """
    ary, shm = getAry()
    try:
        ary [ idx ] += 1
    finally:
        # 共有メモリをデタッチ
        shm.close()
    return

この章のコードをまとめると以下のようになる。

mp_SharedMem.py
from multiprocessing import Pool, shared_memory
import numpy as np

def getAry() :
    """共有メモリ上の"shared00"をshmにアタッチしnumpy配列とみなす。shmは別途デタッチが必要"""
    global gSHAPE, gNAME
    shm = shared_memory.SharedMemory ( name=gNAME )
    ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
    return ary, shm

def test ( idx ) :
    """ サブプロセスが実行する関数。idxは処理する行番号 """
    ary, shm = getAry()
    try:
        ary [ idx ] += 1
    finally:
        # 共有メモリをデタッチ
        shm.close()
    return

def init ( shape, name ) :
    global gSHAPE, gNAME
    gSHAPE = shape
    gNAME = name

if __name__ == "__main__" : 
    # 共有メモリの大きさと名前をグローバル変数で共有する
    global gSHAPE, gNAME
    gSHAPE = ( 1000, 1000 ) # 1000行1000列のnumpy配列を共有する
    gNAME = 'shared00'
    sz = np.prod ( gSHAPE + ( np.dtype ( "uint8" ).itemsize, ) ).item()
    shm = shared_memory.SharedMemory ( create=True, size=sz, name=gNAME )
    try:
        # 共有メモリ領域を配列とみなし初期値をセット
        ary = np.ndarray ( shape=gSHAPE, dtype=np.uint8, buffer=shm.buf )
        ary[:] = 0
        # メイン処理
        with Pool ( initializer=init, initargs=(gSHAPE, gNAME) ) as pool :
            for _ in range ( 100 ) :    # 100回繰り返す
                # 処理する行番号をパラメータで渡す
                pool.map ( test, range ( gSHAPE[0] ) )
        print ( '最初の行: %s' % ary [ 0 ][:10] )
        print ( '最後の行: %s' % ary [ -1 ][:10] )
    finally:
        # メインプロセスの共有メモリをデタッチし開放
        shm.close()
        shm.unlink()

このコードをmp_SharedMem.pyとして保存し実行すると以下のようになる。

> time python mp_SharedMem.py
最初の行: [100 100 100 100 100 100 100 100 100 100]
最後の行: [100 100 100 100 100 100 100 100 100 100]

real    0m6.418s
user    0m0.016s
sys     0m0.015s

結構遅い。。。

multiprocessing.PoolでSharedMemoryを並列処理(改良)

前章でSharedMemoryに並列アクセスしたが結構遅い。呼び出しの都度行っている共有メモリのアタッチとデタッチがオーバーヘッドになっているのかもしれない。

そもそもmultiprocessing.Poolでは、インスタンス作成時に指定の数だけプロセスを作成しそれを使いまわしている。
つまりサブプロセス作成時に共有メモリをアタッチし、全ての処理が完了したら共有メモリをデタッチする、というように変更したら効率がよいのかもしれない。

次のような関数を作成しPoolのinitializerで指定する。ここで共有メモリをアタッチし、サブプロセスのグローバル変数に保存する。

def initShm ( ary_shape, shmName ) :
    """共有メモリ上の"shared00"をgShmにアタッチしnumpy配列とみなす。shmは別途デタッチが必要"""
    global gShm, gAry
    gShm = shared_memory.SharedMemory ( name=shmName )
    gAry = np.ndarray ( shape=ary_shape, dtype=np.uint8, buffer=gShm.buf )
    return

そしてすべての処理が完了し、もうサブプロセスで共有メモリを使わなくなったタイミングで以下の関数を呼び出す。

def closeShm ( ) :
    """グローバル変数に保存したgShmをデタッチする"""
    global gShm
    if gShm is not None :
        gShm.close()
    gShm = None
    return

メインプロセスは前の章とほぼ同じだが要点を抜き出すと以下のようになる。

        # メイン処理 共有メモリの大きさと名前をサブプロセスに渡しそこでアタッチさせる
        with Pool ( initializer=initShm, initargs=(SHAPE, NAME) ) as pool :
            # なにかすごい処理
            ...
            # すごい処理完了; サブプロセスの共有メモリをデタッチする
            pool.starmap ( closeShm, [ (), ] * pool._processes )

ここでPoolインスタンス作成時のパラメータが変わっている。また、メイン処理完了後に上述のcloseShmを呼び出す処理も追加になっている。

サブプロセスの共有メモリをデタッチする部分で使用しているpool._processesは、Poolクラスの内部変数であり保有しているプロセス数を格納している。サブプロセスの共有メモリをデタッチできないことがあるかもしれないが、どうせすぐ後でunlink()するし、サブプロセスもその共有メモリもすぐにdestroyされると期待して気にしないことにした。

なお、closeShm()はパラメータがないため、Pool.map()が使えずPool.starmap()を採用している。以下の記事を参考にした。

ここでのコードをまとめると次のようになる。

mp_SharedMem2.py
from multiprocessing import Pool, shared_memory
import numpy as np

def closeShm ( ) :
    """グローバル変数に保存したgShmをデタッチする"""
    global gShm
    if gShm is not None :
        gShm.close()
    gShm = None
    return

def initShm ( ary_shape, shmName ) :
    """共有メモリ上の"shared00"をgShmにアタッチしnumpy配列とみなす。gShmは別途デタッチが必要"""
    global gShm, gAry
    gShm = shared_memory.SharedMemory ( name=shmName )
    gAry = np.ndarray ( shape=ary_shape, dtype=np.uint8, buffer=gShm.buf )
    return

def test ( idx ) :
    """ サブプロセスが実行する関数。idxは処理する行番号 """
    global gAry
    gAry [ idx ] += 1
    return

if __name__ == "__main__" : 
    # 共有メモリの大きさと名前を設定する
    SHAPE = ( 1000, 1000 )  # 1000行1000列のnumpy配列を共有する
    NAME = 'shared00'
    sz = np.prod ( SHAPE + ( np.dtype ( "uint8" ).itemsize, ) ).item()
    shm = shared_memory.SharedMemory ( create=True, size=sz, name=NAME )
    try:
        # 共有メモリ領域を配列とみなし初期値をセット
        ary = np.ndarray ( shape=SHAPE, dtype=np.uint8, buffer=shm.buf )
        ary[:] = 0
        # メイン処理 共有メモリの大きさと名前をサブプロセスに渡す
        with Pool ( initializer=initShm, initargs=(SHAPE, NAME) ) as pool :
            for _ in range ( 100 ) :    # 100回繰り返す
                # 処理する行番号をパラメータで渡す
                pool.map ( test, range ( SHAPE[0] ) )
            # サブプロセスの共有メモリをデタッチする
            pool.starmap ( closeShm, [ (), ] * pool._processes )
        print ( '最初の行: %s' % ary [ 0 ][:10] )
        print ( '最後の行: %s' % ary [ -1 ][:10] )
    finally:
        # メインプロセスの共有メモリをデタッチし開放
        shm.close()
        shm.unlink()

このコードをmp_SharedMem2.pyとして保存し実行すると以下のようになる。

> time python mp_SharedMem2.py
最初の行: [100 100 100 100 100 100 100 100 100 100]
最後の行: [100 100 100 100 100 100 100 100 100 100]

real    0m1.157s
user    0m0.015s
sys     0m0.000s

速くなった!

まとめ

multiprocessing.shared_memory.SharedMemorymultiprocessing.Poolで並列処理する場合、呼び出しの都度共有メモリをアタッチするのではなく、Poolインスタンス作成時に共有メモリをアタッチする方が5.5倍(6.418 / 1.157 = 5.547..)速かった!

ただし、ここでは触れなかったがmultiprocessing.Arrayを使ってメモリを共有した場合でも同じくらいの速さが得られる。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2