LoginSignup
1
0

4B(bookworm)、Managerで、巨大500MBのndarrayを共有

Last updated at Posted at 2024-01-22

前回に続き、ラズベリーパイ4B (8G, bookworm(x64)) で、multiprocessingのManagerを使い、ndarrayを共有します。ただ今回は、500MBという巨大なndarrayを共有します。

最初に、core1で4000w x 3000h の画像ndarrayを、vconcatとhconcatで結合し、巨大ndarrayを作成します。同じcore1で、この巨大ndarrayを「Managerのlist」に書き込みます。次にcore4で、「Managerのlist」から巨大ndarrayを読み出します。最後に、同じcore4で、巨大ndarrayを縮小して表示します。以下がコードです。

動かしてみると、「Managerのlistへの書き込み」と「Managerのlistからの読み出し」部分は、共に7秒です。mod==1のndarrayサイズは 24000x8000x3 = 576,000,000 = 576MBです。576/7 = 82MB/sです。1GbpsのLANのMAXスピードに近い。ストリーミングのProxyListとして納得できるスピードです。

さらに今回は、前設定の簡単なPythonのmultiprocessingのManagerを使って、ラズベリーパイ4B (8G, bookworm(x64))で、500MBの巨大ndarrayを共有できたことが、大きな収穫です。

# manager-ndarray---bookworm-x64-500MB.py
import time
from multiprocessing import Manager, Value, Process
import numpy as np
import cv2 as cv


# Raspberry Pi 4B, 8G   under cooling   on Thonny
# ----------------------------------------------------------
# Bookworm(x64) OS:  Linux raspberrypi 6.1.0-rpi7-rpi-v8 #1 SMP PREEMPT Debian 1:6.1.63-1+rpt1 (2023-11-24) aarch64 GNU/Linux
# Python 3.11.2 (main, Mar 13 2023, 12:18:29)
# numpy  1.24.2
# OpenCV  4.6.0 (sudo apt install python3-opencv)
# ----------------------------------------------------------
# https://qiita.com/like_ndarray/items/d39dbde96d5cdc81421d
# ref
# https://qiita.com/like_ndarray/items/b6accb010bde24c477d3


def process1(vcount00, f_plist0, plist0, f_plist1, plist1):

    img00 = cv.imread("d00-4000.png")   #4000w x 3000h (100K)
    img01 = cv.imread("d01-4000.png")
    img02 = cv.imread("d02-4000.png")
    img03 = cv.imread("d03-4000.png")   #warning for bookworm

    try:
        while(True):

            count00_old = vcount00.value
            q,  mod = divmod(count00_old, 2)

            if mod == 0:
                img480 = cv.vconcat([img00, img01, img02, img03, img00, img01, img02, img03])
                img480_4d  = np.reshape(img480, [1, 24000, 4000, 3])
                while True:
                    time.sleep(0.01)
                    if f_plist0.value == 0:
                        f_plist0.value = 9
                    if f_plist0.value == 9:
                        f_plist0.value = 1
                        break

                plist0[:]= []
                for i in img480_4d:
                    plist0.append(i)
                    
                if len(plist0)==0:
                    print("plist0==0")
                f_plist0.value = 4

            elif mod == 1:
                img480v = cv.vconcat([img00, img01, img02, img03, img00, img01, img02, img03])
                img480  = cv.hconcat([img480v, img480v])
                img480_4d  = np.reshape(img480, [1, 24000, 8000, 3])
                while True:
                    time.sleep(0.01)
                    if f_plist1.value == 0:
                        f_plist1.value = 9
                    if f_plist1.value == 9:
                        f_plist1.value = 1
                        break

                plist1[:]= []
                pretime=time.time()
                for i in img480_4d:
                    plist1.append(i)
                print("Writing")
                print(time.time()-pretime) ## 7sec for writing
                
                if len(plist1)==0:
                    print("plist1==0")
                f_plist1.value = 4


            vcount00.value  += 1
            

    except KeyboardInterrupt:
        print('process1  excepted')


def process4(vcount00, f_plist0, plist0, f_plist1, plist1):

    try:
        while(True):
            time.sleep(0.1)
            count0_old = vcount00.value - 1
            q,  mod = divmod(count0_old, 2)

            if f_plist0.value == 4 and mod == 0: # 5sec

                i_retry = 0
                while True:
                    l_ndarray00=[]
                    for i in plist0:
                        l_ndarray00.append(i)

                    i_retry += 1
                    if i_retry > 100:
                        print("i_retry >= 100  error")
                        f_plist0.value = 9
                        break
                    if len(l_ndarray00) == 0:
                        time.sleep(0.0001)
                        continue
                    else:
                        print(i_retry)
                        a_ndarray00 = np.array(l_ndarray00, dtype="uint8")
                        a_ndarray00c = a_ndarray00.copy()
                        f_plist0.value = 9
                        break

                a_ndarray00c333  = np.reshape(a_ndarray00c, [24000, 4000, 3])
                img480 = cv.resize(a_ndarray00c333, (160, 960))
                cv.imshow("Image", img480)
                key = cv.waitKey(1000)
                
                # for check
                a_ndarray00c333aa  = cv.hconcat([a_ndarray00c333, a_ndarray00c333])

            elif f_plist1.value == 4 and mod == 1:  # 10sec
                i_retry = 0
                while True:
                    l_ndarray00=[]
                    pretime=time.time()
                    for i in plist1:
                        l_ndarray00.append(i)
                    print("Reading")
                    print(time.time()-pretime) ## 7sec for reading 

                    i_retry += 1
                    if i_retry > 100:
                        print("i_retry >= 100  error")
                        f_plist1.value = 9
                        break
                    if len(l_ndarray00) == 0:
                        time.sleep(0.0001)
                        continue
                    else:
                        print(i_retry)
                        a_ndarray00 = np.array(l_ndarray00, dtype="uint8")
                        a_ndarray00c = a_ndarray00.copy()
                        f_plist1.value = 9
                        break

                a_ndarray00c333bb  = np.reshape(a_ndarray00c, [24000, 8000, 3])
                img480 = cv.resize(a_ndarray00c333bb, (320, 960))
                cv.imshow("Image", img480)
                key = cv.waitKey(1000)
                
                # for check
                print(np.array_equal(a_ndarray00c333aa, a_ndarray00c333bb))

    except KeyboardInterrupt:
        cv.destroyAllWindows()
        print('process4  excepted')



if __name__ == "__main__":

    with Manager() as manager:

        vcount00 = manager.Value('i', 0)
        vcount00.value  = 0

        f_plist0 = manager.Value('i', 0)
        f_plist0.value  = 9
        plist0  =  manager.list()

        f_plist1 = manager.Value('i', 0)
        f_plist1.value  = 9
        plist1  =  manager.list()

        process1 = Process(target=process1, args=[vcount00, f_plist0, plist0, f_plist1, plist1])
        process4 = Process(target=process4, args=[vcount00, f_plist0, plist0, f_plist1, plist1])

        process1.start()
        process4.start()

        process1.join()
        process4.join()

今回のプログラムより、100GBの巨大なcsvファイルも、あらかじめ500MB毎に分割すれば、ラズベリーパイ4B (8G, bookworm(x64))で、multiprocessingのManagerを使って共有できます。

※ csvファイルは、UTF-8のテキストファイルです。「テキストファイル分割」などで検索すれば、ヒットすると思います。Windowsであれば、「PowerShell」を使い、1行のコマンドで分割してくれます。

※ 仮に、ラズベリーパイ(bookworm(x64))のMainメモリー(8G)が不足した場合は、「Swapの設定」をすれば、Swapメモリー増設が可能です。「Swapメモリーの動作確認」もされています。ただしSwapメモリーとして、SSDの追加購入が必要です。▶2024FEB22追記:ラズベリーパイ5では、PCIeのM.2,NVMeのSSDが使えるようです。ただbookworm(x64)のSwapメモリーは100MB毎のシーケンシャルRead・Writeなので、USB型SSDでもSwapメモリーとして充分に実用に耐えると思います。
 
お忙しい中、ご覧いただきありがとうございました。
なお公開プログラムは、コピー・改変も自由です。著作権に関する問題も発生しません。
 
以下はプログラム中で使用した画像です。
d00-4000.png
d00-4000.png
d01-4000.png
d01-4000.png
d02-4000.png
d02-4000.png
d03-4000.png
d03-4000.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0