前回に続き、ラズベリーパイ4B (8G, bookworm(x64)) で、multiprocessingのManagerを使い、ndarrayを共有します。ただ今回は、500MBという巨大なndarrayを共有します。
最初に、core1で4000w x 3000h の画像ndarrayを、vconcatとhconcatで結合し、巨大ndarrayを作成します。同じcore1で、この巨大ndarrayを「Managerのlist」に書き込みます。次にcore4で、「Managerのlist」から巨大ndarrayを読み出します。最後に、同じcore4で、巨大ndarrayを縮小して表示します。以下がコードです。
動かしてみると、「Managerのlistへの書き込み」と「Managerのlistからの読み出し」部分は、共に7秒です。mod==1のndarrayサイズは 24000x8000x3 = 576,000,000 = 576MBです。576/7 = 82MB/sです。1GbpsのLANのMAXスピードに近い。ストリーミングのProxyListとして納得できるスピードです。
さらに今回は、前設定の簡単なPythonのmultiprocessingのManagerを使って、ラズベリーパイ4B (8G, bookworm(x64))で、500MBの巨大ndarrayを共有できたことが、大きな収穫です。
# manager-ndarray---bookworm-x64-500MB.py
import time
from multiprocessing import Manager, Value, Process
import numpy as np
import cv2 as cv
# Raspberry Pi 4B, 8G under cooling on Thonny
# ----------------------------------------------------------
# Bookworm(x64) OS: Linux raspberrypi 6.1.0-rpi7-rpi-v8 #1 SMP PREEMPT Debian 1:6.1.63-1+rpt1 (2023-11-24) aarch64 GNU/Linux
# Python 3.11.2 (main, Mar 13 2023, 12:18:29)
# numpy 1.24.2
# OpenCV 4.6.0 (sudo apt install python3-opencv)
# ----------------------------------------------------------
# https://qiita.com/like_ndarray/items/d39dbde96d5cdc81421d
# ref
# https://qiita.com/like_ndarray/items/b6accb010bde24c477d3
def process1(vcount00, f_plist0, plist0, f_plist1, plist1):
img00 = cv.imread("d00-4000.png") #4000w x 3000h (100K)
img01 = cv.imread("d01-4000.png")
img02 = cv.imread("d02-4000.png")
img03 = cv.imread("d03-4000.png") #warning for bookworm
try:
while(True):
count00_old = vcount00.value
q, mod = divmod(count00_old, 2)
if mod == 0:
img480 = cv.vconcat([img00, img01, img02, img03, img00, img01, img02, img03])
img480_4d = np.reshape(img480, [1, 24000, 4000, 3])
while True:
time.sleep(0.01)
if f_plist0.value == 0:
f_plist0.value = 9
if f_plist0.value == 9:
f_plist0.value = 1
break
plist0[:]= []
for i in img480_4d:
plist0.append(i)
if len(plist0)==0:
print("plist0==0")
f_plist0.value = 4
elif mod == 1:
img480v = cv.vconcat([img00, img01, img02, img03, img00, img01, img02, img03])
img480 = cv.hconcat([img480v, img480v])
img480_4d = np.reshape(img480, [1, 24000, 8000, 3])
while True:
time.sleep(0.01)
if f_plist1.value == 0:
f_plist1.value = 9
if f_plist1.value == 9:
f_plist1.value = 1
break
plist1[:]= []
pretime=time.time()
for i in img480_4d:
plist1.append(i)
print("Writing")
print(time.time()-pretime) ## 7sec for writing
if len(plist1)==0:
print("plist1==0")
f_plist1.value = 4
vcount00.value += 1
except KeyboardInterrupt:
print('process1 excepted')
def process4(vcount00, f_plist0, plist0, f_plist1, plist1):
try:
while(True):
time.sleep(0.1)
count0_old = vcount00.value - 1
q, mod = divmod(count0_old, 2)
if f_plist0.value == 4 and mod == 0: # 5sec
i_retry = 0
while True:
l_ndarray00=[]
for i in plist0:
l_ndarray00.append(i)
i_retry += 1
if i_retry > 100:
print("i_retry >= 100 error")
f_plist0.value = 9
break
if len(l_ndarray00) == 0:
time.sleep(0.0001)
continue
else:
print(i_retry)
a_ndarray00 = np.array(l_ndarray00, dtype="uint8")
a_ndarray00c = a_ndarray00.copy()
f_plist0.value = 9
break
a_ndarray00c333 = np.reshape(a_ndarray00c, [24000, 4000, 3])
img480 = cv.resize(a_ndarray00c333, (160, 960))
cv.imshow("Image", img480)
key = cv.waitKey(1000)
# for check
a_ndarray00c333aa = cv.hconcat([a_ndarray00c333, a_ndarray00c333])
elif f_plist1.value == 4 and mod == 1: # 10sec
i_retry = 0
while True:
l_ndarray00=[]
pretime=time.time()
for i in plist1:
l_ndarray00.append(i)
print("Reading")
print(time.time()-pretime) ## 7sec for reading
i_retry += 1
if i_retry > 100:
print("i_retry >= 100 error")
f_plist1.value = 9
break
if len(l_ndarray00) == 0:
time.sleep(0.0001)
continue
else:
print(i_retry)
a_ndarray00 = np.array(l_ndarray00, dtype="uint8")
a_ndarray00c = a_ndarray00.copy()
f_plist1.value = 9
break
a_ndarray00c333bb = np.reshape(a_ndarray00c, [24000, 8000, 3])
img480 = cv.resize(a_ndarray00c333bb, (320, 960))
cv.imshow("Image", img480)
key = cv.waitKey(1000)
# for check
print(np.array_equal(a_ndarray00c333aa, a_ndarray00c333bb))
except KeyboardInterrupt:
cv.destroyAllWindows()
print('process4 excepted')
if __name__ == "__main__":
with Manager() as manager:
vcount00 = manager.Value('i', 0)
vcount00.value = 0
f_plist0 = manager.Value('i', 0)
f_plist0.value = 9
plist0 = manager.list()
f_plist1 = manager.Value('i', 0)
f_plist1.value = 9
plist1 = manager.list()
process1 = Process(target=process1, args=[vcount00, f_plist0, plist0, f_plist1, plist1])
process4 = Process(target=process4, args=[vcount00, f_plist0, plist0, f_plist1, plist1])
process1.start()
process4.start()
process1.join()
process4.join()
今回のプログラムより、100GBの巨大なcsvファイルも、あらかじめ500MB毎に分割すれば、ラズベリーパイ4B (8G, bookworm(x64))で、multiprocessingのManagerを使って共有できます。
※ csvファイルは、UTF-8のテキストファイルです。「テキストファイル分割」などで検索すれば、ヒットすると思います。Windowsであれば、「PowerShell」を使い、1行のコマンドで分割してくれます。
※ 仮に、ラズベリーパイ(bookworm(x64))のMainメモリー(8G)が不足した場合は、「Swapの設定」をすれば、Swapメモリー増設が可能です。「Swapメモリーの動作確認」もされています。ただしSwapメモリーとして、SSDの追加購入が必要です。▶2024FEB22追記:ラズベリーパイ5では、PCIeのM.2,NVMeのSSDが使えるようです。ただbookworm(x64)のSwapメモリーは100MB毎のシーケンシャルRead・Writeなので、USB型SSDでもSwapメモリーとして充分に実用に耐えると思います。
お忙しい中、ご覧いただきありがとうございました。
なお公開プログラムは、コピー・改変も自由です。著作権に関する問題も発生しません。
以下はプログラム中で使用した画像です。
d00-4000.png
d01-4000.png
d02-4000.png
d03-4000.png