I bought a second MiniPC, so I'm using it as a YOLO edge device and publishing the results over MQTT.


How to set up an edge AI inference environment with YOLOv8 on a MiniPC (Ubuntu) [with Node-RED/MQTT integration]

🧩 Introduction

This article walks through running YOLOv8 object detection locally on a MiniPC and publishing the results over MQTT to a Node-RED dashboard for visualization.

The roles in this setup are split as follows:

| Device | Role |
| --- | --- |
| MiniPC#2 | Edge inference device (runs YOLO) |
| MiniPC#1 | MQTT broker (Mosquitto) + Node-RED visualization server |

Environment:

- Ubuntu 22.04 LTS
- Python 3.12
- YOLOv8 (ultralytics)
- MQTT (paho-mqtt)
- Node-RED (on MiniPC#1)
- MiniPC#1 is on the same LAN; its IP address is assumed to be 192.168.0.101

🏗️ Creating a virtual environment and preparing Python

```bash
sudo apt update
sudo apt install python3-venv v4l-utils -y

mkdir -p ~/yolo-edge && cd ~/yolo-edge
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
```

📦 Installing the required packages

```bash
pip install -U ultralytics opencv-python paho-mqtt numpy
```

💡 Use `opencv-python` if you will display video on a screen over HDMI; for headless operation (no GUI), you can switch to `opencv-python-headless`.
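Note that ultralytics pulls in PyTorch as a dependency, so the first install downloads a fairly large set of packages.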


🎥 Camera test

```bash
v4l2-ctl --list-devices

python - <<'PY'
import cv2
cap = cv2.VideoCapture(0)
print("open:", cap.isOpened())
cap.release()
PY
```

If it prints `open: True`, the camera is recognized and ready.
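To double-check that the requested resolution is actually applied, here is a minimal sketch (it assumes the camera at index 0, as above):

```python
import cv2

# Open the first V4L2 device and request 1280x720
cap = cv2.VideoCapture(0, cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
ok, frame = cap.read()
print("frame shape:", frame.shape if ok else None)  # expect (720, 1280, 3)
cap.release()
```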


🧠 Standalone YOLO test

```bash
yolo task=detect mode=predict model=yolov8n.pt source=0 show=True
```

If detection boxes appear on the live video, the test succeeded.
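If you prefer, the same smoke test can be run from the Python API, which the script below builds on:

```python
from ultralytics import YOLO

model = YOLO("yolov8n.pt")
# stream=True yields results frame by frame; show=True opens a preview window
for _ in model.predict(source=0, show=True, stream=True):
    pass  # stop with Ctrl+C
```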


🌐 MQTT publishing script yolo_mqtt_edge.py

Create `~/yolo-edge/yolo_mqtt_edge.py` with the following contents 👇

```python
import os, time, json, base64
import cv2
import numpy as np
from ultralytics import YOLO
import paho.mqtt.client as mqtt

# ========= Settings =========
BROKER = os.getenv("BROKER", "192.168.0.101")   # ← IP address of MiniPC#1
PORT   = int(os.getenv("MQTT_PORT", "1883"))
TOPIC_DET    = "trackbay/detections"
TOPIC_ZONES  = "trackbay/zones"
TOPIC_IMAGE  = "trackbay/image"

SEND_IMAGE   = os.getenv("SEND_IMAGE", "false").lower() == "true"
CAM_INDEX    = 0
MODEL_PATH   = "yolov8n.pt"
CONF_THRES   = 0.4
TARGET_CLASS = "truck"      # class to detect (e.g. 'truck', 'car', 'person')
IMG_W, IMG_H = 1280, 720
FPS_LIMIT    = 0            # 0 = no limit

# ========= Initialization =========
model = YOLO(MODEL_PATH)
names = getattr(model.model, "names", {})  # class-id -> name mapping

# paho-mqtt 2.x requires the callback API version; on 1.x use mqtt.Client()
cli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
cli.connect(BROKER, PORT, 60)
cli.loop_start()

cap = cv2.VideoCapture(CAM_INDEX, cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH,  IMG_W)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, IMG_H)

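# Quadrant numbering: 1 = top-left, 2 = top-right, 3 = bottom-left, 4 = bottom-right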
def zone_of_point(xc, yc, w, h):
    if xc < w/2 and yc < h/2: return 1
    if xc >= w/2 and yc < h/2: return 2
    if xc < w/2 and yc >= h/2: return 3
    return 4

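# Simple rate limiter: sleep so iterations are at least 1/FPS_LIMIT seconds apart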
def maybe_sleep(last_t):
    if FPS_LIMIT <= 0: return time.time()
    elapsed = time.time() - last_t
    target = 1.0 / FPS_LIMIT
    if elapsed < target:
        time.sleep(target - elapsed)
    return time.time()

last_t = 0.0
try:
    while True:
        last_t = maybe_sleep(last_t)
        ok, frame = cap.read()
        if not ok:
            time.sleep(0.05)
            continue

        h, w = frame.shape[:2]
        res = model.predict(source=frame, conf=CONF_THRES, imgsz=640, verbose=False)
        zones = {1: False, 2: False, 3: False, 4: False}
        dets  = []

        for r in res:
            if not hasattr(r, "boxes") or r.boxes is None:
                continue
            for b in r.boxes:
                cls_id = int(b.cls[0])
                cls_name = names.get(cls_id, str(cls_id))
                if TARGET_CLASS and cls_name != TARGET_CLASS:
                    continue
                x1,y1,x2,y2 = b.xyxy[0].cpu().numpy().tolist()
                xc, yc = (x1+x2)/2, (y1+y2)/2
                z = zone_of_point(xc, yc, w, h)
                zones[z] = True
                dets.append({
                    "class": cls_name,
                    "conf": round(float(b.conf[0]), 3),
                    "bbox": [round(x1,1), round(y1,1), round(x2,1), round(y2,1)],
                    "zone": z
                })

        ts = int(time.time()*1000)
        cli.publish(TOPIC_DET, json.dumps({"ts": ts, "count": len(dets), "detections": dets}))
        cli.publish(TOPIC_ZONES, json.dumps({"ts": ts, "zones": {str(i): zones[i] for i in range(1,5)}}))

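        # Optionally publish the frame itself as a base64-encoded JPEG (quality 60)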
        if SEND_IMAGE:
            ok_jpg, jpg = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 60])
            if ok_jpg:
                b64 = base64.b64encode(jpg.tobytes()).decode("utf-8")
                cli.publish(TOPIC_IMAGE, json.dumps({"ts": ts, "image_b64": b64}))
except KeyboardInterrupt:
    pass
finally:
    cap.release()
    cli.loop_stop()
```

🚀 Running it (on MiniPC#2)

```bash
cd ~/yolo-edge
source .venv/bin/activate
BROKER=192.168.0.101 python yolo_mqtt_edge.py
```
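If yolov8n.pt is not present in the working directory, ultralytics downloads it automatically on the first run.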

📊 Node-RED side (MiniPC#1)

Create a flow in Node-RED that subscribes to the following topics.

| Topic | Content |
| --- | --- |
| trackbay/zones | occupied/empty state for each of the four zones |
| trackbay/detections | detected class, confidence, and coordinates |
| trackbay/image | base64-encoded image (optional) |

For a quick start, mqtt in → json → ui_template is enough to display the data; the subscriber sketch below is handy for checking the stream first.
I plan to cover a more polished UI example in a separate article.
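Before building the dashboard, you can sanity-check the messages with a small paho-mqtt subscriber (a minimal sketch; it assumes the broker at 192.168.0.101 and the topics above):

```python
import json
import paho.mqtt.client as mqtt

# Example trackbay/detections payload (values illustrative):
# {"ts": 1760200000000, "count": 1,
#  "detections": [{"class": "truck", "conf": 0.87,
#                  "bbox": [412.0, 233.5, 980.2, 640.8], "zone": 4}]}

def on_message(client, userdata, msg):
    data = json.loads(msg.payload)
    print(msg.topic, data)

cli = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # drop the argument on paho-mqtt 1.x
cli.on_message = on_message
cli.connect("192.168.0.101", 1883, 60)
cli.subscribe([("trackbay/detections", 0), ("trackbay/zones", 0)])
cli.loop_forever()
```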


🧠 How it works

1. The camera on MiniPC#2 captures an object
2. YOLOv8 turns class, coordinates, and confidence into JSON
3. The result is published over MQTT to Mosquitto on MiniPC#1
4. Node-RED receives it and visualizes it in real time

🔋 Bonus: auto-start with systemd

```bash
sudo bash -c 'cat >/etc/systemd/system/yolo-edge@.service' <<'SERVICE'
[Unit]
Description=YOLO Edge Inference (MQTT)
After=network-online.target
Wants=network-online.target

[Service]
User=%i
WorkingDirectory=/home/%i/yolo-edge
Environment="PATH=/home/%i/yolo-edge/.venv/bin:/usr/bin"
Environment=BROKER=192.168.0.101
ExecStart=/home/%i/yolo-edge/.venv/bin/python /home/%i/yolo-edge/yolo_mqtt_edge.py
Restart=always
RestartSec=3

[Install]
WantedBy=multi-user.target
SERVICE

sudo systemctl enable yolo-edge@${USER}.service
sudo systemctl start yolo-edge@${USER}.service
```
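You can then follow the logs with `journalctl -u yolo-edge@${USER}.service -f`.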

✨ Summary

- Installed YOLOv8 on a MiniPC for real-time local inference
- Forwarded the results over MQTT with minimal overhead
- Visualized them quickly on a Node-RED dashboard

Next I plan to try optimizing the model with OpenVINO or ONNX and reducing power consumption for edge inference.


🕒 Last updated: 2025-10-10
