1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AITRIOS Local Edition とラズパイで AIH-IVRW2 のスタンドアロン化

Posted at

はじめに

メカトラックス株式会社maeda01です。
以前 Local Console のセットアップオリジナルモデルの作成 の記事を作成しました。

実際にオリジナルのモデルを運用していくうえで推論実行時に Local Console を毎回起動し、推論を行っている間 PC を起動し続けなければならないのは少し不便です。
そこで本記事では Local Console ではなく Raspberry Pi からエッジデバイス AIH-IVRW2 を操作し、Local Console を使わず、スタンドアロンで推論結果を取得する方法を紹介します。

参考記事

本記事を作成するにあたり、以下の記事を参考にしました。
Windows PC で推論の開始と停止を行っている部分を Raspberry Pi に置き換えています。

構成

  • エッジデバイス AIH-IVRW2
    • メインチップファームウェアバージョン: 0400FEWS
    • センサチップファームウェアバージョン: 020300
    • AI モデル: 手話の検出モデル (オリジナルモデルの作成の記事 で作成)
  • Raspberry Pi 4 Model B
    • 使用 OS: Raspberry Pi OS (64bit) Bookworm

※ エッジデバイス AIH-IVRW2 は Wifi で、Raspberry Pi は有線 LAN で同一のネットワークに接続しています

Raspberry Pi での設定

MQTT ブローカー

Local Console を使用する場合は Local Console をインストールしている PC (Windows) の mosquitto を MQTT ブローカーに使用していました。
これを Raspberry Pi の中の MQTT ブローカーに置き換えます。

以下のコマンドで MQTT ブローカー (mosquitto) をインストールします。

sudo apt install -y mosquitto mosquitto-clients

必要に応じてポートの設定などを行ってください。
本記事ではデフォルトのポート 1883 を使用します。

HTTP サーバー

エッジデバイス AIH-IVRW2 から画像とメタデータを送信する際、HTTP メソッドが使用されます。データを受信するための HTTP サーバーを構築します。

以下のサイトを参考にパッケージのインストールと Python プログラムの作成を行ってください。
本記事ではパッケージのインストールの際に仮想環境を使用しています。

AIH-IVRW2 の設定

QR コードの書き換え

Local Console でデバイスの登録をする際 (この手順) に発行される QR コードは以下のような構成になっています。

AAIAAAAAAAAAAAAAAAAAAA==N=11;E=<ブローカーのアドレス>;H=<ブローカーのポート>;t=1;S=<Wifi の SSID>;P=<Wifi のパスワード>;T=pool.ntp.org;U1FS

Local Console で作成される QR コードではブローカーのアドレスは Local Console が入っている PC の IP アドレス、ポートは 1883 になっていました。
これを Raspberry Pi の MQTT ブローカーのアドレスとポートに置き換えます。

置き換えた後は再度 QR コードを発行します。
以下のようなサイトで QR コードが作成できます。

QR コードを再作成したらカメラの Reset / Mode Button を5秒程度押して QR コード読み取りモードに移行させ、QR コードを読み寄ります。

モード移行に関しては以下をご参照ください。

スタンドアロン化のテスト

実際に Raspberry Pi からメッセージを publish し、 Local Console を使用せずに推論の開始と停止を行います。

処理の流れとしては以下のようになります。

mermaid-diagram-2025-05-08-102711.png

HTTP サーバーの起動

Raspberry Pi で以下のコマンドで HTTP サーバーを起動します。
HTTP サーバーのポート番号は空いているポートで自由に設定してください。

venv_path/bin/uvicorn webapp:app_ins --reload --host raspberrypi.local --port 10066 --no-access-log

起動すると以下のようなログが出力されます。

INFO:     Will watch for changes in these directories: ['/opt/camera/server']
INFO:     Uvicorn running on http://raspberrypi.local:10066 (Press CTRL+C to quit)
INFO:     Started reloader process [2480] using StatReload
INFO:     Started server process [2482]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

推論開始のメッセージの送信

以下のようなコードでメッセージを送信します。コードは 参考記事 をもとに作成しています。
送信トピック名の数字8桁の部分は何でもよいそうですが、念のため筆者の環境で Wireshark でキャプチャされた数字を使用しています。
また MQTT のブローカーや HTTP サーバーのアドレスは Raspberry Pi のアドレスを使用しています。

推論開始のメッセージを送信するコード例
start.py
#!/usr/bin/env python3

import json
import paho.mqtt.client as mqtt

# MQTT Brokerの設定
broker = "192.168.***.***"
port = 1883
topic = "v1/devices/me/rpc/request/26255775"

# 送信するメッセージ
message = {
	"method": "ModuleMethodCall",
	"params": {
		"moduleMethod": "StartUploadInferenceData",
		"moduleInstance": "backdoor-EA_Main",
		"params": {
			"CropHOffset": 0,
			"CropVOffset": 0,
			"CropHSize": 4056,
			"CropVSize": 3040,
			"Mode": 1,
			"UploadMethod": "HttpStorage",
			"StorageName": "http://192.168.***.***:10066",   # メタデータ送信先のhttpサーバ(適宜変更)
			"StorageSubDirectoryPath": "image",
			"UploadMethodIR": "HttpStorage",
    		"StorageNameIR": "http://192.168.***.***:10066", # 画像データ送信先のhttpサーバ(適宜変更)
			"UploadInterval": 30,
			"StorageSubDirectoryPathIR": "meta"
		}
	}
}
# メッセージをJSON形式に変換
message_json = json.dumps(message)

# MQTTクライアントの設定
client = mqtt.Client()

# MQTT Brokerへの接続
client.connect(broker, port)

# メッセージの送信
client.publish(topic, message_json)

# メッセージ送信後に接続を切断
client.disconnect()

print(f"Message sent to topic {topic}")

このコードを実行すると MQTT ブローカーに接続され、推論開始のメッセージが publish されます。
推論開始時に送信するメッセージのパラメータについては以下のドキュメントをご参照ください。

推論停止のメッセージの送信

以下のようなコードでメッセージを送信します。コードは同じく 参考記事 をもとに作成しています。
送信トピック名の数字8桁の部分はこちらも何でもよいそうですが、推論開始のメッセージを publish したときに使用した数字は使用できないようです。
こちらも同じく筆者の環境で Wireshark でキャプチャされた数字を使用しています。

推論停止のメッセージを送信するコード
stop.py
#!/usr/bin/env python3

import json
import paho.mqtt.client as mqtt

# MQTT Brokerの設定
broker = "192.168.***.***"
port = 1883
topic = "v1/devices/me/rpc/request/7290184"

# 送信するメッセージ
message = {
	"method": "ModuleMethodCall",
	"params": {
		"moduleMethod": "StopUploadInferenceData",
		"moduleInstance": "backdoor-EA_Main",
		"params": {}
	}
}

# メッセージをJSON形式に変換
message_json = json.dumps(message)

# MQTTクライアントの設定
client = mqtt.Client()

# MQTT Brokerへの接続
client.connect(broker, port)

# メッセージの送信
client.publish(topic, message_json)

# メッセージ送信後に接続を切断
client.disconnect()

print(f"Message sent to topic {topic}")

このコードを実行すると MQTT ブローカーに接続され、推論停止のメッセージが publish されます。

取得データの確認

カメラで撮影した画像と推論結果のメタデータは webapp.py で指定したディレクトリに保存されています。
本記事のコードでは ./image, ./meta ディレクトリに保存されています。

推論中.png

物体検出結果の表示

カメラ画像と推論結果のメタデータを使用して物体検出の結果をカメラ画像に重ねて表示させます。
メタデータは base64 でデコードし、FlatBuffers によるデシリアライズを行う必要があります。

コードの作成には以下の記事を参考にしました。

また、コード作成にあたり、以下のライブラリを使用しています。

物体検出の結果をカメラ画像に重ねるコード
deserialize_data.py
import base64
import json
import os
import sys

import cv2
import numpy as np

from src.data_deserializer.object_detection.object_detection_top import ObjectDetectionTop
from src.data_deserializer.object_detection.bounding_box import BoundingBox
from src.data_deserializer.object_detection.bounding_box_2d import BoundingBox2d

MATADATA_BASE_DIR = '/opt/camera/server/meta'
IMGDATA_BASE_DIR = '/opt/camera/server/image'

colors = [
(255, 0, 0),
(255, 128, 0),
(255, 255, 0),
(128, 255, 0),
(0, 255, 0),
(0, 255, 255),
(0, 128, 255),
(0, 0, 255),
(128, 0, 255),
(255, 0, 255),
(255, 0, 128),
(255, 255, 255),
(192, 192, 192),
(128, 128, 128),
(0, 0, 0),
]


def get_deserialize_data(serialize_data):
    """Get access information from yaml and generate ConsoleAccess client
    Returns:
        ConsoleAccessClient: CosoleAccessClient Class generated from access information.
    """
    buf = {}
    buf_decode = base64.b64decode(serialize_data)
    ppl_out = ObjectDetectionTop.GetRootAsObjectDetectionTop(buf_decode, 0)
    obj_data = ppl_out.Perception()
    res_num = obj_data.ObjectDetectionListLength()
    for i in range(res_num):
        obj_list = obj_data.ObjectDetectionList(i)
        union_type = obj_list.BoundingBoxType()
        if union_type == BoundingBox.BoundingBox2d:
            bbox_2d = BoundingBox2d()
            bbox_2d.Init(obj_list.BoundingBox().Bytes, obj_list.BoundingBox().Pos)
            buf[str(i + 1)] = {}
            buf[str(i + 1)]['C'] = obj_list.ClassId()
            buf[str(i + 1)]['P'] = obj_list.Score()
            buf[str(i + 1)]['X'] = bbox_2d.Left()
            buf[str(i + 1)]['Y'] = bbox_2d.Top()
            buf[str(i + 1)]['x'] = bbox_2d.Right()
            buf[str(i + 1)]['y'] = bbox_2d.Bottom()
            print(obj_list.ClassId())

    return buf

def draw_bounding_box(path_jpg, data):
    img_np = cv2.imread(path_jpg)
    for index in range(len(data)):
        color = colors[index]
        # 矩形の描画
        cv2.rectangle(img_np, (deserialize_data[str(index+1)]['X'], deserialize_data[str(index+1)]['Y']), (deserialize_data[str(index+1)]['x'], deserialize_data[str(index+1)]['y']), color\
                ,thickness=1, lineType=cv2.LINE_4, shift=0)
        # 文字列の描画 {class id} : {score}
        class_text = str(deserialize_data[str(index+1)]['C']) + ': ' + str(deserialize_data[str(index+1)]['P'])
        cv2.putText(img_np,
                text=class_text,
                org=(deserialize_data[str(index+1)]['X'], deserialize_data[str(index+1)]['Y']),
                fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                fontScale=0.3,
                color=color,
                thickness=1,
                lineType=cv2.LINE_4)
        if index == 1:
            break
    if len(data) > 0:
        name, ext = os.path.splitext(path_jpg)
        output_filename = f"{name}_detection.jpg"
        cv2.imwrite(output_filename, img_np)

for filename in os.listdir(MATADATA_BASE_DIR):
    if filename.endswith('.txt'):
        base_name = os.path.splitext(filename)[0]
        path_txt = os.path.join(MATADATA_BASE_DIR, filename)
        path_jpg = os.path.join(IMGDATA_BASE_DIR, base_name + '.jpg')
        try:
            # メタデータから推論結果の読み出し
            with open(path_txt, 'r', encoding='utf-8') as f:
                data = json.load(f)
                serialize_data = data['Inferences'][0]['O']

                deserialize_data = get_deserialize_data(serialize_data)
                print(deserialize_data)
                
            # 画像ファイルの読み出し、bounding box の描画
            draw_bounding_box(path_jpg, deserialize_data)
        except Exception as e:
            print(e)

以下を実行すると何かしらの物体を検出していた画像ファイルに bounding box が追加された画像ファイルが新たに保存されます。

20250507062151267_detection.jpg

おわりに

Local Console で AI モデルやファームウェアのデプロイ、ラベリングの設定をしてしまえばあとは Raspberry Pi のみで推論を行えるようになりました。
エッジデバイス AIH-IVRW2 を運用する際の参考にしていただければ幸いです。

おまけ

SSS から Local Console を Linux 環境で操作できるツールが提供されています。

これを使用すれば Raspberry Pi から推論の開始や停止が公式のやり方でできそうです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?