3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonでリアルタイムWebカメラ処理を行うときに最新フレームを常に取得する

Posted at

https://qiita.com/kotai2003/items/466b2be885ff4cdbbde2
↑の記事にてPythonでリアルタイムWebカメラ処理を行うときの方法が書いてある.
この方法を参考に最新フレームを取得できるようにしたものが以下のコードである.
尚,必要のないコードは消したり改変したりしてしまったので,必要な人は適宜直してもらうと良いと思います.

全体コード

import cv2
import numpy as np
import threading
import queue
import time

class CameraThread:
    def __init__(self):
        # Initialize camera
        self.camera_source = 0
        self.camera_width = 640
        self.camera_height = 480
        self.camera_frame_rate = 30
        self.camera_fourcc = cv2.VideoWriter_fourcc(*"MJPG")

        self.camera = None
        self.buffer = None
        self.frame_grab_run = False
        self.frame_grab_on = False
        self.frame_count = 0
        self.frames_returned = 0
        self.current_frame_rate = 0
        self.loop_start_time = 0
        self.thread = None

    def start(self):
        self.buffer = queue.Queue(1)

        self.camera = cv2.VideoCapture(self.camera_source)

        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_width)
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_height)
        self.camera.set(cv2.CAP_PROP_FPS, self.camera_frame_rate)
        self.camera.set(cv2.CAP_PROP_FOURCC, self.camera_fourcc)
        self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)

        time.sleep(0.5)  # Allow camera to initialize

        self.frame_grab_run = True
        self.thread = threading.Thread(target=self.loop)
        self.thread.start()

    def stop(self):
        self.frame_grab_run = False
        if self.thread:
            self.thread.join()

        if self.camera:
            self.camera.release()
        self.buffer = None

    def loop(self):
        self.frame_grab_on = True
        while self.frame_grab_run:
            ret, frame = self.camera.read()
            if not ret:
                break
            if self.buffer.full():
                try:
                    self.buffer.get(block=False)
                    self.frame_count -= 1
                except queue.Empty:
                    pass
            self.buffer.put(frame)
            self.frame_count += 1
        self.frame_grab_on = False

    def next(self):
        frame = None
        try:
            frame = self.buffer.get()
            self.frames_returned += 1
        except queue.Empty:
            pass
        return frame

変更点

loop関数

カメラフレームを取得してからqueueputするようにした.
queueに既にフレームが入っていればそのフレームを破棄するようにした.

    def loop(self):
        self.frame_grab_on = True
        while self.frame_grab_run:
            ret, frame = self.camera.read()
            if not ret:
                break
            if self.buffer.full():
                try:
                    self.buffer.get(block=False)
                    self.frame_count -= 1
                except queue.Empty:
                    pass
            self.buffer.put(frame)
            self.frame_count += 1
        self.frame_grab_on = False

next関数

queueから取り出せるまでずっと待機するようにした.
tryで囲む必要はないかもしれない.

    def next(self):
        frame = None
        try:
            frame = self.buffer.get()
            self.frames_returned += 1
        except queue.Empty:
            pass
        return frame
3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?