multiprocessingで生成したプロセス同士で変数を共有するには共有メモリを生成する必要があります。生成した共有メモリとnumpy配列のデータを相互に変換する方法がわからなかったのでここにメモしておきます。
共有メモリを生成する方法はいくつかありますが、ここではmultiprocessingのValueクラスを使用します。Valueクラスはctypesオブジェクトの同期ラッパーです。つまりnumpy配列とctypes配列の相互変換ができれば、今回やろうとしていることが実現できます。
numpy配列とctypes配列の相互変換についてはこちら
https://qiita.com/maiueo/items/493d321f0c9a804b3672
ライブラリの読み込み
import ctypes
import numpy as np
import multiprocessing as mp
ndarray -> Value
n = np.zeros((3, 4)).astype(np.uint8) # ndarray型の3x4の配列を生成
n_w, n_h = n.shape # nのサイズを取得
v = mp.Value((ctypes.c_uint8 * n_h) * n_w) # nと同じサイズのctypesの配列を共有メモリに生成
v_c = v.get_obj() # 生成したctypesオブジェクトを取得
v_n = np.ctypeslib.as_array(v_c) # 生成したctypesオブジェクトを参照したndarrayを生成
v_n[:] = n # nから値をコピー(v_nのインスタンスはそのまま)
np.uint8, ctypes.c_uint8の部分は配列に用いる型の指定
n
とv
はそれぞれ別のアドレスを参照しているので、片方を書き換えても、もう片方には書き換えは反映されません。
同じアドレスを参照させる方法もあるかもしれませんが、見つけることができませんでした。
Value -> ndarray
v = mp.Value((ctypes.c_uint8 * 4) * 3) # ctypesの3x4の配列を共有メモリに生成
v_c = v.get_obj() # 生成したctypesオブジェクトを取得
n = np.ctypeslib.as_array(v_c) # 生成したctypesオブジェクトを参照したndarrayを生成
ctypes.c_uint8の部分は配列に用いる型の指定
v
とn
はともに同じアドレスを参照しているので、片方を書き換えるともう片方にも書き換えが反映されます。
サンプル1
cv2で読み込んだ画像(ndarray型)をValueクラスに変換し、それをまたndarray型に戻すプログラムです。
画像ファイルのパスの箇所を適宜変更してください。
import ctypes
import numpy as np
import multiprocessing as mp
import cv2
def valueToNdarray(v):
return np.ctypeslib.as_array(v.get_obj()) # Valueからndarrayに変換
def ndarrayToValue(n):
n_h, n_w, n_ch = n.shape # サイズ、チャンネル数を取得
v = mp.Value(((ctypes.c_uint8 * n_ch) * n_w) * n_h) # 同サイズ、同チャンネル数の共有メモリを生成
valueToNdarray(v)[:] = n # コピー
return v
if __name__ == '__main__':
# 適当な画像からndarrayの生成
input_img = cv2.imread("画像ファイルのパス")
# Valueに変換
v = ndarrayToValue(input_img)
# ndarrayに変換
output_img = valueToNdarray(v)
# プレビュー
cv2.imshow("input", input_img)
cv2.imshow("output", output_img)
cv2.waitKey(0)
サンプル2
cv2で読み込んだ画像(ndarray型)に別プロセスでフィルターをかけるプログラムです。
読み込んだ画像を45度回転します。
画像ファイルのパスの箇所を適宜変更してください。
import ctypes
import numpy as np
import multiprocessing as mp
import cv2
def valueToNdarray(v):
return np.ctypeslib.as_array(v.get_obj()) # Valueからndarrayに変換
def ndarrayToValue(n):
n_h, n_w, n_ch = n.shape # サイズ、チャンネル数を取得
v = mp.Value(((ctypes.c_uint8 * n_ch) * n_w) * n_h) # 同サイズ、同チャンネル数の共有メモリを生成
valueToNdarray(v)[:] = n # コピー
return v
def filter(v):
n = valueToNdarray(v) # Valueからndarrayに変換
n_h, n_w, n_ch = n.shape # 画像サイズ、チャンネル数の取得
M = cv2.getRotationMatrix2D((n_w / 2, n_h / 2), 45, 1) # 画像の中心を軸に45度回転する行列を求める
n[:] = cv2.warpAffine(n, M, (n_w, n_h)) # 生成した行列で画像を回転させる
if __name__ == '__main__':
# 適当な画像からndarrayの生成
input_img = cv2.imread("画像ファイルのパス")
# Valueに変換
v = ndarrayToValue(input_img)
# 画像を45度回転するプロセスを生成
p = mp.Process(target = filter, args = (v, ))
p.start() # プロセスの開始
p.join() # プロセスの終了待機
# 処理結果をndarrayに変換
output_img = valueToNdarray(v)
# プレビュー
cv2.imshow("input", input_img)
cv2.imshow("output", output_img)
cv2.waitKey(0)
参考
https://docs.python.org/ja/3/library/multiprocessing.html#multiprocessing.Value
https://codeday.me/jp/qa/20181210/61068.html