21
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Python 3のmultiprocessingでプロセス間で大量のデータを受け渡しつつnumpyで処理する

Last updated at Posted at 2019-07-26

ので、CPUコアを複数用いたプロセスの間で、大量のデータをやりとりする場合別の仕組みを用いる必要がある。ここではsharedctypes.RawArrayを用いる方法を紹介する。mmap.mmapを用いても同等なことができるはず。~~https://qiita.com/kakinaguru_zo/items/c875ca7452c30a22289d に書いたようにOpenCV 4.2だと import cv2multiprocessing.Process の中で個別に行わないとUSBカメラからの読み出しで常にエラーが起きてどうにもならなかった…~~OpenCV 4以降で並列処理した例は USBカメラの映像をPythonのOpenCVで表示する にある。

下記の例では

  • 関数step1で乱数をファイルから読み込んで共有メモリー1に書き込んでいる
  • 関数step2では得られた乱数の逆数を共有メモリー2に書き込んでいる
  • 関数step3では共有メモリー2をnumpyの行列だと思って行列ノルムを算出している
import multiprocessing
import multiprocessing.sharedctypes
import time
import numpy as np

byte_buf_size=4096
float_buf_size=4096

def step1(out_buf, buf1_ready):
    f=open("/dev/urandom", "rb", buffering=True) # /dev/randomにすると遅くなるので試してみて下さい
    while True:
        buf=f.read(byte_buf_size) # readintoでoutbufに向かって直接読まない
        buf1_ready.clear()
        memoryview(out_buf).cast('B')[:]=buf # memoryviewで囲まないと超遅い
        buf1_ready.set()
        print("Step 1 finished")

def step2(in_buf, out_buf, buf1_ready, buf2_ready):
    while True:
        buf1_ready.wait()
        denominator = np.array(in_buf,dtype=np.uint8, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
        buf1_ready.clear()
        #print(in_buf is denominator) Falseだとコピーされている
        ones = np.ones_like(denominator, dtype=np.float32)
        float_array = ones / (ones + denominator.astype(np.float32))
        buf2_ready.clear()
        # 複写元のデータがnumpy配列のときはmemoryviewよりnp.ctypeslib.as_arrayを使うほうが速い
        np.asarray(out_buf)[:] = float_array # np.asarrayで囲まないと超遅い
        buf2_ready.set()
        #print("Step 2 finished")

def step3(in_buf, buf2_ready):
    while True:
        buf2_ready.wait()
        float_array = np.array(in_buf,dtype=np.float32, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
        buf2_ready.clear()
        #print(in_buf is float_array) Falseだとコピーされている
        print(np.linalg.norm(np.reshape(float_array, (64,64))))


if __name__ == '__main__':
    multiprocessing.set_start_method('fork')
    byte_buf=multiprocessing.sharedctypes.RawArray('B', byte_buf_size)
    float_buf=multiprocessing.sharedctypes.RawArray('f', float_buf_size)
    buf1_ready = multiprocessing.Event()
    buf1_ready.clear()
    buf2_ready = multiprocessing.Event()
    buf2_ready.clear()
    p1=multiprocessing.Process(target=step1, args=(byte_buf,buf1_ready), daemon=True)
    p2=multiprocessing.Process(target=step2, args=(byte_buf,float_buf,buf1_ready, buf2_ready), daemon=True)
    p3=multiprocessing.Process(target=step3, args=(float_buf,buf2_ready), daemon=True)
    p3.start()
    p2.start()
    p1.start()
    time.sleep(1)
    exit(0)

なお2つ以上のプロセスが同一の共有メモリを読み書きしないようにmultiprocessing.Lock や似たようなので排他制御するとデッドロックしてデバッグできなかったのでいい感じに排他制御できた人はコメント欄とかで教えてください

参考文献

21
21
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?