Help us understand the problem. What is going on with this article?

Python 3のmultiprocessingでプロセス間で大量のデータを受け渡しつつnumpyで処理する

ので、CPUコアを複数用いたプロセスの間で、大量のデータをやりとりする場合別の仕組みを用いる必要がある。ここではsharedctypes.RawArrayを用いる方法を紹介する。mmap.mmapを用いても同等なことができるはず。

下記の例では

  • 関数step1で乱数をファイルから読み込んで共有メモリー1に書き込んでいる
  • 関数step2では得られた乱数の逆数を共有メモリー2に書き込んでいる
  • 関数step3では共有メモリー2をnumpyの行列だと思って行列ノルムを算出している
import multiprocessing
import multiprocessing.sharedctypes
import time
import numpy as np

byte_buf_size=4096
float_buf_size=4096

def step1(out_buf, buf1_ready):
    f=open("/dev/urandom", "rb", buffering=True) # /dev/randomにすると遅くなるので試してみて下さい
    while True:
        buf=f.read(byte_buf_size) # readintoでoutbufに向かって直接読まない
        buf1_ready.clear()
        memoryview(out_buf).cast('B')[:]=buf # memoryviewで囲まないと超遅い
        buf1_ready.set()
        print("Step 1 finished")

def step2(in_buf, out_buf, buf1_ready, buf2_ready):
    while True:
        buf1_ready.wait()
        denominator = np.array(in_buf,dtype=np.uint8, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
        buf1_ready.clear()
        #print(in_buf is denominator) Falseだとコピーされている
        ones = np.ones_like(denominator, dtype=np.float32)
        float_array = ones / (ones + denominator.astype(np.float32))
        buf2_ready.clear()
        # 複写元のデータがnumpy配列のときはmemoryviewよりnp.ctypeslib.as_arrayを使うほうが速い
        np.asarray(out_buf)[:] = float_array # np.asarrayで囲まないと超遅い
        buf2_ready.set()
        #print("Step 2 finished")

def step3(in_buf, buf2_ready):
    while True:
        buf2_ready.wait()
        float_array = np.array(in_buf,dtype=np.float32, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
        buf2_ready.clear()
        #print(in_buf is float_array) Falseだとコピーされている
        print(np.linalg.norm(np.reshape(float_array, (64,64))))


if __name__ == '__main__':
    multiprocessing.set_start_method('fork')
    byte_buf=multiprocessing.sharedctypes.RawArray('B', byte_buf_size)
    float_buf=multiprocessing.sharedctypes.RawArray('f', float_buf_size)
    buf1_ready = multiprocessing.Event()
    buf1_ready.clear()
    buf2_ready = multiprocessing.Event()
    buf2_ready.clear()
    p1=multiprocessing.Process(target=step1, args=(byte_buf,buf1_ready), daemon=True)
    p2=multiprocessing.Process(target=step2, args=(byte_buf,float_buf,buf1_ready, buf2_ready), daemon=True)
    p3=multiprocessing.Process(target=step3, args=(float_buf,buf2_ready), daemon=True)
    p3.start()
    p2.start()
    p1.start()
    time.sleep(1)
    exit(0)

なお2つ以上のプロセスが同一の共有メモリを読み書きしないようにmultiprocessing.Lock や似たようなので排他制御するとデッドロックしてデバッグできなかったのでいい感じに排他制御できた人はコメント欄とかで教えてください

参考文献

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした