はじめに
自己紹介
大学2年生で情報系の学科にいます。AIに興味があって、AIエンジニアのインターンをしたり、NLPの研究をちょっとしたりしています。もし記事に不備があればコメントで教えていただけると幸いです。
背景
6万枚以上のtif画像から、データの前処理をする際に並列処理で高速化できないかなと思って調べながら実装してみました。
並列処理とは?
非同期処理、並行処理、並列処理の3つはかなり混同しやすい概念ですが、参考文献[1]、[2]の記事がわかりやすいなと思ったので、並列処理自体の話は下記のサイトにお任せします。
実装
pythonにおける並列処理の記述
pythonで並列処理を記述するにはconcurrent.futuresとmultiprocessingの2つの標準ライブラリが使えます。今回はmultiprocessingを使用しており、concurrent.futuresには触れません。なお、multiprocessingを使用したのは、
そのため、IO関係での待ち時間が結構ある場合にはProcessを使ったほうが速くなるケースがある・・らしい。
と[1]に記載があったためです。今回は画像の読み込みが律速になりそうだったのでProcessを採用することに決めました。
並列で処理したい関数
今回は単純化のために単なるresizeのみとしますが、仮に以下の関数の処理を並列化したいとします。わざわざimg_npでndarrayとして画像を保存していく目的はデータセットとして以後扱う際にデータサイズを落とす(画像のメタ情報を削ぎ落とす)ためです。
def preprocess(img_paths: list[str]) -> None:
img_np = np.empty((0, new_height, new_width))
for path in img_paths:
img = cv2.imread(path)
img = cv2.resize(img, (new_width, new_height))
img_np = np.vstack((img_arr, img))
並列処理のスクリプト
import cv2
import numpy as np
from multiprocessing import shared_memory
def multiprocess_img_preprocess(img_paths, shared_mem_name, shape):
existing_shm = shared_memory.SharedMemory(name=shared_mem_name)
img_np = np.ndarray(shape, dtype=np.uint8, buffer=existing_shm.buf)
for i, path in enumerate(img_paths):
img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
img = cv2.resize(img, (shape[2], shape[1]))
img_np[i, :, :] = img
if __name__ == "__main__":
img_paths = [] # 画像のパスのリスト
height = # 画像の高さ
width = # 画像の幅
array_shape = (len(img_paths), height, width)
shm = shared_memory.SharedMemory(create=True, size=np.prod(array_shape) * np.dtype(np.uint8).itemsize)
images = np.ndarray(array_shape, dtype=np.uint8, buffer=shm.buf)
p = Process(target=multiprocess_img_preprocess, args=(img_paths, shm.name, array_shape))
p.start()
p.join()
images = np.ndarray(array_shape, dtype=np.uint8, buffer=shm.buf)
# 共有メモリの解放
shm.close()
shm.unlink()
共有メモリの作成
array_shape = (len(img_paths), height, width)
shm = shared_memory.SharedMemory(create=True, size=np.prod(array_shape) * np.dtype(np.uint8).itemsize)
images = np.ndarray(array_shape, dtype=np.uint8, buffer=shm.buf)
並列処理するプロセス間でデータを共有するには、共有メモリを使う必要があります。multiprocessingにはValue, Array, RawValue, RawArrayという共有メモリ上でデータを保持するためのクラスが用意されています。しかしながらいずれのクラスにおいてもctypesオブジェクトしか保持できません。ですので今回はshared_memory.SharedMemoryクラスを用いて適切な大きさのメモリを確保し、np.ndarrayのbuffer引数で行列を共有メモリ上に配置する形で実装しました。
参考文献
[1]その並列処理待った! 「Python 並列処理」でググったあなたに捧ぐasync, threading, multiprocessingのざっくりとした説明