PythonとOpenCVを組み合わせてWebカメラからのフレームを効率的に取得し処理する方法について説明します。この記事では、threadingとqueueモジュールを使用して、リアルタイムのカメラフィードを処理するクラスCamera_Threadの作成を行います。特に、マルチスレッディングを利用してカメラからのフレームを連続的に取得し、それをメインアプリケーションが処理できるようにキューに保存する方法に焦点を当てます。
基本的な概念
Threading
Pythonのthreadingモジュールを使用すると、複数のタスクを並行して実行できます。 これにより、重い処理をバックグラウンドで実行しながら、メインプログラムのレスポンス性を保つことができます。本例では、Webカメラからのフレームを取得するために別のスレッドを使用し、メインスレッドは取得したフレームを表示および処理します。
Queue
queue.Queueは、スレッドセーフなキュー実装で、プロデューサー(データを生成するスレッド)とコンシューマー(データを消費するスレッド)間でデータをやり取りするのに適しています。 キューは、入れた順にデータを取り出せる先入れ先出し(FIFO)の構造を持っています。スレッドセーフであるため、複数のスレッドから安全にアクセスすることができます。
Camera_Threadクラスの解説
クラスの初期設定
Camera_Thread
クラスは、カメラの設定(解像度やフレームレートなど)と、フレームを保存するためのキューを初期化します。カメラはOpenCVのcv2.VideoCaptureを使用してアクセスします。
class Camera_Thread:
def __init__(self,buffer_all = False):
# Initialize camera
self.camera_source = 0
self.camera_width = 640
self.camera_height = 480
self.camera_frame_rate = 30
self.camera_fourcc = cv2.VideoWriter_fourcc(*"MJPG")
self.buffer_length = 5
self.buffer_all = buffer_all
self.camera = None
self.buffer = None
self.frame_grab_run = False
self.frame_grab_on = False
self.frame_count = 0
self.frames_returned = 0
self.current_frame_rate = 0
self.loop_start_time = 0
self.thread = None
スタートとストップメソッド
start()
メソッドは、キューとカメラを初期化し、カメラからフレームを取得するスレッドを開始します。stop()
メソッドは、スレッドを安全に停止し、カメラリソースを解放します。
def start(self):
if self.buffer_all:
self.buffer = queue.Queue(self.buffer_length)
else:
self.buffer = queue.Queue(1)
self.camera = cv2.VideoCapture(self.camera_source)
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_width)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_height)
self.camera.set(cv2.CAP_PROP_FPS, self.camera_frame_rate)
self.camera.set(cv2.CAP_PROP_FOURCC, self.camera_fourcc)
time.sleep(0.5) # Allow camera to initialize
self.frame_grab_run = True
self.thread = threading.Thread(target=self.loop)
self.thread.start()
def stop(self):
self.frame_grab_run = False
if self.thread:
self.thread.join()
if self.camera:
self.camera.release()
self.buffer = None
フレームの取得とバッファへの保存
loop()
メソッドは、カメラが稼働中である限り、連続してフレームを読み取り、キューに追加します。このメソッドは専用のスレッドで実行され、カメラからのフレームをリアルタイムで取得し続けることができます。
def loop(self):
self.frame_grab_on = True
while self.frame_grab_run:
if not self.buffer.full():
ret, frame = self.camera.read()
if not ret:
break
self.buffer.put(frame)
self.frame_count += 1
self.frame_grab_on = False
フレームの取得
next()
メソッドはキューからフレームを取得します。指定された待ち時間内にフレームが利用可能でない場合、黒フレームまたはNoneを返します。
def next(self, black=True, wait=1):
frame = np.zeros((self.camera_height, self.camera_width, 3), np.uint8) if black else None
try:
frame = self.buffer.get(timeout=wait)
self.frames_returned += 1
except queue.Empty:
pass
return frame
結論
このクラスを使用することで、Webカメラからのフレームを効率的に処理することが可能になります。スレッドとキューを利用することで、アプリケーションのパフォーマンスを向上させ、UIの応答性を保ちながらリアルタイムでの映像処理を実現できます。
全体コード
import cv2
import numpy as np
import threading
import queue
import time
# Camera Thread Class from your provided code
class Camera_Thread:
def __init__(self,buffer_all = False):
# Initialize camera
self.camera_source = 0
self.camera_width = 640
self.camera_height = 480
self.camera_frame_rate = 30
self.camera_fourcc = cv2.VideoWriter_fourcc(*"MJPG")
self.buffer_length = 5
self.buffer_all = buffer_all
self.camera = None
self.buffer = None
self.frame_grab_run = False
self.frame_grab_on = False
self.frame_count = 0
self.frames_returned = 0
self.current_frame_rate = 0
self.loop_start_time = 0
self.thread = None
def start(self):
if self.buffer_all:
self.buffer = queue.Queue(self.buffer_length)
else:
self.buffer = queue.Queue(1)
self.camera = cv2.VideoCapture(self.camera_source)
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_width)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_height)
self.camera.set(cv2.CAP_PROP_FPS, self.camera_frame_rate)
self.camera.set(cv2.CAP_PROP_FOURCC, self.camera_fourcc)
time.sleep(0.5) # Allow camera to initialize
self.frame_grab_run = True
self.thread = threading.Thread(target=self.loop)
self.thread.start()
def stop(self):
self.frame_grab_run = False
if self.thread:
self.thread.join()
if self.camera:
self.camera.release()
self.buffer = None
def loop(self):
self.frame_grab_on = True
while self.frame_grab_run:
if not self.buffer.full():
ret, frame = self.camera.read()
if not ret:
break
self.buffer.put(frame)
self.frame_count += 1
self.frame_grab_on = False
def next(self, black=True, wait=1):
frame = np.zeros((self.camera_height, self.camera_width, 3), np.uint8) if black else None
try:
frame = self.buffer.get(timeout=wait)
self.frames_returned += 1
except queue.Empty:
pass
return frame
# Main function to start the camera and measure FPS
def main():
camera_thread = Camera_Thread(buffer_all=True)
camera_thread.start()
frame_count = 0
start_time = time.time()
while True:
frame = camera_thread.next(black=False)
if frame is not None:
cv2.imshow("Camera Thread", frame)
frame_count += 1
if frame_count >= 100:
elapsed_time = time.time() - start_time
fps = frame_count / elapsed_time
print(f"Measured FPS: {fps:.2f}")
frame_count = 0
start_time = time.time()
if cv2.waitKey(1) & 0xFF == ord('q'):
break
camera_thread.stop()
cv2.destroyAllWindows()
if __name__ == "__main__":
main()
参考資料