はじめに
メカトラックス株式会社のmaeda01です。
先日 Raspberry Pi 財団から AI Camera が発売されました。
本記事ではこの AI Camera をセットアップして実際にアプリケーションを作成して動かしてみます。
Raspberry Pi AI Camera とは
The Raspberry Pi AI Camera uses the Sony IMX500 imaging sensor to provide low-latency, high-performance AI capabilities to any camera application. Tight integration with Raspberry Pi’s camera software stack allows users to deploy their own neural network models with minimal effort.
Raspberry Pi 公式ドキュメント から引用
AI Camera には SONY 社の IMX500 というイメージセンサが搭載されています。SONY 独自のプロセッサやAI モデルを書き込むためのメモリが搭載されており、高性能なプロセッサや外部メモリーを必要とすることなく、エッジAIシステムを実現させることが可能になるそうです。
IMX500 の詳細については以下のサイトをご参照ください。
このイメージセンサ IMX500 のおかげで Raspberry Pi AI Camera 単体でも高性能な AI 処理が可能になります。
AI Camera のその他の詳細なスペックは以下のデータシートをご参照ください。
使用するもの
Raspberry Pi 4 model B: 1台
micro SD カード: 1枚
AC アダプタ (5V/3A): 1台
Raspberry Pi AI Camera: 1台
FFC (Raspberry Pi 4 向け): 1本
セットアップ
Raspberry Pi 公式ドキュメント に従ってセットアップをしていきます。
パッケージのインストール
IMX500 を扱うためのファームウェアの一式をインストールします。
以下のコマンドでインストールできます。
sudo apt install imx500-all
このコマンドではファームウェアのインストールとともにサンプルの学習済みモデルもダウンロードされます。
Raspberry Pi 4 の場合
/boot/firmware/config.txt を2箇所修正します。
・dtoverlay=imx500
という記述を追加して IMX500 のドライバを指定してロードさせます。
・camera_auto_detect=0
と修正してカメラの自動検出を無効化します。
略)
# Automatically load overlays for detected cameras
camera_auto_detect=0
略)
[all]
dtoverlay=imx500
Raspberry Pi 5 を使用する場合、この修正は必要ありません。
IMX500 のドライバのロード
IMX500 のドライバをロードさせるため、再起動をします。
sudo reboot
AI Camera の接続
Raspberry Pi 4 のカメラコネクタへ FFC を接続します。
コネクタのツメを上に上げて FFC を挿入、奥まで入ったらツメを下げて固定します。
向きは画像のように接続します。
AI Camera のカメラコネクタへ FFC を接続します。
Raspberry Pi 側と同じくコネクタのツメを上に上げて FFC を挿入、奥まで入ったらツメを下げて固定します。
向きは画像のように接続します。
公式サンプルの実行
Raspberry Pi 公式の picamera2 リポジトリにサンプルコードがあるので clone してローカルにダウンロードします。
これらのコードを実行する際に必要となるパッケージをインストールします。
sudo apt install python3-opencv python3-munkres
今回は物体検出のタスクのサンプルを実行します。
以下のコマンドで実行できます。
python3 imx500_object_detection_demo.py --model /usr/share/imx500-models/imx500_network_ssd_mobilenetv2_fpnlite_320x320_pp.rpk
コードの実行に成功するとウィンドウが開き、検出された物体の周りに bounding box が表示されます。
インストールされた picamera2 のバージョンによってはサンプルコード実行時にエラーが発生する場合があります。
ValueError: tensor info length 260 does not match expected size 708
この場合は仮想環境を作成して 仮想環境内に picamera2 のバージョンを指定してインストールします。
v0.3.23 で正常に動作することは確認済みです。
python3 -m venv my_env --system-site-packages
pip install picamera2==v0.3.23
その他の骨格検出や分類などのサンプルコードの実行例は以下を参照してください。
オリジナルモデルのコンパイル
自分で学習させたモデルを AI Camera で使用する際にはモデルを量子化して圧縮し、IMX500 で使用可能な形式 (.rpk) に変換する必要があります。
YOLOv8 を使用したモデル
YOLO (You Only Look Once) については以下をご参照ください。
YOLOv8 を使用して学習させたモデルの変換を行います。
YOLOv8 モデルを AI Camera 用のモデルに変換する手順は Ultralytics のドキュメントに記載されています。これに従って変換をしていきます。
今回は画像からじゃんけんの手 (グー、チョキ、パー) を識別するモデルを使用します。
データセットと学習用コードは kaggle の Notebook からお借りしました。
仮想環境中に ultralytics モジュールをインストールします。
pip install ultralytics
ultralytics モジュールと picamera2 モジュールが両方インストールされていると AI Camera のサンプルコードが動かない可能性があります。その場合は別の仮想環境中に ultralytics をインストールしてください。
変換は WSL などの Linux 環境でも実行可能です。
仮想環境の中で以下の Python コードを実行します。
from ultralytics import YOLO
model = YOLO("best.pt")
model.export(format="imx")
必要なパッケージがインストールされ、モデルの変換が開始されます。
変換が完了すると best_imx_model
というディレクトリが作成されます。
このディレクトリには packerOut.zip という圧縮ファイルがあるのでこれを使用して AI Camera で使用可能なモデルを作成します。
まずは必要なパッケージをインストールします。
sudo apt install imx500-tools
以下のコマンドでモデルを作成します。
imx500-package -i packerOut.zip -o ./
このコマンドを実行すると指定したディレクトリ (上のコマンドではカレントディレクトリ) に network.rpk というモデルが作成されます。
TensorFlow, PyTorch を使用したモデル
TensorFlow や PyTorch を使用して学習させたモデルの圧縮と変換の手順は公式のドキュメントに記載されています。
※ imxconv-tf
と imxconv-pt
は排他なので両方使用する場合は仮想環境を2つ作成し、それぞれの仮想環境にインストールしてください。
オリジナルモデルでのテスト
前章で作成したオリジナルモデル (じゃんけんの手の判別モデル) を AI Camera で実際に動かしてみます。
公式の物体検出を行うサンプルコードでは引数で使用するモデルやラベルを指定できます。もしくはコードを修正してコード中で使用するモデルを指定してください。
python3 imx500_object_detection_demo.py --model best_imx_model/network.rpk --fps 25 --bbox-order xy --bbox-normalization --threshold 0.3 --labels best__imx_model/labels.txt
モデルによってはサンプルコードそのままでは bbox の表示位置が x軸とy軸入れ替わるような表示になる場合があります。その場合は --bbox-order
で xy
を指定してください。
その他オプションは --help
オプションで確認できます。
このコマンドを実行すると以下のようなウィンドウが表示され、じゃんけんの手の判別が行われます。
Raspberry Pi 4 でもフレームレートは 30fps 弱出ているのと、もとのモデル (.pt) を使用した場合から認識精度は変わっていないように感じます。
デモゲームの作成
作成したモデルと AI Camera を使用してデモゲーム (Web アプリケーション) を作成してみます。
内容はこちらの記事で紹介した AI Kit を使用したデモゲームを AI Camera で動かせるようにするというものになります。
参考にしたプログラム
基本的な構造は AI Kit の記事 のデモと同じです。
フレームの取得 → 物体の検出 → bbox などの描画 → Web ブラウザでストリーミングという流れで処理をしていきます。
物体検出のサンプルコードと AI Kit の記事 でも使用した Temple Run を物体検出を使用して操作するコードをベースに作成します。
物体検出のサンプルコード
Temple Run を物体検出を使用して操作するコード
必要なパッケージのインストール
Temple Run のアプリケーションは Flask フレームワークを使用するので、仮想環境中に Flask をインストールします。
pip install flask
コードの作成
前述の2つのサンプルコードをベースに web 上で Temple Run を実行するコードを作成します。
動作としては
- AI Camera でじゃんけんの手を判別
- パーの手を検出したらパーの手の bounding box の中心の座標を取得
- 中心の座標の位置でキーボードの十字キーの操作
となります。
以下が作成したコードになります。
ゲームのコード
import argparse
import sys
import time
from functools import lru_cache
import cv2
import libevdev
import numpy as np
from flask import Flask, render_template, Response
from picamera2 import MappedArray, Picamera2
from picamera2.devices import IMX500
from picamera2.devices.imx500 import (NetworkIntrinsics,
postprocess_nanodet_detection)
app = Flask(__name__)
device = libevdev.Device()
device.name = 'virtual keybord'
device.enable(libevdev.EV_KEY.KEY_UP)
device.enable(libevdev.EV_KEY.KEY_DOWN)
device.enable(libevdev.EV_KEY.KEY_RIGHT)
device.enable(libevdev.EV_KEY.KEY_LEFT)
device.enable(libevdev.EV_KEY.KEY_SPACE)
uinput = device.create_uinput_device()
last_detections = []
model_path = "best_imx_model/network.rpk"
label_path = "best_imx_model/labels.txt"
skip_frames = 10
scale = 1
width_frame = 640
limit_up = 150 // scale
limit_down = 350 // scale
limit_left = 250 // scale
limit_right = 400 //scale
class Detection:
def __init__(self, coords, category, conf, metadata, model, picam):
"""Create a Detection object, recording the bounding box, category and confidence."""
self.category = category
self.conf = conf
self.box = model.convert_inference_coords(coords, metadata, picam)
class AI_Camera:
def __init__(self):
self.args = self.get_args()
fps = 0.0
self.frame_array = None
self.wait_time = 0.01
self.start_time = 0.0
self.end_time = 0.0
self.total_frame = 0
# This must be called before instantiation of Picamera2
self.imx500 = IMX500(self.args.model)
self.intrinsics = self.imx500.network_intrinsics
if not self.intrinsics:
self.intrinsics = NetworkIntrinsics()
self.intrinsics.task = "object detection"
elif self.intrinsics.task != "object detection":
print("Network is not an object detection task", file=sys.stderr)
exit()
# Override intrinsics from args
for key, value in vars(self.args).items():
if key == 'labels' and value is not None:
with open(value, 'r') as f:
self.intrinsics.labels = f.read().splitlines()
elif hasattr(self.intrinsics, key) and value is not None:
setattr(self.intrinsics, key, value)
# Defaults
if self.intrinsics.labels is None:
with open(label_path, "r") as f:
self.intrinsics.labels = f.read().splitlines()
self.intrinsics.update_with_defaults()
if self.args.print_intrinsics:
print(self.intrinsics)
exit()
self.picam2 = Picamera2(self.imx500.camera_num)
config = self.picam2.create_preview_configuration(controls={"FrameRate": self.intrinsics.inference_rate}, buffer_count=12)
self.imx500.show_network_fw_progress_bar()
self.picam2.start(config, show_preview=False)
if self.intrinsics.preserve_aspect_ratio:
self.imx500.set_auto_aspect_ratio()
self.last_results = None
self.picam2.pre_callback = self.draw_detections
def parse_detections(self, metadata: dict):
"""Parse the output tensor into a number of detected objects, scaled to the ISP output."""
global last_detections
bbox_normalization = self.intrinsics.bbox_normalization
bbox_order = self.args.bbox_order
threshold = self.args.threshold
iou = self.args.iou
max_detections = self.args.max_detections
np_outputs = self.imx500.get_outputs(metadata, add_batch=True)
input_w, input_h = self.imx500.get_input_size()
if np_outputs is None:
return last_detections
if self.intrinsics.postprocess == "nanodet":
boxes, scores, classes = \
postprocess_nanodet_detection(outputs=np_outputs[0], conf=threshold, iou_thres=iou,
max_out_dets=max_detections)[0]
from picamera2.devices.imx500.postprocess import scale_boxes
boxes = scale_boxes(boxes, 1, 1, input_h, input_w, False, False)
else:
boxes, scores, classes = np_outputs[0][0], np_outputs[1][0], np_outputs[2][0]
if not bbox_normalization:
boxes = boxes / input_h
if bbox_order == "xy":
boxes = boxes[:, [1, 0, 3, 2]]
boxes = np.array_split(boxes, 4, axis=1)
boxes = zip(*boxes)
last_detections = [
Detection(box, category, score, metadata, self.imx500, self.picam2)
for box, score, category in zip(boxes, scores, classes)
if score > threshold
]
return last_detections
@lru_cache
def get_labels(self):
labels = self.intrinsics.labels
if self.intrinsics.ignore_dash_labels:
labels = [label for label in labels if label and label != "-"]
return labels
def draw_detections(self, request, stream="main"):
"""Draw the detections for this request onto the ISP output."""
detections = self.last_results
if detections is None:
return
labels = self.get_labels()
with MappedArray(request, stream) as m:
for detection in detections:
x, y, w, h = detection.box
label = f"{labels[int(detection.category)]} ({detection.conf:.2f})"
hand_sign = label.split("(")[0].strip()
# Calculate text size and position
(text_width, text_height), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
text_x = x + 5
text_y = y + 15
# Create a copy of the array to draw the background with opacity
overlay = m.array.copy()
# Draw the background rectangle on the overlay
cv2.rectangle(overlay,
(text_x, text_y - text_height),
(text_x + text_width, text_y + baseline),
(255, 255, 255), # Background color (white)
cv2.FILLED)
alpha = 0.30
cv2.addWeighted(overlay, alpha, m.array, 1 - alpha, 0, m.array)
# Draw text on top of the background
cv2.putText(m.array, label, (text_x, text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
# Draw detection box
cv2.rectangle(m.array, (x, y), (x + w, y + h), (0, 255, 0, 0), thickness=2)
if hand_sign == "Paper":
# Draw circle on center of bbox
center_x = x + w // 2
center_y = y + h // 2
cv2.circle(m.array, (center_x, center_y), 5, (0, 255, 0), -1)
if center_y < limit_up:
self.gen_action('up')
elif center_y > limit_down:
self.gen_action('down')
elif center_x < limit_left:
self.gen_action('left')
elif center_x > limit_right:
self.gen_action('right')
else: pass
if self.intrinsics.preserve_aspect_ratio:
b_x, b_y, b_w, b_h = self.imx500.get_roi_scaled(request)
color = (255, 0, 0) # red
cv2.putText(m.array, "ROI", (b_x + 5, b_y + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
cv2.rectangle(m.array, (b_x, b_y), (b_x + b_w, b_y + b_h), (255, 0, 0, 0))
cv2.line(m.array, (0, limit_up), (width_frame, limit_up), (255, 255, 255), 2)
cv2.line(m.array, (0, limit_down), (width_frame, limit_down), (255, 255, 255), 2)
cv2.line(m.array, (limit_left, limit_up), (limit_left, limit_down), (255, 255, 255), 2)
cv2.line(m.array, (limit_right, limit_up), (limit_right, limit_down), (255, 255, 255), 2)
self.frame_array = m.array
def gen_action(self, action):
if action == 'up':
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_UP, 1), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
time.sleep(0.012)
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_UP, 0), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
elif action == 'down':
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_DOWN, 1), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
time.sleep(0.012)
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_DOWN, 0), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
elif action == 'right':
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_RIGHT, 1), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
time.sleep(0.012)
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_RIGHT, 0), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
elif action == 'left':
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_LEFT, 1), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
time.sleep(0.012)
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_LEFT, 0), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
elif action == 'space':
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_SPACE, 1), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
time.sleep(0.012)
uinput.send_events([libevdev.InputEvent(libevdev.EV_KEY.KEY_SPACE, 0), libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)])
else:
pass
def get_args(self):
parser = argparse.ArgumentParser()
parser.add_argument("--model", type=str, help="Path of the model",
default=model_path)
parser.add_argument("--fps", type=int, help="Frames per second")
parser.add_argument("--bbox-normalization", action=argparse.BooleanOptionalAction, help="Normalize bbox")
parser.add_argument("--bbox-order", choices=["yx", "xy"], default="xy",
help="Set bbox order yx -> (y0, x0, y1, x1) xy -> (x0, y0, x1, y1)")
parser.add_argument("--threshold", type=float, default=0.3, help="Detection threshold")
parser.add_argument("--iou", type=float, default=0.65, help="Set iou threshold")
parser.add_argument("--max-detections", type=int, default=10, help="Set max detections")
parser.add_argument("--ignore-dash-labels", action=argparse.BooleanOptionalAction, help="Remove '-' labels ")
parser.add_argument("--postprocess", choices=["", "nanodet"],
default=None, help="Run post process of type")
parser.add_argument("-r", "--preserve-aspect-ratio", action=argparse.BooleanOptionalAction,
help="preserve the pixel aspect ratio of the input tensor")
parser.add_argument("--labels", type=str,
help="Path to the labels file")
parser.add_argument("--print-intrinsics", action="store_true",
help="Print JSON network_intrinsics then exit")
return parser.parse_args()
def main_stream(self):
self.last_results = self.parse_detections(self.picam2.capture_metadata())
while True:
self.start_time = time.time()
if (self.frame_array is not None) and (self.total_frame % skip_frames == 0):
self.last_results = self.parse_detections(self.picam2.capture_metadata())
frame_rgb_array = cv2.cvtColor(self.frame_array, cv2.COLOR_BGR2RGB)
_, buffer = cv2.imencode('.jpg', frame_rgb_array)
frame = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
self.total_frame += 1
self.end_time = time.time()
# Home Page
@app.route('/')
def index():
return render_template('index.html')
# Video streaming route
@app.route('/video_feed')
def video_feed():
cam_obj = AI_Camera()
return Response(cam_obj.main_stream(), mimetype='multipart/x-mixed-replace; boundary=frame')
if __name__ == "__main__":
app.run(debug=True)
ベースの 2つのコードからの主な変更点は以下の通りです。
-
変数
model_path
,label_path
で使用したいモデルとラベルを指定するとそれらがデフォルトで指定されます -
bookworm では display system は Wayland がデフォルトで使用されるようになっており、pyautogui モジュールが使用できません。そのため libevdev モジュールを使用して矢印キーのシミュレートを行っています
-
毎フレーム物体検出を行い出力すると Web アプリケーション内でのカメラの遅延が発生するため、5フレームに1回のみ物体検出と出力を行っています
このプログラム app.py を仮想環境内で実行し、ブラウザから 127.0.0.1:5000 にアクセスするとゲーム画面に入ります。
ゲームが開始したら手をパーの形にして動かし、キャラクターを操作します。
おわりに
Raspberry Pi 5 以外のシリーズでも AI Camera を使用することで AI 処理が可能になりました。AI Kit と並び手軽に扱えるので参考にしていただけると幸いです。
物体検出などの処理を行うと Raspberry Pi 4 であっても発熱はあるので考慮する必要があります。
クーラーや放熱シートなどで放熱対策を行うことをおすすめします。
おまけ
AITRIOS というソニーセミコンダクタソリューションズが提供しているワンストッププラットフォームがございます。
こちらは インテリジェントビジョンセンサーをはじめとするイメージセンサーによるエッジAI技術を活用したソリューション開発を加速させるプラットフォーム とのことです。(ソニーのブログから引用)
AITRIOS から AI Camera のモデル作成用の専用ソフトウェアが有償で提供されています。
このソフトウェアを使用するとノーコードでモデルが作成できるようです。
AITRIOS ホームページ
AITRIOS のドキュメント