はじめに
OpenCVのVideoCaptureでは、カメラや動画ファイルから映像を取得することができます。しかし、これはそれなりに時間がかかる処理です。以下の簡単なコードで確認してみます。
from time import perf_counter
import cv2
if __name__ == "__main__":
video_capture = cv2.VideoCapture(0)
try:
while True:
start_time = perf_counter()
_, mat = video_capture.read()
print(perf_counter() - start_time)
cv2.imshow("window", mat)
cv2.waitKey(1)
except KeyboardInterrupt:
pass
video_capture.release()
ThinkPad X1 Carbon(Core i5-8350U、2018モデル)では以下の結果になりました。
0.020526600000000172
0.02119850000000012
0.024764599999999692
0.02197910000000025
0.020199199999999973
0.024426599999999965
0.04262340000000009
0.032673699999999695
0.03489969999999998
0.02527530000000011
0.03051029999999999
0.024188600000000005
0.020945799999999792
0.035633499999999874
...
映像のがたつきなどは感じられないものの、取得だけで平均して0.03秒近い時間がかかっており、描画も考慮すると30FPS(フレームにつき0.033秒)も出ていないといえます。
映像を順繰りに表示するだけなら問題にはなりませんが、GUIアプリケーションに組み込んで表示するなどとすると、取得ごとに0.03秒間GUIの描画を妨げてしまうのは相当なストレスになることでしょう。
方針
そこで、フレームの取得を別プロセスで実行することを考えます。こちらの記事を参考にしました。
- 別プロセスで常時
cv2.VideoCapture#read
を実行し、共有メモリに書き込む - 利用側では、
read
が呼び出された際に共有メモリから読み出し、コピーして渡す
共有メモリから読み出して複製する時間はcv2.VideoCapture#read
よりはるかに短いため、readで処理が止まることがなくなります。
ただし、readを呼び出すたびに新たなフレームを取得できるかは別の話です。別プロセスが共有メモリを更新するより短いスパンでreadを呼ぶと、以前のreadと同じ内容の画像を返します。
実装
Python 3.7で動作させる必要があったため、先ほどの記事と同様に(SharedMemoryではなく)RawArrayで作成しました。
画像のサイズやチャンネルなど、共有メモリのサイズを決める情報は、「呼び出し元のプロセスで一度cv2.VideoCapture
を初期化して、実際に1枚取得する」という非常に原始的な方法で確認することにしました。したがって初期化に時間はかかりますが、事前に引数のエラーをチェックできるので、これはこれでよいと思います(マルチプロセス処理のデバッグは骨が折れます)。
すでに完成させたものから記事用に一部を抜粋しているので、不足があるかもしれません。
from __future__ import annotations
import ctypes
import multiprocessing
import multiprocessing.sharedctypes
import multiprocessing.synchronize
import signal
from time import perf_counter
from typing import cast
import cv2
import numpy as np
def _update(args: tuple, buffer: ctypes.Array[ctypes.c_uint8], ready: multiprocessing.synchronize.Event, cancel: multiprocessing.synchronize.Event):
signal.signal(signal.SIGINT, signal.SIG_IGN)
video_capture = cv2.VideoCapture(*args)
if not video_capture.isOpened():
raise IOError()
try:
while not cancel.is_set():
ret, mat = cast("tuple[bool, cv2.Mat]", video_capture.read())
if not ret:
continue
ready.clear()
memoryview(buffer).cast('B')[:] = memoryview(mat).cast('B')[:]
ready.set()
finally:
video_capture.release()
def _get_information(args: tuple):
video_capture = cv2.VideoCapture(*args)
if not video_capture.isOpened():
raise IOError()
try:
ret, mat = cast("tuple[bool, cv2.Mat]", video_capture.read())
if not ret:
raise IOError()
return mat.shape
finally:
video_capture.release()
class VideoCaptureWrapper:
def __init__(self, *args) -> None:
self.__shape = _get_information(args)
height, width, channels = self.__shape
self.__buffer = multiprocessing.sharedctypes.RawArray(
ctypes.c_uint8, height * width * channels)
self.__ready = multiprocessing.Event()
self.__cancel = multiprocessing.Event()
self.__enqueue = multiprocessing.Process(target=_update, args=(
args, self.__buffer, self.__ready, self.__cancel), daemon=True)
self.__enqueue.start()
self.__released = cast(bool, False)
def read(self):
self.__ready.wait()
return cast(bool, True), np.reshape(self.__buffer, self.__shape).copy()
def release(self):
if self.__released:
return
self.__cancel.set()
self.__enqueue.join()
self.__released = True
def __del__(self):
try:
self.release()
except:
pass
if __name__ == "__main__":
video_capture = VideoCaptureWrapper(0)
try:
while True:
start_time = perf_counter()
_, mat = video_capture.read()
print(perf_counter() - start_time)
cv2.imshow("window", mat)
cv2.waitKey(1)
except KeyboardInterrupt:
pass
video_capture.release()
VideoCaptureWrapper
はcv2.VideoCapture
と同じシグネチャを備えており、video_capture = VideoCaptureWrapper(0)
の行をvideo_capture = cv2.VideoCapture(0)
に書き換えても動作します。
パフォーマンスはどうでしょうか。
0.0004926999999987913
0.0014219999999998123
0.0003393000000002644
0.0007544999999993252
0.001865399999999795
0.0004807000000006667
0.0006626999999994609
0.0024015000000012776
0.0003846999999996825
0.0007199999999993878
...
安定して0.001秒を下回る秒数で取得することができています。
おわりに
以上の方法で、cv2.VideoCapture#read
で停止する時間を可能な限り短くすることができます。説明している通り、この方法では画像が更新される頻度自体を改善することはできませんが、readにかかる時間がボトルネックになっている限定的な状況では役に立つかもしれません。
get
、set
への対応など、発展させたものはこちらから確認できます。