Edited at

Python 3のmultiprocessingでプロセス間で大量のデータを受け渡しつつnumpyで処理する

ので、CPUコアを複数用いたプロセスの間で、大量のデータをやりとりする場合別の仕組みを用いる必要がある。ここではsharedctypes.RawArrayを用いる方法を紹介する。mmap.mmapを用いても同等なことができるはず。

下記の例では


  • 関数step1で乱数をファイルから読み込んで共有メモリー1に書き込んでいる

  • 関数step2では得られた乱数の逆数を共有メモリー2に書き込んでいる

  • 関数step3では共有メモリー2をnumpyの行列だと思って行列ノルムを算出している

import multiprocessing

import multiprocessing.sharedctypes
import time
import numpy as np

byte_buf_size=4096
float_buf_size=4096

def step1(out_buf, buf1_ready):
f=open("/dev/urandom", "rb", buffering=True) # /dev/randomにすると遅くなるので試してみて下さい
while True:
buf=f.read(byte_buf_size) # readintoでoutbufに向かって直接読まない
buf1_ready.clear()
memoryview(out_buf).cast('B')[:]=buf # memoryviewで囲まないと超遅い
buf1_ready.set()
print("Step 1 finished")

def step2(in_buf, out_buf, buf1_ready, buf2_ready):
while True:
buf1_ready.wait()
denominator = np.array(in_buf,dtype=np.uint8, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
buf1_ready.clear()
#print(in_buf is denominator) Falseだとコピーされている
ones = np.ones_like(denominator, dtype=np.float32)
float_array = ones / (ones + denominator.astype(np.float32))
buf2_ready.clear()
# 複写元のデータがnumpy配列のときはmemoryviewよりnp.ctypeslib.as_arrayを使うほうが速い
np.asarray(out_buf)[:] = float_array # np.asarrayで囲まないと超遅い
buf2_ready.set()
#print("Step 2 finished")

def step3(in_buf, buf2_ready):
while True:
buf2_ready.wait()
float_array = np.array(in_buf,dtype=np.float32, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
buf2_ready.clear()
#print(in_buf is float_array) Falseだとコピーされている
print(np.linalg.norm(np.reshape(float_array, (64,64))))

if __name__ == '__main__':
multiprocessing.set_start_method('fork')
byte_buf=multiprocessing.sharedctypes.RawArray('B', byte_buf_size)
float_buf=multiprocessing.sharedctypes.RawArray('f', float_buf_size)
buf1_ready = multiprocessing.Event()
buf1_ready.clear()
buf2_ready = multiprocessing.Event()
buf2_ready.clear()
p1=multiprocessing.Process(target=step1, args=(byte_buf,buf1_ready), daemon=True)
p2=multiprocessing.Process(target=step2, args=(byte_buf,float_buf,buf1_ready, buf2_ready), daemon=True)
p3=multiprocessing.Process(target=step3, args=(float_buf,buf2_ready), daemon=True)
p3.start()
p2.start()
p1.start()
time.sleep(1)
exit(0)

なお2つ以上のプロセスが同一の共有メモリを読み書きしないようにmultiprocessing.Lock や似たようなので排他制御するとデッドロックしてデバッグできなかったのでいい感じに排他制御できた人はコメント欄とかで教えてください


参考文献