はじめに
gRPC は、あらゆる環境で実行できるオープンソースの高性能リモートプロシージャコール (RPC)フレームワークです。
バイナリシリアル化ツールセットおよび言語である Protocol Buffers を使用してサービスを定義します。
今回は、PythonでgRPCを使う練習のために、サーバー側で取得したUSBカメラの映像を複数のクライアントにストリーム送信する仕組みを作ってみます。
環境
- Windows 10
- USBカメラ: ELP-USB4KHDR01-V100
- Python 3.9.7
- grpcio 1.51.1
- grpcio-tools 1.51.1
- numpy 1.24.2
- opencv-python 4.7.0.72
- protobuf 4.21.11
パッケージのインストール
$ pip install grpcio
$ pip install grpcio-tools
$ pip install opencv-python
protoファイルの作成
syntax = "proto3";
service WebcamStreaming {
rpc StartWebcamStreaming(ClientName) returns (stream CaptureImage) {}
}
message ClientName {
string clientName = 1;
}
message CaptureImage {
bytes imageBytes = 1;
}
WebcamStreamingというサービスにStartWebcamStreamingという関数を一つ用意します。
戻り値にはstreamを設定し、CaptureImageというメッセージを返すようにします。
CaptureImageのimageBytesというメンバーに、サーバーで取得したカメラ画像をバイト列にしたものが入ることになります。
これをクライアント側で受け取り、デコードして表示するという流れになります。
protoファイルのコンパイル
from grpc.tools import protoc
protoc.main(
(
"",
"-I.",
"--python_out=.", # 書き出し先指定
"--grpc_python_out=.", # 書き出し先指定
"./webcam_streaming.proto", # 書き出し元のファイル指定
)
)
$ python gen.py
同フォルダにwebcam_streaming_pb2_grpc.pyとwebcam_streaming_pb2.pyの二つのファイルが生成されます。
webcam_streaming_pb2_grpc.py
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import webcam_streaming_pb2 as webcam__streaming__pb2
class WebcamStreamingStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.StartWebcamStreaming = channel.unary_stream(
'/WebcamStreaming/StartWebcamStreaming',
request_serializer=webcam__streaming__pb2.ClientName.SerializeToString,
response_deserializer=webcam__streaming__pb2.CaptureImage.FromString,
)
class WebcamStreamingServicer(object):
"""Missing associated documentation comment in .proto file."""
def StartWebcamStreaming(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_WebcamStreamingServicer_to_server(servicer, server):
rpc_method_handlers = {
'StartWebcamStreaming': grpc.unary_stream_rpc_method_handler(
servicer.StartWebcamStreaming,
request_deserializer=webcam__streaming__pb2.ClientName.FromString,
response_serializer=webcam__streaming__pb2.CaptureImage.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'WebcamStreaming', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class WebcamStreaming(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def StartWebcamStreaming(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(request, target, '/WebcamStreaming/StartWebcamStreaming',
webcam__streaming__pb2.ClientName.SerializeToString,
webcam__streaming__pb2.CaptureImage.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
webcam_streaming_pb2.py
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: webcam_streaming.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x16webcam_streaming.proto\" \n\nClientName\x12\x12\n\nclientName\x18\x01 \x01(\t\"\"\n\x0c\x43\x61ptureImage\x12\x12\n\nimageBytes\x18\x01 \x01(\x0c\x32I\n\x0fWebcamStreaming\x12\x36\n\x14StartWebcamStreaming\x12\x0b.ClientName\x1a\r.CaptureImage\"\x00\x30\x01\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'webcam_streaming_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_CLIENTNAME._serialized_start=26
_CLIENTNAME._serialized_end=58
_CAPTUREIMAGE._serialized_start=60
_CAPTUREIMAGE._serialized_end=94
_WEBCAMSTREAMING._serialized_start=96
_WEBCAMSTREAMING._serialized_end=169
# @@protoc_insertion_point(module_scope)
サーバー側の実装
- サービス
from queue import Queue
import time
import threading
import cv2
import webcam_streaming_pb2
import webcam_streaming_pb2_grpc
class WebcamStreamingService(
webcam_streaming_pb2_grpc.WebcamStreamingServicer
):
def __init__(self):
super(WebcamStreamingService, self).__init__()
self.__capture_data_queue_list = []
self.__capture_thread = threading.Thread(
target=self.__capture_image, name="Capture Thread"
)
self.__capture_thread.setDaemon(True)
def __capture_image(self):
# global cap_msmf.cpp:1759 CvCapture_MSMF::grabFrame videoio(MSMF): can't grab frame. Error: -1072873822
# のようなエラーが出てしまったため、cv2.CAP_DSHOWオプションを追加した
# cap = cv2.VideoCapture(0)
cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
# 解像度の設定
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
while True:
# デコードされたビデオフレームを取得
ret, frame = cap.read()
if ret:
# JPEG形式でイメージをエンコード
_, buf = cv2.imencode(".jpg", frame)
# 接続されているクライアントそれぞれに、エンコードされたバイト列を送るためにキューに追加
for queue in self.__capture_data_queue_list:
queue.put(buf.tobytes())
# 30fpsに設定
time.sleep(1 / 30)
def StartWebcamStreaming(self, request, context):
print(f"WebcamStreaming Start! Request from {request.clientName}")
capture_data_queue = Queue()
self.__capture_data_queue_list.append(capture_data_queue)
if not self.__capture_thread.is_alive():
self.__capture_thread.start()
while True:
# Capture Thread内でキューが積まれるまで待機
data = capture_data_queue.get()
# クライアントにデータ送信
yield webcam_streaming_pb2.CaptureImage(imageBytes=data)
capture_data_queue.task_done()
クライアントから最初にStartWebcamStreaming関数が呼ばれると、ループでカメラ画像を取得するためのCapture Threadが立ち上がります。
また、クライアント側はデータを受け取るためのQueueを登録します。
Capture Thread内でカメラ画像を取得するたびに、そのデータをそれぞれのクライアントのQueueに積みます。
StartWebcamStreaming関数内のループでQueueからデータを取り出し、クライアントに送信します。
- サーバー立ち上げ用ファイル
from concurrent import futures
import grpc
from webcam_streaming_service import WebcamStreamingService
import webcam_streaming_pb2_grpc
def serve():
# サーバの作成
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
webcam_streaming_pb2_grpc.add_WebcamStreamingServicer_to_server(
WebcamStreamingService(), server
)
# ポートを開く
server.add_insecure_port("[::]:1234")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
serve()
クライアント側の実装
import socket
import sys
import cv2
import grpc
import numpy as np
import webcam_streaming_pb2
import webcam_streaming_pb2_grpc
class GrpcClient:
def __init__(
self, connect_ip: str, port: str = "1234",
):
super(GrpcClient, self).__init__()
# gRPCサーバへのチャネルを作成
channel = grpc.insecure_channel(f"{connect_ip}:{port}")
self.__stub = webcam_streaming_pb2_grpc.WebcamStreamingStub(channel)
def start_webcam_streaming(self):
print("gRPC client: webcam streaming Start")
# リクエストの作成
# clientNameにコンピュータ名を設定
request = webcam_streaming_pb2.ClientName(
clientName=socket.gethostname()
)
# サーバの関数呼び出し
responses = self.__stub.StartWebcamStreaming(request)
for response in responses:
frame_byte = response.imageBytes
# サーバから受け取ったバイト列を一次元配列として解釈
data_array = np.frombuffer(frame_byte, dtype=np.uint8)
# 3チャネルのBGRカラーイメージに変換
image = cv2.imdecode(data_array, cv2.IMREAD_COLOR)
# 画像の表示
cv2.imshow("Received Frame", image)
# 画像がすぐに消えてしまい(?)、表示されないためwaitKeyを入れる
cv2.waitKey(1)
if __name__ == "__main__":
# コマンドライン引数でサーバ側のIPアドレスを設定する
# ローカルで試すなら 127.0.0.1
argv = sys.argv
connect_ip = argv[1]
client = GrpcClient(connect_ip)
client.start_webcam_streaming()
動作確認
- サーバー側
$ python grpc_server.py
- クライアント側
$ python grpc_client {サーバーのIP}
参考URL
- [Python]Grpcでカメラ映像をストリームで共有する
- Python で cv2.imshow としても画像が表示されない
- https://numpy.org/doc/stable/reference/generated/numpy.frombuffer.html
- https://docs.opencv.org/3.4/d4/da8/group__imgcodecs.html
- https://docs.opencv.org/3.4/d8/d6a/group__imgcodecs__flags.html#ga61d9b0126a3e57d9277ac48327799c80
- https://docs.opencv.org/3.4/d8/dfe/classcv_1_1VideoCapture.html#a473055e77dd7faa4d26d686226b292c1
- Video Capture from webcam only works when debugging
- 【Python×OpenCV】カメラから画像を取得するコマンドcv2.readとは
- https://grpc.io/