はじめに
メカトラックス株式会社のmaeda01です。
以前 Local Console のセットアップ と オリジナルモデルの作成 の記事を作成しました。
実際にオリジナルのモデルを運用していくうえで推論実行時に Local Console を毎回起動し、推論を行っている間 PC を起動し続けなければならないのは少し不便です。
そこで本記事では Local Console ではなく Raspberry Pi からエッジデバイス AIH-IVRW2 を操作し、Local Console を使わず、スタンドアロンで推論結果を取得する方法を紹介します。
参考記事
本記事を作成するにあたり、以下の記事を参考にしました。
Windows PC で推論の開始と停止を行っている部分を Raspberry Pi に置き換えています。
構成
- エッジデバイス AIH-IVRW2
- メインチップファームウェアバージョン:
0400FEWS
- センサチップファームウェアバージョン:
020300
- AI モデル: 手話の検出モデル (オリジナルモデルの作成の記事 で作成)
- メインチップファームウェアバージョン:
- Raspberry Pi 4 Model B
- 使用 OS: Raspberry Pi OS (64bit) Bookworm
※ エッジデバイス AIH-IVRW2 は Wifi で、Raspberry Pi は有線 LAN で同一のネットワークに接続しています
Raspberry Pi での設定
MQTT ブローカー
Local Console を使用する場合は Local Console をインストールしている PC (Windows) の mosquitto を MQTT ブローカーに使用していました。
これを Raspberry Pi の中の MQTT ブローカーに置き換えます。
以下のコマンドで MQTT ブローカー (mosquitto) をインストールします。
sudo apt install -y mosquitto mosquitto-clients
必要に応じてポートの設定などを行ってください。
本記事ではデフォルトのポート 1883 を使用します。
HTTP サーバー
エッジデバイス AIH-IVRW2 から画像とメタデータを送信する際、HTTP メソッドが使用されます。データを受信するための HTTP サーバーを構築します。
以下のサイトを参考にパッケージのインストールと Python プログラムの作成を行ってください。
本記事ではパッケージのインストールの際に仮想環境を使用しています。
AIH-IVRW2 の設定
QR コードの書き換え
Local Console でデバイスの登録をする際 (この手順) に発行される QR コードは以下のような構成になっています。
AAIAAAAAAAAAAAAAAAAAAA==N=11;E=<ブローカーのアドレス>;H=<ブローカーのポート>;t=1;S=<Wifi の SSID>;P=<Wifi のパスワード>;T=pool.ntp.org;U1FS
Local Console で作成される QR コードではブローカーのアドレスは Local Console が入っている PC の IP アドレス、ポートは 1883 になっていました。
これを Raspberry Pi の MQTT ブローカーのアドレスとポートに置き換えます。
置き換えた後は再度 QR コードを発行します。
以下のようなサイトで QR コードが作成できます。
QR コードを再作成したらカメラの Reset / Mode Button を5秒程度押して QR コード読み取りモードに移行させ、QR コードを読み寄ります。
モード移行に関しては以下をご参照ください。
スタンドアロン化のテスト
実際に Raspberry Pi からメッセージを publish し、 Local Console を使用せずに推論の開始と停止を行います。
処理の流れとしては以下のようになります。
HTTP サーバーの起動
Raspberry Pi で以下のコマンドで HTTP サーバーを起動します。
HTTP サーバーのポート番号は空いているポートで自由に設定してください。
venv_path/bin/uvicorn webapp:app_ins --reload --host raspberrypi.local --port 10066 --no-access-log
起動すると以下のようなログが出力されます。
INFO: Will watch for changes in these directories: ['/opt/camera/server']
INFO: Uvicorn running on http://raspberrypi.local:10066 (Press CTRL+C to quit)
INFO: Started reloader process [2480] using StatReload
INFO: Started server process [2482]
INFO: Waiting for application startup.
INFO: Application startup complete.
推論開始のメッセージの送信
以下のようなコードでメッセージを送信します。コードは 参考記事 をもとに作成しています。
送信トピック名の数字8桁の部分は何でもよいそうですが、念のため筆者の環境で Wireshark でキャプチャされた数字を使用しています。
また MQTT のブローカーや HTTP サーバーのアドレスは Raspberry Pi のアドレスを使用しています。
推論開始のメッセージを送信するコード例
#!/usr/bin/env python3
import json
import paho.mqtt.client as mqtt
# MQTT Brokerの設定
broker = "192.168.***.***"
port = 1883
topic = "v1/devices/me/rpc/request/26255775"
# 送信するメッセージ
message = {
"method": "ModuleMethodCall",
"params": {
"moduleMethod": "StartUploadInferenceData",
"moduleInstance": "backdoor-EA_Main",
"params": {
"CropHOffset": 0,
"CropVOffset": 0,
"CropHSize": 4056,
"CropVSize": 3040,
"Mode": 1,
"UploadMethod": "HttpStorage",
"StorageName": "http://192.168.***.***:10066", # メタデータ送信先のhttpサーバ(適宜変更)
"StorageSubDirectoryPath": "image",
"UploadMethodIR": "HttpStorage",
"StorageNameIR": "http://192.168.***.***:10066", # 画像データ送信先のhttpサーバ(適宜変更)
"UploadInterval": 30,
"StorageSubDirectoryPathIR": "meta"
}
}
}
# メッセージをJSON形式に変換
message_json = json.dumps(message)
# MQTTクライアントの設定
client = mqtt.Client()
# MQTT Brokerへの接続
client.connect(broker, port)
# メッセージの送信
client.publish(topic, message_json)
# メッセージ送信後に接続を切断
client.disconnect()
print(f"Message sent to topic {topic}")
このコードを実行すると MQTT ブローカーに接続され、推論開始のメッセージが publish されます。
推論開始時に送信するメッセージのパラメータについては以下のドキュメントをご参照ください。
推論停止のメッセージの送信
以下のようなコードでメッセージを送信します。コードは同じく 参考記事 をもとに作成しています。
送信トピック名の数字8桁の部分はこちらも何でもよいそうですが、推論開始のメッセージを publish したときに使用した数字は使用できないようです。
こちらも同じく筆者の環境で Wireshark でキャプチャされた数字を使用しています。
推論停止のメッセージを送信するコード
#!/usr/bin/env python3
import json
import paho.mqtt.client as mqtt
# MQTT Brokerの設定
broker = "192.168.***.***"
port = 1883
topic = "v1/devices/me/rpc/request/7290184"
# 送信するメッセージ
message = {
"method": "ModuleMethodCall",
"params": {
"moduleMethod": "StopUploadInferenceData",
"moduleInstance": "backdoor-EA_Main",
"params": {}
}
}
# メッセージをJSON形式に変換
message_json = json.dumps(message)
# MQTTクライアントの設定
client = mqtt.Client()
# MQTT Brokerへの接続
client.connect(broker, port)
# メッセージの送信
client.publish(topic, message_json)
# メッセージ送信後に接続を切断
client.disconnect()
print(f"Message sent to topic {topic}")
このコードを実行すると MQTT ブローカーに接続され、推論停止のメッセージが publish されます。
取得データの確認
カメラで撮影した画像と推論結果のメタデータは webapp.py で指定したディレクトリに保存されています。
本記事のコードでは ./image
, ./meta
ディレクトリに保存されています。
物体検出結果の表示
カメラ画像と推論結果のメタデータを使用して物体検出の結果をカメラ画像に重ねて表示させます。
メタデータは base64 でデコードし、FlatBuffers によるデシリアライズを行う必要があります。
コードの作成には以下の記事を参考にしました。
また、コード作成にあたり、以下のライブラリを使用しています。
物体検出の結果をカメラ画像に重ねるコード
import base64
import json
import os
import sys
import cv2
import numpy as np
from src.data_deserializer.object_detection.object_detection_top import ObjectDetectionTop
from src.data_deserializer.object_detection.bounding_box import BoundingBox
from src.data_deserializer.object_detection.bounding_box_2d import BoundingBox2d
MATADATA_BASE_DIR = '/opt/camera/server/meta'
IMGDATA_BASE_DIR = '/opt/camera/server/image'
colors = [
(255, 0, 0),
(255, 128, 0),
(255, 255, 0),
(128, 255, 0),
(0, 255, 0),
(0, 255, 255),
(0, 128, 255),
(0, 0, 255),
(128, 0, 255),
(255, 0, 255),
(255, 0, 128),
(255, 255, 255),
(192, 192, 192),
(128, 128, 128),
(0, 0, 0),
]
def get_deserialize_data(serialize_data):
"""Get access information from yaml and generate ConsoleAccess client
Returns:
ConsoleAccessClient: CosoleAccessClient Class generated from access information.
"""
buf = {}
buf_decode = base64.b64decode(serialize_data)
ppl_out = ObjectDetectionTop.GetRootAsObjectDetectionTop(buf_decode, 0)
obj_data = ppl_out.Perception()
res_num = obj_data.ObjectDetectionListLength()
for i in range(res_num):
obj_list = obj_data.ObjectDetectionList(i)
union_type = obj_list.BoundingBoxType()
if union_type == BoundingBox.BoundingBox2d:
bbox_2d = BoundingBox2d()
bbox_2d.Init(obj_list.BoundingBox().Bytes, obj_list.BoundingBox().Pos)
buf[str(i + 1)] = {}
buf[str(i + 1)]['C'] = obj_list.ClassId()
buf[str(i + 1)]['P'] = obj_list.Score()
buf[str(i + 1)]['X'] = bbox_2d.Left()
buf[str(i + 1)]['Y'] = bbox_2d.Top()
buf[str(i + 1)]['x'] = bbox_2d.Right()
buf[str(i + 1)]['y'] = bbox_2d.Bottom()
print(obj_list.ClassId())
return buf
def draw_bounding_box(path_jpg, data):
img_np = cv2.imread(path_jpg)
for index in range(len(data)):
color = colors[index]
# 矩形の描画
cv2.rectangle(img_np, (deserialize_data[str(index+1)]['X'], deserialize_data[str(index+1)]['Y']), (deserialize_data[str(index+1)]['x'], deserialize_data[str(index+1)]['y']), color\
,thickness=1, lineType=cv2.LINE_4, shift=0)
# 文字列の描画 {class id} : {score}
class_text = str(deserialize_data[str(index+1)]['C']) + ': ' + str(deserialize_data[str(index+1)]['P'])
cv2.putText(img_np,
text=class_text,
org=(deserialize_data[str(index+1)]['X'], deserialize_data[str(index+1)]['Y']),
fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=0.3,
color=color,
thickness=1,
lineType=cv2.LINE_4)
if index == 1:
break
if len(data) > 0:
name, ext = os.path.splitext(path_jpg)
output_filename = f"{name}_detection.jpg"
cv2.imwrite(output_filename, img_np)
for filename in os.listdir(MATADATA_BASE_DIR):
if filename.endswith('.txt'):
base_name = os.path.splitext(filename)[0]
path_txt = os.path.join(MATADATA_BASE_DIR, filename)
path_jpg = os.path.join(IMGDATA_BASE_DIR, base_name + '.jpg')
try:
# メタデータから推論結果の読み出し
with open(path_txt, 'r', encoding='utf-8') as f:
data = json.load(f)
serialize_data = data['Inferences'][0]['O']
deserialize_data = get_deserialize_data(serialize_data)
print(deserialize_data)
# 画像ファイルの読み出し、bounding box の描画
draw_bounding_box(path_jpg, deserialize_data)
except Exception as e:
print(e)
以下を実行すると何かしらの物体を検出していた画像ファイルに bounding box が追加された画像ファイルが新たに保存されます。
おわりに
Local Console で AI モデルやファームウェアのデプロイ、ラベリングの設定をしてしまえばあとは Raspberry Pi のみで推論を行えるようになりました。
エッジデバイス AIH-IVRW2 を運用する際の参考にしていただければ幸いです。
おまけ
SSS から Local Console を Linux 環境で操作できるツールが提供されています。
これを使用すれば Raspberry Pi から推論の開始や停止が公式のやり方でできそうです。