はじめに
ラズパイのカメラ画像をローカルネットワーク内で複数ブラウザ(タブ)にストリーミングするのに少しハマったので、ここに自分の勉強も兼ねてまとめます。
今回のスクリプトは以下に置いてあります。
https://github.com/RIckyBan/pi-cam-streaming
環境
- Raspberry Pi 4 Model B (4GB)
- Python: 3.7.3
- OpenCV: 4.1.0
- Flask: 1.0.2
- カメラ: Raspberry Pi Camera Module V2
OpenCV+Flaskによるシンプルなストリーミング
ラズパイのカメラ画像をストリーミングするには、mjpg-streamerを使う方法と、PicameraやOpenCVなどのライブラリでカメラからフレームを直接取得しサーバー上で処理する方法の2つがあります。今回は、後々OpenCVでフレームに何らかの画像処理をすることを考えて、後者を採用します。
この実装に関しては以下の記事が大変詳しく参考になりました。
FlaskとOpenCVで live streaming
以下のような構成をとれば、カメラ画像をcv2.VideoCapture
で取得してFlaskに渡すことができます。シンプルですね。
.
├── camera.py
├── app.py
└── templates
└── stream.html
<html>
<head>
<title>Flask Streaming Test</title>
</head>
<body>
<h1>Flask Streaming Test</h1>
<img src="{{ url_for('video_feed') }}">
</body>
</html>
import cv2
from flask import Flask, render_template, Response
from camera import Camera
app = Flask(__name__)
@app.route("/")
def index():
return "Hello World!"
@app.route("/stream")
def stream():
return render_template("stream.html")
def gen(camera):
while True:
frame = camera.get_frame()
if frame is not None:
yield (b"--frame\r\n"
b"Content-Type: image/jpeg\r\n\r\n" + frame.tobytes() + b"\r\n")
else:
print("frame is none")
@app.route("/video_feed")
def video_feed():
return Response(gen(Camera()),
mimetype="multipart/x-mixed-replace; boundary=frame")
if __name__ == "__main__":
app.debug = True
app.run(host="0.0.0.0", port=5000)
# camera.py
import cv2
class Camera(object):
def __init__(self):
self.video = cv2.VideoCapture(0)
def __del__(self):
self.video.release()
def get_frame(self):
success, image = self.video.read()
ret, frame = cv2.imencode('.jpg', image)
return frame
そして以下を実行します。
$ python3 app.py
上手く行けば、ラズパイのプライベートIPを確認して、xx.xx.xx.xx:5000/stream
またはlocalhost:5000/stream
にアクセスすると、以下のような出力を得られるはずです。
しかしこの実装では複数ブラウザ・タブからのアクセスに対応しておらず、別のタブからアクセスしても画像は表示されません。
原因と解決法について調べたところ、以下の記事が解決策を教えてくれました。
記事: Flask Video Streaming Revisited
実装: https://github.com/miguelgrinberg/flask-video-streaming
素晴らしい実装で感銘を受けたので、ここからはこの記事の実装が何をしているのか、自分が理解できた範囲で解説する形を取りたいと思います(※コードは適宜簡略化して載せています)。
最終的な構成は以下の通りです。
.
├── app.py
├── base_camera.py
├── camera.py
└── templates
└── stream.html
Flaskでの同時アクセス
この記事にもある通り、そもそもFlaskでは同時アクセスがデフォルトで無効になっています。
同時アクセスを許可するには、app.py
のapp.run
の引数を以下の通り変更します。
app.run(host="0.0.0.0", port=5000, threaded=True)
フレーム取得スレッドのバックグラウンド実行
これは自分の推測ですが、恐らくOpenCVによるカメラへのアクセスは排他制御で、複数のブラウザ・タブからアクセスする場合も、カメラからフレームを取得するスレッドの数は1つである必要があります。現状の実装では/stream
にアクセスする際に、それぞれCamera
クラスのインスタンスが呼び出され、そのメンバ変数にcv2.VideoCapture
が格納されるため、ストリーミングが行われているタブでのプロセスが止まるまで他のタブは固まってしまいます。
そこで先程の記事では、cv2.VideoCapture
をstaticmethod内に実装することで、各インスタンスで共通の処理を参照する工夫をしています(BaseCamera
クラスについては後述)。
import cv2
from base_camera import BaseCamera
class Camera(BaseCamera):
def __init__(self):
super().__init__()
@staticmethod
def frames():
camera = cv2.VideoCapture(0)
if not camera.isOpened():
raise RuntimeError('Could not start camera.')
while True:
# read current frame
_, img = camera.read()
# encode as a jpeg image and return it
yield cv2.imencode('.jpg', img)[1].tobytes()
しかしCamera
クラス内には、app.py
で呼び出すget_frame
メソッドが実装されていません。これは、親クラスのBaseCamera
で実装します。実はこちらの実装ではthreading.Event
を用いており、フレーム取得の処理が終わるまで各ブラウザが呼び出したフレーム表示の処理を待機させ、複数クライアントへの効率的なフレームの受け渡しを実現しています。
イベント制御
フレーム取得のイベント制御を行うのが、以下のCameraEvent
クラスです。
なお、threading.Event
の使い方については以下の記事が参考になりました。
おまいらのthreading.Eventの使い方は間違っている
CameraEventクラス
class CameraEvent(object):
"""An Event-like class that signals all active clients when a new frame is
available.
"""
def __init__(self):
self.events = {}
def wait(self):
"""Invoked from each client's thread to wait for the next frame."""
ident = get_ident()
if ident not in self.events:
# this is a new client
# add an entry for it in the self.events dict
# each entry has two elements, a threading.Event() and a timestamp
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait()
def set(self):
"""Invoked by the camera thread when a new frame is available."""
now = time.time()
remove = []
for ident, event in self.events.items():
if not event[0].isSet():
# if this client's event is not set, then set it
# also update the last set timestamp to now
event[0].set()
event[1] = now
else:
# if the client's event is already set, it means the client
# did not process a previous frame
# if the event stays set for more than 5 seconds, then assume
# the client is gone and remove it
if now - event[1] > 5:
remove.append(ident)
for ident in remove:
del self.events[ident]
def clear(self):
"""Invoked from each client's thread after a frame was processed."""
self.events[get_ident()][0].clear()
このクラスでは、各ブラウザから呼び出されたフレーム表示のスレッドを、self.events
という辞書で管理します。
wait
メソッドでは呼び出したスレッドのスレッドidを取得して、これがself.events
に入っていなければ、threading.Event()
とタイムスタンプを一緒に登録します。そしてreturn文では、今登録したthreading.Event()
のwait
メソッドを呼び出し、set
メソッドが呼び出されるまで処理を待機させます。
set
メソッドではself.events
内の全てのスレッドについて、threading.Event()
のisSet
メソッドにより、各Event
が待機状態にあるかを確認します。もしFalse
なら待機状態を解除するためset
メソッドを呼び出し、タイムスタンプを更新します。もしTrue
なら、この前にwait
メソッドが呼び出されていないということを意味するので、処理がタイムアウトしていると分かります。よってここでは、タイムスタンプが5秒以上前なら辞書から削除するという処理を実行します。
最後のclear
メソッドでは、Event
を繰り返し使うために必要なclear
メソッドを呼び出します。
BaseCameraクラス
フレーム取得の処理を実装しているのが、以下のBaseCamera
クラスです。
class BaseCamera(object):
thread = None # background thread that reads frames from camera
frame = None # current frame is stored here by background thread
last_access = 0 # time of last client access to the camera
event = CameraEvent()
def __init__(self):
"""Start the background camera thread if it isn't running yet."""
if BaseCamera.thread is None:
BaseCamera.last_access = time.time()
# start background frame thread
BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()
# wait until frames are available
while self.get_frame() is None:
time.sleep(0)
def get_frame(self):
"""Return the current camera frame."""
BaseCamera.last_access = time.time()
# wait for a signal from the camera thread
BaseCamera.event.wait()
BaseCamera.event.clear()
return BaseCamera.frame
@staticmethod
def frames():
""""Generator that returns frames from the camera."""
raise RuntimeError('Must be implemented by subclasses.')
@classmethod
def _thread(cls):
"""Camera background thread."""
print('Starting camera thread.')
frames_iterator = cls.frames()
for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set() # send signal to clients
time.sleep(0)
# if there hasn't been any clients asking for frames in
# the last 10 seconds then stop the thread
if time.time() - BaseCamera.last_access > 10:
frames_iterator.close()
print('Stopping camera thread due to inactivity.')
break
BaseCamera.thread = None
ここでもthread
, frame
, last_access
, event
に関してはメンバ変数ではなくクラス変数に保持することで、各インスタンスから共通の値を参照しています。このthread
が、バックグラウンドでカメラからフレームを取得する処理に相当します。
コンストラクタでは、last_access
の更新と後述する_thread
メソッドのバックグラウンド実行を行い、またフレームが取得可能な状態になるまでsleep
します。
app.py
で呼び出されるget_frame
メソッドでは、last_access
の値を更新し、フレームをreturnする前にBaseCamera.event.wait()
の処理が実行されます。すなわち、カメラ側でのフレームの準備が整う前にreturnが実行されることを防いでいます。
_thread
では先程Camera
クラスで定義したframes
メソッドからフレームを取得して、クラス変数であるframe
に代入しCameraEvent
のset
メソッドを呼び出します。これにより、get_frame
メソッドの実行で待機していた各スレッドがフレームをreturnすることができます。その後ではlast_access
の値を確認し、タイムアウトしていればカメラ取得のスレッドを停止させます。
以上の実装を行うことで、複数タブでフレームをストリーミングすることが出来るようになります!
最後に
用語の使い方や理解が怪しい部分があるので、もしお気づきの点があればお知らせください🙏