目的
(簡易)光学迷彩プログラム に引き続き、映像に映った人を透明化できる簡易的な光学迷彩1プログラムを作成する
やりたいのは攻殻機動隊的なイメージ・・
完成イメージ
今回は、前回作った光学迷彩プログラム(下図の(2))をON/OFFできるようにする(赤字部分)
とりあえずどんな感じになるか見たい場合はこちら
環境
前回 からの差分は赤字
今回の肝になるプログラムのON/OFFはM5StickCでMQTT over WiFi通信を使って実現する
HW
- PC:Lenovo ThinkBook 13s Gen 3
- カメラ:PC内蔵のインカメラ
- プログラムON/OFF:M5StickC
SW
- Python:v3.11.5
- Anaconda:conda v23.7.4
- 物体検出モデル:YOLO v8
- mosquitto:v2.0.15
作業メモ
環境整備
参考サイト
https://note.com/denyousya/n/n2ba35c82ea2e
PCでMQTT通信ができるようにする
-
mosquittoのインストール
参考サイト の手順でインストールを行う -
mosquittoのconfig設定
インストールフォルダ(デフォルトだとおそらくC:\Program Files\mosquitto
)にあるmosquitto.conf
に以下の内容を追記
listener 1883
protocol mqtt
allow_anonymous true
- MQTT Brokerサービスの動作確認
追記したconfigファイルを引数にして、mosquitto Brokerを起動
> .\mosquitto.exe -v -c ".\mosquitto.conf"
1718529920: mosquitto version 2.0.15 starting
1718529920: Config loaded from .\mosquitto.conf.
1718529920: Opening ipv6 listen socket on port 1883.
1718529920: Opening ipv4 listen socket on port 1883.
1718529920: mosquitto version 2.0.15 running
→ 正常に起動できることを確認
光学迷彩プログラムをON/OFFできるようにする
参考サイト
https://qiita.com/marumen/items/e2527248f5edd1adabcc
https://lang-ship.com/blog/work/m5stickc-button-class/
https://qiita.com/tchnkmr/items/b05f321fa315bbce4f77
M5StickCでMQTT通信を使ったプログラムON/OFFのトリガー送信
- M5StickCとPC間でのMQTT通信を実現し、プログラムON/OFFのトリガーを送信
- M5StickCのボタン押下2によりトリガー送信を行う
プログラム
以前投稿した記事 にてM5StickCでのMQTT通信を試していたため、そちらのプログラムをベースに修正
- プログラムON/OFFのトリガーはそれぞれ以下とした
- ON:
opticalcamouflage_start
- OFF:
camera_start
- ON:
- LCDにプログラムのON/OFF状態を表示(
optical camouflage ON or OFF
) - 一定時間操作しないとMQTTの接続が切れるので注意
#include <M5StickC.h>
#include <Wire.h>
#include <WiFi.h>
#include <PubSubClient.h>![uploading...0]()
/*---------------------------------------------
Global Data
-----------------------------------------------*/
/* -- 「*****」は環境に応じて設定必要 -- */
// For MQTT to raspberrypi
#define WLAN_SSID "*****"
#define WLAN_PASS "*****"
// MQTTの接続先のIP
const char *endpoint = "*****";
// MQTTのポート
const int port = 1883;
// デバイスID
char *deviceID = "*****";
// メッセージを知らせるトピック
char *pubTopic = "/pub/M5Stack";
// メッセージを待つトピック
char *subTopic = "/sub/M5Stack";
// 送信するメッセージ
char *pubMessage1 = "opticalcamouflage_start";
char *pubMessage2 = "camera_start";
/* -- 環境に応じて設定必要(ここまで) -- */
WiFiClient client;
PubSubClient mqttClient(client);
void connectMQTT();
void setup() {
M5.begin();
Wire.begin(0,26);
// WiFi設定
WiFi.begin(WLAN_SSID, WLAN_PASS);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
}
// LCD表示の設定
M5.Lcd.setRotation(1);
M5.Lcd.setCursor(0, 0);
M5.Lcd.setTextSize(2);
M5.Lcd.print("optical camouflage OFF");
// Keep Alive設定
// "Client xxxxx has exceeded timeout, disconnecting." 暫定対策
mqttClient.setKeepAlive(60);
mqttClient.setSocketTimeout(60);
// PCへトリガー送信(MQTT)
mqttClient.setServer(endpoint, port);
}
void loop() {
connectMQTT();
delay(100);
}
void connectMQTT() {
// Server側(PC)と接続できるまで繰り返し
if (!mqttClient.connected()) {
reConnect();
}
else{
int qos = 0;
// ボタン状態を更新
M5.update();
if(M5.BtnA.wasReleased()) {
// メッセージ送信(光学迷彩開始)
Serial.println("BtnA.wasReleased");
M5.Lcd.fillScreen(BLACK);
M5.Lcd.setCursor(0, 0);
M5.Lcd.print("optical camouflage ON");
mqttClient.publish(pubTopic, pubMessage1);
mqttClient.subscribe(subTopic, qos);
}
else if(M5.BtnB.wasReleased()) {
// メッセージ送信(通常Webカメラ開始)
Serial.println("BtnB.wasReleased");
M5.Lcd.fillScreen(BLACK);
M5.Lcd.setCursor(0, 0);
M5.Lcd.print("optical camouflage OFF");
mqttClient.publish(pubTopic, pubMessage2);
mqttClient.subscribe(subTopic, qos);
}
}
}
void reConnect() {
while (!mqttClient.connected()) {
if (mqttClient.connect(deviceID)) {
Serial.println("reConnect Success");
}
else {
delay(5000);
}
}
}
光学迷彩プログラムをON/OFFさせる
ここまでに準備した環境を使って、前回作った光学迷彩プログラム をON/OFFできるようにする
プログラム
- mqtt_opticalcamouflage.py:メインプログラム
- ここから他クラスのメソッドを読んで処理を実現
- opticalcamouflage.py:光学迷彩処理クラス
- mqtt.py:MQTT通信制御クラス
#!/usr/bin/env python
# coding: utf-8
# In[1]:
#!/usr/bin/python3
import random
from paho.mqtt import client as mqtt_client
import opticalcamouflage
# In[2]:
# brokerにはPCのIPアドレスを指定
broker = 'xxx.xxx.xxx.xxx'
port = 1883
# M5StickCで指定したトピックと同じトピックを指定
topic = "/pub/M5Stack"
# client IDをランダム生成
client_id = f'python-mqtt-{random.randint(0, 100)}'
if __name__ == '__main__':
instance = opticalcamouflage.opticalcamouflage()
instance.pre_proc()
instance.oc_proc(broker, port, topic, client_id)
#!/usr/bin/env python
# coding: utf-8
# In[1]:
from ultralytics import YOLO
import cv2
import numpy as np
import mqtt
import threading
# In[18]:
class opticalcamouflage():
model = YOLO("yolov8n-seg.pt")
instance = mqtt.mqtt()
def __init__(self):
self.cap = cv2.VideoCapture(0)
self.img_bak = []
def pre_proc(self):
# (初期処理)Webカメラから背景画像を取得
try:
self.cap.isOpened()
while True:
ret, self.img_bak = self.cap.read()
cv2.imshow("background",self.img_bak)
# 背景画像の再取得が不要な場合、Escキーで抜ける
if cv2.waitKey(1) == 27:
cv2.destroyAllWindows()
break
except:
# Webカメラが起動できない場合の例外処理
cv2.destroyAllWindows()
self.cap.release()
print("Web camera can't open")
def oc_proc(self, broker, port, topic, client_id):
# 光学迷彩プログラムON/OFF判定
client = opticalcamouflage.instance.connect_mqtt(broker, port, client_id)
opticalcamouflage.instance.subscribe(client, topic)
thread = threading.Thread(target=client.loop_forever)
thread.start()
while True:
# Webカメラ映像取得
ret, img_main = self.cap.read()
results = opticalcamouflage.model(img_main)
# 取得した映像の検出結果(の一部)を保持
for r in results:
boxes_xyxy = r.boxes.numpy().xyxy
# [人(person)が映像に映っている and 光学迷彩プログラムON] の場合に以下の処理を実施
if (r.boxes.numpy().cls.size > 0 and r.boxes.numpy().cls[0] == 0) and opticalcamouflage.instance.get_flg() == True:
# 人をmask(消す)するための画像生成
# https://docs.ultralytics.com/guides/isolating-segmentation-objects/#recipe-walk-through
img_main_mask = np.zeros(r.boxes.numpy().orig_shape, np.uint8)
contour = r[0].masks.xy.pop()
contour = contour.astype(np.int32)
contour = contour.reshape(-1, 1, 2)
# 取得した輪郭からmask画像生成
_ = cv2.drawContours(img_main_mask,
[contour],
-1,
(255, 255, 255),
cv2.FILLED)
img_bak_mask = cv2.bitwise_not(img_main_mask)
# mask画像はグレースケールのため、BGRに変換
img_main_mask = cv2.cvtColor(img_main_mask, cv2.COLOR_GRAY2BGR)
img_bak_mask = cv2.cvtColor(img_bak_mask, cv2.COLOR_GRAY2BGR)
# Webカメラ画像をmask(人を消す)する
img_main_masked = cv2.bitwise_or(img_main, img_main_mask)
img_bak_masked = cv2.bitwise_or(self.img_bak, img_bak_mask)
img_masked = cv2.bitwise_and(img_main_masked, img_bak_masked)
# お試しにmaskした範囲を見える化
# cv2.rectangle(img_masked, (int(boxes_xyxy[0][0]), int(boxes_xyxy[0][1])), (int(boxes_xyxy[0][2]), int(boxes_xyxy[0][3])), (255, 255, 0))
_ = cv2.drawContours(img_masked,
[contour],
-1,
(255, 255, 0),
3)
img_last = img_masked.copy()
# 人(person)が映像に映っていない場合はWebカメラの画像をそのまま映す
else:
img_last = img_main.copy()
# mask済み映像表示
cv2.imshow("Web camera(masked)",img_last)
# 処理を終了する場合は、Escキーで抜ける
if cv2.waitKey(1) == 27:
break
cv2.destroyAllWindows()
self.cap.release()
#!/usr/bin/env python
# coding: utf-8
# In[1]:
from paho.mqtt import client as mqtt_client
# In[ ]:
class mqtt():
def __init__(self):
self.opt_flg = False
def connect_mqtt(self, broker, port, client_id) -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
sys.exit(rc)
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, client_id)
client.on_connect = on_connect
client.connect(broker, port, 30)
return client
def subscribe(self, client: mqtt_client, topic):
def on_message(client, userdata, msg):
# 光学迷彩プログラム開始
if "opticalcamouflage_start" in msg.payload.decode():
print("Opticalcamouflage Program Start")
self.opt_flg = True
elif "camera_start" in msg.payload.decode():
print("Normal Web camera")
self.opt_flg = False
client.subscribe(topic)
client.on_message = on_message
デモ
実際に動作させたときのデモ動画3が以下になる
- 準備として、
MQTT broker
を起動する(環境整備 参照) -
mqtt_opticalcamouflage.py
を起動し、背景画像を取得 →[Esc]
キーを押したタイミングで画像取得 - 再度映像を取得し、光学迷彩プログラムはOFFの状態(カメラ映像そのまま)で映像を画面に表示
- M5StickCのボタン押下によりプログラムON/OFFを何度か切り替え
- プログラムONの場合は、人が透過した部分には2で取得した背景画像を表示
- 終了したい場合は、
[Esc]
キーで抜ける
まとめ
目標達成!
(簡易)光学迷彩プログラム に引き続き、映像に映った人を透明化できる簡易的な光学迷彩プログラムを作成する
やりたいのは攻殻機動隊的なイメージ・・
ただし、課題は残っており、改善の余地あり
- 複数人の透過には未対応
- 一定時間操作しないと、M5StickCとPC間のMQTT通信が切れてしまう