はじめに
CQ出版社の月刊誌「Interface」の2021年1月号には、「Jetson/ラズパイ/PCで自習 Python画像処理100」が掲載された。この掲載記事の中で、”便利ワザ”として「HTTPによるビデオ・ストリーミング配信」、「ビデオ・ストリーミング・サーバ&レシーバ」が紹介されていた。これをまんま、筆者の環境で動かしたく思った。筆者はJetsonNanoに接続できるUSBなカメラはD455しか持っていない。D455のカラーカメラ映像をHTTPでJpegストリーミングしてみる。
OpenCV でD455 の映像を扱う
掲載されていた便利ワザでは、カメラ映像をOpenCVで取得して、Jpeg圧縮している。D455の映像をOpenCVで扱うにはどうしたらよいのか、調べると、次のサイトに情報があった。
RealSenseをcv2.VideoCaptureライクに使う
https://qiita.com/yumion/items/6eeb820c1f06839d57a7
こちらのpythonスクリプトをコピーさせていただいておく。使い方のポイントは、read()での画像読み込みがOpenCVの cv2.VideCaptureクラスライクなところ。
cap = RealsenseCapture()
# プロパティの設定
cap.WIDTH = 640
cap.HEIGHT = 480
cap.FPS = 30
# cv2.VideoCapture()と違ってcap.start()を忘れずに
cap.start()
while true:
ret, frames = cap.read() # frames[0]にRGB、frames[1]にDepthの画像がndarrayが入っている
Jetson Nano のIPアドレスをスクリプト内で取得する
掲載されていた便利ワザは、IPアドレスを決め打ちの設定で書き込んでいる。JetsonNanoをLAN内据え置き型で使うなら問題ないだろうが、持ち運んだ先で使おうかなとか思う、ネットワーク管理者からしたら傍迷惑な輩が使うにはちょっと不便である。JetsonNanoは、装備されている有線LANポートがあるし据え付けの利用前提であれば有線LAN接続すれば良さそうでもあるけれど、Wifi接続にして可能な限りモビリティを高めたい(モビリティを追求できないのは、電源(モバイルバッテリー)の調達の問題)。
調べると、次のサイトに複数LANアダプタのIPアドレス取得方法があった。
Pythonで自身のIPアドレスを取得
https://qiita.com/mm_sys/items/f991be3b69d4947616b7
使わせてもらったところ、microUSB端子にもIPアドレスの割り当てがあって、eth0, wlan0, l4tbr0 の3つの有効なアドレスが得られることがわかった。
Jetson Nano から HTTPストリーミング
掲載されている便利ワザを少し改造して、D455の映像をpyrealsense2で取得して、JetsonNanoのwlan0から配信するスクリプトを作ってみた。なお、bottleライブラリをインポートして、webサーバにしている点は、CQ出版Interfaceと同じだ。
import pyrealsense2 as rs
import numpy as np
class RealsenseCapture:
def __init__(self):
self.WIDTH = 640
self.HEGIHT = 480
self.FPS = 30
# Configure depth and color streams
self.config = rs.config()
self.config.enable_stream(rs.stream.color, self.WIDTH, self.HEGIHT, rs.format.bgr8, self.FPS)
self.config.enable_stream(rs.stream.depth, self.WIDTH, self.HEGIHT, rs.format.z16, self.FPS)
def start(self):
# Start streaming
self.pipeline = rs.pipeline()
self.pipeline.start(self.config)
print('pipline start')
def read(self, is_array=True):
# Flag capture available
ret = True
# get frames
frames = self.pipeline.wait_for_frames()
# separate RGB and Depth image
self.color_frame = frames.get_color_frame() # RGB
self.depth_frame = frames.get_depth_frame() # Depth
if not self.color_frame or not self.depth_frame:
ret = False
return ret, (None, None)
elif is_array:
# Convert images to numpy arrays
color_image = np.array(self.color_frame.get_data())
depth_image = np.array(self.depth_frame.get_data())
return ret, (color_image, depth_image)
else:
return ret, (self.color_frame, self.depth_frame)
def release(self):
import netifaces as ni
import psutil
import os
import socket
def get_ip() -> list:
if os.name == "nt":
# Windows
return socket.gethostbyname_ex(socket.gethostname())[2]
pass
else:
# それ以外
result = []
address_list = psutil.net_if_addrs()
for nic in address_list.keys():
ni.ifaddresses(nic)
try:
ip = ni.ifaddresses(nic)[ni.AF_INET][0]['addr']
if ip not in ["127.0.0.1"]:
result.append({'name': nic, 'address': ip})
except KeyError as err:
# print(err)
pass
return result
if __name__ == "__main__":
print(get_ip())
"""
Webサーバーを使用したビデオストリーミング
"""
import cv2
import bottle
import time
from realsensecv import RealsenseCapture
web = bottle.Bottle()
def __main():
try:
cap = cv2.VideoCapture(0, cv2.CAP_V4L)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
if not cap.isOpened(): # ビデオキャプチャー可能か判断
print("Not Opened Video Camera")
exit()
except:
cap = RealsenseCapture()
# プロパティの設定
cap.WIDTH = 640
cap.HEIGHT = 480
cap.FPS = 30
# cv2.VideoCapture()と違ってcap.start()を忘れずに
cap.start()
while True:
ret, frames = cap.read()
color_frame = frames[0]
depth_frame = frames[1]
if not ret: # キャプチャー画像取得に失敗したら終了
print("Video Capture Err")
break
# ここで処理を実行する
result, jpgImg = cv2.imencode('.jpg', img=color_frame, params=[
int(cv2.IMWRITE_JPEG_QUALITY), 80]) # 0 - 100
yield b'--frame\r\n' + b'Content-Type: image/jpeg\r\n\r\n' + bytearray(jpgImg) + b'\r\n\r\n'
time.sleep(1 / 60)
'''
cv2.imshow("Final result", color_frame) # 画面表示
if cv2.waitKey(10) > -1:
break
'''
cap.release()
cv2.destroyAllWindows()
return 0
@web.route('/')
def main():
return bottle.static_file('index.html', root='./')
@web.route('/video_recv')
def video_recv():
bottle.response.content_type = 'multipart/x-mixed-replace;boundary=frame'
return __main()
if __name__ == '__main__':
# print(cv2.__version__)
web.run(host='0.0.0.0', port=8080)
# web.run(host='0.0.0.0', port=8080, reloader=True, debug=True)
<!DOCTYPE html>
<html>
<head>
<title>Video Stream</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<style type="text/css">
<!--
#ID_01{
text-align : center;
}
#ID_TITLE{
font-size : 30pt;
text-align : center;
}
-->
</style>
</head>
<body>
<div id="ID_TITLE">シングルボードコンピュータを使用したストリーミング配信</div>
<div id="ID_01">
<img src="./video_recv">
</div>
</body>
</html>
別のPCから、当該PCへhttpで8080ポートへブラウザでアクセスすると・・・
成功。
htmlは、アクセスされたらimgタグのとおり参照先を示し、pythonのbottleで参照された場所へ次々と映像をjpg圧縮して送っている。FPS30までは拙宅のLAN、Wifiでも十分に綺麗な映像をみることができた。
まとめ
RealsenseD455のカラーカメラ映像をJetsonNanoを通してOpenCVでJpeg圧縮しながらHTTPストリーミングできた。