9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonでリアルタイムWebカメラ処理を最適化する:スレッドとキューの利用

Last updated at Posted at 2024-04-30

PythonとOpenCVを組み合わせてWebカメラからのフレームを効率的に取得し処理する方法について説明します。この記事では、threadingqueueモジュールを使用して、リアルタイムのカメラフィードを処理するクラスCamera_Threadの作成を行います。特に、マルチスレッディングを利用してカメラからのフレームを連続的に取得し、それをメインアプリケーションが処理できるようにキューに保存する方法に焦点を当てます。

基本的な概念

Threading

Pythonのthreadingモジュールを使用すると、複数のタスクを並行して実行できます。 これにより、重い処理をバックグラウンドで実行しながら、メインプログラムのレスポンス性を保つことができます。本例では、Webカメラからのフレームを取得するために別のスレッドを使用し、メインスレッドは取得したフレームを表示および処理します。

Queue

queue.Queueは、スレッドセーフなキュー実装で、プロデューサー(データを生成するスレッド)とコンシューマー(データを消費するスレッド)間でデータをやり取りするのに適しています。 キューは、入れた順にデータを取り出せる先入れ先出し(FIFO)の構造を持っています。スレッドセーフであるため、複数のスレッドから安全にアクセスすることができます。

Camera_Threadクラスの解説

クラスの初期設定

Camera_Threadクラスは、カメラの設定(解像度やフレームレートなど)と、フレームを保存するためのキューを初期化します。カメラはOpenCVのcv2.VideoCaptureを使用してアクセスします。

class Camera_Thread:

    def __init__(self,buffer_all = False):
        # Initialize camera
        self.camera_source = 0
        self.camera_width = 640
        self.camera_height = 480
        self.camera_frame_rate = 30
        self.camera_fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        self.buffer_length = 5
        self.buffer_all = buffer_all

        self.camera = None
        self.buffer = None
        self.frame_grab_run = False
        self.frame_grab_on = False
        self.frame_count = 0
        self.frames_returned = 0
        self.current_frame_rate = 0
        self.loop_start_time = 0
        self.thread = None

スタートとストップメソッド

start() メソッドは、キューとカメラを初期化し、カメラからフレームを取得するスレッドを開始します。stop() メソッドは、スレッドを安全に停止し、カメラリソースを解放します。

        def start(self):
        if self.buffer_all:
            self.buffer = queue.Queue(self.buffer_length)
        else:
            self.buffer = queue.Queue(1)

        self.camera = cv2.VideoCapture(self.camera_source)
        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_width)
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_height)
        self.camera.set(cv2.CAP_PROP_FPS, self.camera_frame_rate)
        self.camera.set(cv2.CAP_PROP_FOURCC, self.camera_fourcc)

        time.sleep(0.5)  # Allow camera to initialize

        self.frame_grab_run = True
        self.thread = threading.Thread(target=self.loop)
        self.thread.start()

    def stop(self):
        self.frame_grab_run = False
        if self.thread:
            self.thread.join()

        if self.camera:
            self.camera.release()
        self.buffer = None

フレームの取得とバッファへの保存

loop() メソッドは、カメラが稼働中である限り、連続してフレームを読み取り、キューに追加します。このメソッドは専用のスレッドで実行され、カメラからのフレームをリアルタイムで取得し続けることができます。

     def loop(self):
        self.frame_grab_on = True
        while self.frame_grab_run:
            if not self.buffer.full():
                ret, frame = self.camera.read()
                if not ret:
                    break
                self.buffer.put(frame)
                self.frame_count += 1

        self.frame_grab_on = False

フレームの取得

next() メソッドはキューからフレームを取得します。指定された待ち時間内にフレームが利用可能でない場合、黒フレームまたはNoneを返します。

    def next(self, black=True, wait=1):
        frame = np.zeros((self.camera_height, self.camera_width, 3), np.uint8) if black else None
        try:
            frame = self.buffer.get(timeout=wait)
            self.frames_returned += 1
        except queue.Empty:
            pass
        return frame

結論

このクラスを使用することで、Webカメラからのフレームを効率的に処理することが可能になります。スレッドとキューを利用することで、アプリケーションのパフォーマンスを向上させ、UIの応答性を保ちながらリアルタイムでの映像処理を実現できます。

全体コード

import cv2
import numpy as np
import threading
import queue
import time

# Camera Thread Class from your provided code
class Camera_Thread:

    def __init__(self,buffer_all = False):
        # Initialize camera
        self.camera_source = 0
        self.camera_width = 640
        self.camera_height = 480
        self.camera_frame_rate = 30
        self.camera_fourcc = cv2.VideoWriter_fourcc(*"MJPG")
        self.buffer_length = 5
        self.buffer_all = buffer_all

        self.camera = None
        self.buffer = None
        self.frame_grab_run = False
        self.frame_grab_on = False
        self.frame_count = 0
        self.frames_returned = 0
        self.current_frame_rate = 0
        self.loop_start_time = 0
        self.thread = None

    def start(self):
        if self.buffer_all:
            self.buffer = queue.Queue(self.buffer_length)
        else:
            self.buffer = queue.Queue(1)

        self.camera = cv2.VideoCapture(self.camera_source)
        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_width)
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_height)
        self.camera.set(cv2.CAP_PROP_FPS, self.camera_frame_rate)
        self.camera.set(cv2.CAP_PROP_FOURCC, self.camera_fourcc)

        time.sleep(0.5)  # Allow camera to initialize

        self.frame_grab_run = True
        self.thread = threading.Thread(target=self.loop)
        self.thread.start()

    def stop(self):
        self.frame_grab_run = False
        if self.thread:
            self.thread.join()

        if self.camera:
            self.camera.release()
        self.buffer = None

    def loop(self):
        self.frame_grab_on = True
        while self.frame_grab_run:
            if not self.buffer.full():
                ret, frame = self.camera.read()
                if not ret:
                    break
                self.buffer.put(frame)
                self.frame_count += 1

        self.frame_grab_on = False

    def next(self, black=True, wait=1):
        frame = np.zeros((self.camera_height, self.camera_width, 3), np.uint8) if black else None
        try:
            frame = self.buffer.get(timeout=wait)
            self.frames_returned += 1
        except queue.Empty:
            pass
        return frame

# Main function to start the camera and measure FPS
def main():
    camera_thread = Camera_Thread(buffer_all=True)
    camera_thread.start()

    frame_count = 0
    start_time = time.time()

    while True:
        frame = camera_thread.next(black=False)
        if frame is not None:
            cv2.imshow("Camera Thread", frame)
            frame_count += 1

            if frame_count >= 100:
                elapsed_time = time.time() - start_time
                fps = frame_count / elapsed_time
                print(f"Measured FPS: {fps:.2f}")
                frame_count = 0
                start_time = time.time()

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    camera_thread.stop()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


参考資料

9
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?