- PytnonのThreadingはCPUコアを1個しか使わないため話にならない
- Queue を用いるとデータのやりとりが遅い
- multiprocessing.Array もめっちゃ遅い!
- multiprocessing.shared_memory はかなり速そうだけれどpython 3.8以降でしか使えない
ので、CPUコアを複数用いたプロセスの間で、大量のデータをやりとりする場合別の仕組みを用いる必要がある。ここではsharedctypes.RawArrayを用いる方法を紹介する。mmap.mmapを用いても同等なことができるはず。~~https://qiita.com/kakinaguru_zo/items/c875ca7452c30a22289d に書いたようにOpenCV 4.2だと import cv2
を multiprocessing.Process
の中で個別に行わないとUSBカメラからの読み出しで常にエラーが起きてどうにもならなかった…~~OpenCV 4以降で並列処理した例は USBカメラの映像をPythonのOpenCVで表示する にある。
- multiprocessing.sharedctypes.RawArrayもそのまま使うと遅い❣。遅くならないためには必ずmemoryviewまたはnumpy.asarray で囲って使わないと相当遅い…
- データコピーのコピー元がnumpy配列の場合
numpy.asarray
でコピー先のRawArray
を囲むと10倍以上速くなる - コピー元が'RawArray'の「全体」でコピー先が
numpy
配列の場合RawArray
をそのまま使っても遅くならない -
RawArray
の「一部」をスライシング[start:end]
で読み出す場合はmemoryviewで囲わないとデータコピーが発生して無駄にメモリが消費される上に遅くなる。 - 上記の
RawArray
に関する記述はarray, bytearray, およびバイナリオープンしたファイルからread
したときに返されるbytesにも当てはまる
下記の例では
- 関数step1で乱数をファイルから読み込んで共有メモリー1に書き込んでいる
- 関数step2では得られた乱数の逆数を共有メモリー2に書き込んでいる
- 関数step3では共有メモリー2を
numpy
の行列だと思って行列ノルムを算出している
import multiprocessing
import multiprocessing.sharedctypes
import time
import numpy as np
byte_buf_size=4096
float_buf_size=4096
def step1(out_buf, buf1_ready):
f=open("/dev/urandom", "rb", buffering=True) # /dev/randomにすると遅くなるので試してみて下さい
while True:
buf=f.read(byte_buf_size) # readintoでoutbufに向かって直接読まない
buf1_ready.clear()
memoryview(out_buf).cast('B')[:]=buf # memoryviewで囲まないと超遅い
buf1_ready.set()
print("Step 1 finished")
def step2(in_buf, out_buf, buf1_ready, buf2_ready):
while True:
buf1_ready.wait()
denominator = np.array(in_buf,dtype=np.uint8, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
buf1_ready.clear()
#print(in_buf is denominator) Falseだとコピーされている
ones = np.ones_like(denominator, dtype=np.float32)
float_array = ones / (ones + denominator.astype(np.float32))
buf2_ready.clear()
# 複写元のデータがnumpy配列のときはmemoryviewよりnp.ctypeslib.as_arrayを使うほうが速い
np.asarray(out_buf)[:] = float_array # np.asarrayで囲まないと超遅い
buf2_ready.set()
#print("Step 2 finished")
def step3(in_buf, buf2_ready):
while True:
buf2_ready.wait()
float_array = np.array(in_buf,dtype=np.float32, copy=True) # コピーを作成しin_bufの読み出しをここだけで終わらせる
buf2_ready.clear()
#print(in_buf is float_array) Falseだとコピーされている
print(np.linalg.norm(np.reshape(float_array, (64,64))))
if __name__ == '__main__':
multiprocessing.set_start_method('fork')
byte_buf=multiprocessing.sharedctypes.RawArray('B', byte_buf_size)
float_buf=multiprocessing.sharedctypes.RawArray('f', float_buf_size)
buf1_ready = multiprocessing.Event()
buf1_ready.clear()
buf2_ready = multiprocessing.Event()
buf2_ready.clear()
p1=multiprocessing.Process(target=step1, args=(byte_buf,buf1_ready), daemon=True)
p2=multiprocessing.Process(target=step2, args=(byte_buf,float_buf,buf1_ready, buf2_ready), daemon=True)
p3=multiprocessing.Process(target=step3, args=(float_buf,buf2_ready), daemon=True)
p3.start()
p2.start()
p1.start()
time.sleep(1)
exit(0)
なお2つ以上のプロセスが同一の共有メモリを読み書きしないようにmultiprocessing.Lock や似たようなので排他制御するとデッドロックしてデバッグできなかったのでいい感じに排他制御できた人はコメント欄とかで教えてください