31
26

More than 1 year has passed since last update.

OpenCV VideoCaptureをめっちゃ速くする

Posted at

はじめに

OpenCVのVideoCaptureでは、カメラや動画ファイルから映像を取得することができます。しかし、これはそれなりに時間がかかる処理です。以下の簡単なコードで確認してみます。

from time import perf_counter

import cv2


if __name__ == "__main__":
    video_capture = cv2.VideoCapture(0)

    try:
        while True:
            start_time = perf_counter()
            _, mat = video_capture.read()
            print(perf_counter() - start_time)

            cv2.imshow("window", mat)
            cv2.waitKey(1)

    except KeyboardInterrupt:
        pass

    video_capture.release()

ThinkPad X1 Carbon(Core i5-8350U、2018モデル)では以下の結果になりました。

0.020526600000000172
0.02119850000000012
0.024764599999999692
0.02197910000000025
0.020199199999999973
0.024426599999999965
0.04262340000000009
0.032673699999999695
0.03489969999999998
0.02527530000000011
0.03051029999999999
0.024188600000000005
0.020945799999999792
0.035633499999999874
...

映像のがたつきなどは感じられないものの、取得だけで平均して0.03秒近い時間がかかっており、描画も考慮すると30FPS(フレームにつき0.033秒)も出ていないといえます。

映像を順繰りに表示するだけなら問題にはなりませんが、GUIアプリケーションに組み込んで表示するなどとすると、取得ごとに0.03秒間GUIの描画を妨げてしまうのは相当なストレスになることでしょう。

方針

そこで、フレームの取得を別プロセスで実行することを考えます。こちらの記事を参考にしました。

  • 別プロセスで常時cv2.VideoCapture#readを実行し、共有メモリに書き込む
  • 利用側では、readが呼び出された際に共有メモリから読み出し、コピーして渡す

共有メモリから読み出して複製する時間はcv2.VideoCapture#readよりはるかに短いため、readで処理が止まることがなくなります。

ただし、readを呼び出すたびに新たなフレームを取得できるかは別の話です。別プロセスが共有メモリを更新するより短いスパンでreadを呼ぶと、以前のreadと同じ内容の画像を返します。

実装

Python 3.7で動作させる必要があったため、先ほどの記事と同様に(SharedMemoryではなく)RawArrayで作成しました。

画像のサイズやチャンネルなど、共有メモリのサイズを決める情報は、「呼び出し元のプロセスで一度cv2.VideoCaptureを初期化して、実際に1枚取得する」という非常に原始的な方法で確認することにしました。したがって初期化に時間はかかりますが、事前に引数のエラーをチェックできるので、これはこれでよいと思います(マルチプロセス処理のデバッグは骨が折れます)。

すでに完成させたものから記事用に一部を抜粋しているので、不足があるかもしれません。

from __future__ import annotations

import ctypes
import multiprocessing
import multiprocessing.sharedctypes
import multiprocessing.synchronize
import signal
from time import perf_counter
from typing import cast

import cv2
import numpy as np


def _update(args: tuple, buffer: ctypes.Array[ctypes.c_uint8], ready: multiprocessing.synchronize.Event, cancel: multiprocessing.synchronize.Event):

    signal.signal(signal.SIGINT, signal.SIG_IGN)

    video_capture = cv2.VideoCapture(*args)
    if not video_capture.isOpened():
        raise IOError()

    try:
        while not cancel.is_set():
            ret, mat = cast("tuple[bool, cv2.Mat]", video_capture.read())
            if not ret:
                continue

            ready.clear()
            memoryview(buffer).cast('B')[:] = memoryview(mat).cast('B')[:]
            ready.set()

    finally:
        video_capture.release()


def _get_information(args: tuple):

    video_capture = cv2.VideoCapture(*args)
    if not video_capture.isOpened():
        raise IOError()

    try:
        ret, mat = cast("tuple[bool, cv2.Mat]", video_capture.read())
        if not ret:
            raise IOError()

        return mat.shape

    finally:
        video_capture.release()


class VideoCaptureWrapper:

    def __init__(self, *args) -> None:
        self.__shape = _get_information(args)
        height, width, channels = self.__shape
        self.__buffer = multiprocessing.sharedctypes.RawArray(
            ctypes.c_uint8, height * width * channels)

        self.__ready = multiprocessing.Event()
        self.__cancel = multiprocessing.Event()
        self.__enqueue = multiprocessing.Process(target=_update, args=(
            args, self.__buffer, self.__ready, self.__cancel), daemon=True)
        self.__enqueue.start()

        self.__released = cast(bool, False)

    def read(self):
        self.__ready.wait()
        return cast(bool, True), np.reshape(self.__buffer, self.__shape).copy()

    def release(self):
        if self.__released:
            return

        self.__cancel.set()
        self.__enqueue.join()
        self.__released = True

    def __del__(self):
        try:
            self.release()
        except:
            pass


if __name__ == "__main__":

    video_capture = VideoCaptureWrapper(0)

    try:
        while True:
            start_time = perf_counter()
            _, mat = video_capture.read()
            print(perf_counter() - start_time)

            cv2.imshow("window", mat)
            cv2.waitKey(1)

    except KeyboardInterrupt:
        pass

    video_capture.release()

VideoCaptureWrappercv2.VideoCaptureと同じシグネチャを備えており、video_capture = VideoCaptureWrapper(0)の行をvideo_capture = cv2.VideoCapture(0)に書き換えても動作します。

パフォーマンスはどうでしょうか。

0.0004926999999987913
0.0014219999999998123
0.0003393000000002644
0.0007544999999993252
0.001865399999999795
0.0004807000000006667
0.0006626999999994609
0.0024015000000012776
0.0003846999999996825
0.0007199999999993878
...

安定して0.001秒を下回る秒数で取得することができています。

おわりに

以上の方法で、cv2.VideoCapture#readで停止する時間を可能な限り短くすることができます。説明している通り、この方法では画像が更新される頻度自体を改善することはできませんが、readにかかる時間がボトルネックになっている限定的な状況では役に立つかもしれません。

getsetへの対応など、発展させたものはこちらから確認できます。

31
26
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
31
26