Pythonでネットワーク越しにカメラの映像をストリーミング


はじめに

こんにちは。Pythonに腕を噛まれて入院中のトコロテンです。

本記事では、Pythonを学んでいる最中の初心者が画像処理やソケット通信、GUIのライブラリの†力†に頼ってタイトルの機能を実装したプログラムを紹介します。


注意点

データの送信にUDPではなく、TCPを利用しています。遅いです。UDPを使って自分で色々制御するのは正直めちゃくちゃ面倒くさいです。紹介するプログラムは、「カメラの映像データをネットワーク越しにストリーミングできる」だけのプログラムです。謎の黒魔術の使用、バイナリレベルでの限界高速化といった悪魔的改造は行なっていません。そのため、使える場面は限られていることにご注意ください。


環境


  • macOS Mojave (10.14.5)

  • Python3.7.3


    • numpy (1.16.3)

    • opencv-python (4.1.0.25)

    • Kivy (1.11.1)




プログラム

下に紹介しているプログラムは、GitHubにて公開しています。バグ報告やアルゴリズムの改善案がある場合、このリポジトリにプルリクをいただけると嬉しいです。


通信用設定

サーバとクライアントの両方が読み込む設定ファイルです。サーバのポートや通信するメッセージのヘッダーサイズ等が記述されています。


connection.ini

[server]

ip = 127.0.0.1
port = 12345

[packet]
# [bytes]
header_size = 4
# [pixels]
image_width = 640
image_height = 480



ストリーミングサーバ

サーバはCUIで動作します。指定されたFPSの間隔で定期的にカメラの映像を読み込み、リサイズと圧縮を行ってからクライアントに送信します。圧縮後の画像のバイナリサイズは常に同じにはならないため、送信するメッセージの先頭にバイナリサイズをヘッダ情報として付加します。


streaming_server.py

# coding:utf-8

import socket
import numpy as np
import cv2
import time
import configparser

config = configparser.ConfigParser()
config.read('./connection.ini', 'UTF-8')

# 全体の設定
FPS = 12
INDENT = ' '

# カメラ設定
CAMERA_ID = 0
CAMERA_FPS = 12
CAMERA_WIDTH = 1280
CAMERA_HEIGHT = 720

# サーバ設定
SERVER_IP = '127.0.0.1'
SERVER_PORT = int(config.get('server', 'port'))

# パケット設定
HEADER_SIZE = int(config.get('packet', 'header_size'))

# 画像設定
IMAGE_WIDTH = int(config.get('packet', 'image_width'))
IMAGE_HEIGHT = int(config.get('packet', 'image_height'))
IMAGE_QUALITY = 30

# カメラ設定適用
cam = cv2.VideoCapture(CAMERA_ID)
cam.set(cv2.CAP_PROP_FPS, CAMERA_FPS)
cam.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)

# カメラ情報表示
print('Camera {')
print(INDENT + 'ID : {},'.format(CAMERA_ID))
print(INDENT + 'FPS : {},'.format(cam.get(cv2.CAP_PROP_FPS)))
print(INDENT + 'WIDTH : {},'.format(cam.get(cv2.CAP_PROP_FRAME_WIDTH)))
print(INDENT + 'HEIGHT: {}'.format(cam.get(cv2.CAP_PROP_FRAME_HEIGHT)))
print('}')

# クライアントに接続
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((SERVER_IP, SERVER_PORT))
s.listen(1)
soc, addr = s.accept()

print('Server {')
print(INDENT + 'IP : {},'.format(SERVER_IP))
print(INDENT + 'PORT : {}'.format(SERVER_PORT))
print('}')

# クライアント情報表示
print('Client {')
print(INDENT + 'IP : {},'.format(addr[0]))
print(INDENT + 'PORT : {}'.format(addr[1]))
print('}')

# メインループ
while True:
loop_start_time = time.time()

# 送信用画像データ作成
flag, img = cam.read()
resized_img = cv2.resize(img, (IMAGE_WIDTH, IMAGE_HEIGHT))
(status, encoded_img) = cv2.imencode('.jpg', resized_img, [int(cv2.IMWRITE_JPEG_QUALITY), IMAGE_QUALITY])

# パケット構築
packet_body = encoded_img.tostring()
packet_header = len(packet_body).to_bytes(HEADER_SIZE, 'big')
packet = packet_header + packet_body

# パケット送信
try:
soc.sendall(packet)
except socket.error as e:
print('Connection closed.')
break

# FPS制御
time.sleep(max(0, 1 / FPS - (time.time() - loop_start_time)))

s.close()



ストリーミングクライアント

サーバからカメラの映像データを受信し、画面に表示するクライアントです。バッファの中の最新の完成しているパケットの画像を画面に表示し、それ以前のパケット破棄するといったことをしています。この操作を行わない場合、クライアントの処理速度がサーバのパケット送信速度よりも遅かった時に、クライアントのバッファにパケットが溜まり続けてしまい、メモリーが大幅に消費されてしまいます。また、古いデータから処理していこうとすると、パケットがバッファに多く溜まっていた場合、それだけ前の映像を表示することになり、遅延が酷くなります。


streaming_client.py

# coding:utf-8

from kivy.app import App
from kivy.uix.image import Image
from kivy.clock import Clock
from kivy.graphics.texture import Texture
from kivy.core.window import Window
import sys
import cv2
import numpy as np
import socket
import configparser

class StreamView(Image):

def __init__(self, server_ip, server_port, image_width, image_height, view_fps, view_width, view_height, **kwargs):
super(StreamView, self).__init__(**kwargs)

# 通信用設定
self.buff = bytes()
self.PACKET_HEADER_SIZE = 4
self.soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.SERVER_IP = server_ip
self.SERVER_PORT = server_port
self.IMAGE_WIDTH = image_width
self.IMAGE_HEIGHT = image_height

# 表示設定
self.allow_stretch = True
self.VIEW_FPS = view_fps
self.VIEW_WIDTH = view_width
self.VIEW_HEIGHT = view_height

# 画面更新メソッドの呼び出し設定
Clock.schedule_interval(self.update, 1.0 / view_fps)

# サーバに接続
try:
self.soc.connect((self.SERVER_IP, self.SERVER_PORT))
except socket.error as e:
print('Connection failed.')
sys.exit(-1)

def update(self, dt):
# サーバからのデータをバッファに蓄積
data = self.soc.recv(self.IMAGE_HEIGHT * self.IMAGE_WIDTH * 3)
self.buff += data

# 最新のパケットの先頭までシーク
# バッファに溜まってるパケット全ての情報を取得
packet_head = 0
packets_info = list()
while True:
if len(self.buff) >= packet_head + self.PACKET_HEADER_SIZE:
binary_size = int.from_bytes(self.buff[packet_head:packet_head + self.PACKET_HEADER_SIZE], 'big')
if len(self.buff) >= packet_head + self.PACKET_HEADER_SIZE + binary_size:
packets_info.append((packet_head, binary_size))
packet_head += self.PACKET_HEADER_SIZE + binary_size
else:
break
else:
break

# バッファの中に完成したパケットがあれば、画像を更新
if len(packets_info) > 0:
# 最新の完成したパケットの情報を取得
packet_head, binary_size = packets_info.pop()
# パケットから画像のバイナリを取得
img_bytes = self.buff[packet_head + self.PACKET_HEADER_SIZE:packet_head + self.PACKET_HEADER_SIZE + binary_size]
# バッファから不要なバイナリを削除
self.buff = self.buff[packet_head + self.PACKET_HEADER_SIZE + binary_size:]

# 画像をバイナリから復元
img = np.frombuffer(img_bytes, dtype=np.uint8)
img = cv2.imdecode(img, 1)
# 画像を表示用に加工
img = cv2.flip(img, 0)
img = cv2.resize(img, (self.VIEW_WIDTH, self.VIEW_HEIGHT))
# 画像をバイナリに変換
img = img.tostring()

# 作成した画像をテクスチャに設定
img_texture = Texture.create(size=(self.VIEW_WIDTH, self.VIEW_HEIGHT), colorfmt='bgr')
img_texture.blit_buffer(img, colorfmt='bgr', bufferfmt='ubyte')
self.texture = img_texture

def disconnect(self):
# サーバとの接続を切断
self.soc.shutdown(socket.SHUT_RDWR)
self.soc.close()

class StreamingClientApp(App):

def __init__(self, view_fps, view_width, view_height, **kwargs):
super(StreamingClientApp, self).__init__(**kwargs)
self.VIEW_FPS = view_fps
self.VIEW_WIDTH = view_width
self.VIEW_HEIGHT = view_height

def build(self):
# 通信用設定をコンフィグファイルからロード
config = configparser.ConfigParser()
config.read('./connection.ini', 'UTF-8')
config_server_ip = config.get('server', 'ip')
config_server_port = int(config.get('server', 'port'))
config_header_size = int(config.get('packet', 'header_size'))
config_image_width = int(config.get('packet', 'image_width'))
config_image_height = int(config.get('packet', 'image_height'))

# ウィンドウサイズをビューサイズに合わせる
Window.size = (self.VIEW_WIDTH, self.VIEW_HEIGHT)

# ストリームビューを生成し、画面に設定
self.stream_view = StreamView(
server_ip=config_server_ip,
server_port=config_server_port,
image_width=config_image_width,
image_height=config_image_height,
view_fps=self.VIEW_FPS,
view_width=self.VIEW_WIDTH,
view_height=self.VIEW_HEIGHT
)
return self.stream_view

def on_stop(self):
# サーバとの接続を切断
self.stream_view.disconnect()

if __name__ == '__main__':
StreamingClientApp(view_fps=30, view_width=800, view_height=600).run()



結論

Pythonやばいですね。言語機能やライブラリが色々やってくれるため、考えることが殆どないです。†力†、感じました。