目次
- 初めに
- 使ったもの
- システム図
- ラズベリーパイ側のハードウェア
- ラズベリーパイ側のソフトウェア
- うんこ検出ロジックの詳細解説
- ラズベリーパイ側のソフト動作
- Arduino側のハードウェア
- Arduino側のソフトウェア
- Arduino側のソフト動作
- さいごに
初めに
いきなりですが――うちの愛犬、自分(犬)のうんちを食べます。
これだけ聞くと衝撃ですが、若い犬には割とある習性らしいんです。
母犬が子犬の排泄物を食べて巣を清潔に保つ習性の名残とか、
野生時代に外敵から身を守るために排泄物を隠す行動の名残とか。
ここまではまだ許せます。
問題は食べる場所です。そう、ベッドで食べるんです。
しかも犬用ベッドじゃなくて、人間が寝るあのベッド。
おかげでシーツを1日に2回洗濯したこともあります。
一応トイレトレーで排泄はしてくれるんですが、
なぜかそれをわざわざベッドまで運び、くつろぎながら食べるという謎ムーブ。
(なんでやねん。トレーで食べてくれや)
さらに厄介なのは、人が見ていないときに限ってやること。
ペットカメラは置いてあるけど、ずっと監視しているわけにもいかず、気づいたときにはもう事件は終わっているのです。
そこで今回は――
余っていたラズベリーパイとUSBカメラを使って、
うんち検知警報システムを作ることにしました。
愛犬。ポメラニアンとスピッツのミックス犬。生後9ヶ月。かわいい。
使ったもの
・USBカメラ
たまたま余ってた物を使いました。大体のUSBカメラは問題なく使えます。
バッファロー BSW305MBK
・Arduino UNO R4 WiFi
今回はトイレトレーとは別の部屋にいる時に気付けるように、aruinoを使いました。
ちょっと課金すればスマホに直接通知するシステムもできますが、ケチりました。
システム図
今回作るのは―― 「犬のうんちを見張って光で知らせるシステム」 です。
流れはこう。
1.犬がうんちをする。
2.ラズベリーパイがUSBカメラでその瞬間をキャッチ。
3.ラズパイがWi‑Fi経由でArduinoに「うんち検知フラグ」を送信。
4.ArduinoがLEDをピカピカ点滅させて、飼い主に「事件発生!」をお知らせ。
5.(デバックも兼ねて)スマホでリアルタイム映像を確認可能。
ラズベリーパイは24時間、トイレトレーをじーっと監視。
1秒ごとに「うんち有り/無し」の結果をArduinoへ報告します。
Arduinoは「有り」と聞けば、即座にLEDを点滅。
これで、別の部屋にいても光で“うんち速報”を受け取れるわけです。
今回作るシステムの概要
今回は、Arduinoが余っていたということで使いましたが、IFTTTを使ってスマホに通知を送るなんてことも可能です。
日常がちょっとだけ便利になる電子工作 (https://qiita.com/s_iijima/items/035ee89be0ebc9aaf7c9)
ラズベリーパイ側のハードウェア
まずはカメラの設置をご覧ください。
黒いUSBカメラは、青いテープでガッチリ固定され、真下をじっと監視中。 その視線の先には、例のトイレトレーがあります。
右手に見える白いケースの中身は、今回の司令塔・ラズベリーパイ。 そして奥にひっそり佇む白い物体は、普段はペットカメラとして活躍している eufy。 今回は“助っ人”というより、ただのギャラリー役です。
こちらが今回のシステム全景です。 USBカメラとラズベリーパイ、そして監視対象のトイレトレー―― …のはずが、なぜか愛犬本人も堂々とフレームイン。
「どうせなら僕も紹介しろ」と言わんばかりのドヤ顔。 しかもトレーはすでに使用後というリアルタイム感。 ハードもソフトも、そして犬も、すべてが稼働中です。
ラズベリーパイ側のソフトウェア
まず、ソースコードをフルで載せます。その後ブロックごとに説明していきます。
import cv2
import numpy as np
import asyncio
import threading
import queue
import httpx
import time
from flask import Flask, Response
# ====== グローバル(最新フレーム共有)======
# スレッド間で最新のカメラフレームを安全に共有するためのロックと変数
latest_frame_lock = threading.Lock()
latest_frame = None
# ====== カメラ画像から白いトレイ領域を検出 ======
def detect_tray_area(frame):
"""
フレームから白いトレイの領域を検出する。
Args:
frame: 解析対象のカメラフレーム (BGR形式)。
Returns:
tuple: (最大の輪郭, トレイ領域のマスク, 色検出マスク)
検出できなかった場合は (None, None, 色検出マスク)
"""
# BGRからHSV色空間に変換
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# 白色のHSV範囲を定義
lower_white = np.array([0, 0, 150])
upper_white = np.array([180, 60, 255])
# 指定した範囲の色を抽出するマスクを作成
mask = cv2.inRange(hsv, lower_white, upper_white)
# モルフォロジー変換でノイズを除去
kernel = np.ones((5, 5), np.uint8)
# オープニング処理: 小さなノイズ(白点)を除去
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
# クロージング処理: 領域内の小さな穴(黒点)を埋める
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
# マスクから輪郭を検出
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return None, None, mask
# 面積が最大の輪郭をトレイとして選択
largest_contour = max(contours, key=cv2.contourArea)
# 小さすぎる領域はトレイとみなさない (ノイズ対策)
if cv2.contourArea(largest_contour) < 5000:
return None, None, mask
# 最大輪郭から内部を塗りつぶしたマスクを作成
contour_mask = np.zeros_like(mask)
cv2.drawContours(contour_mask, [largest_contour], -1, 255, -1)
# 輪郭の内側を確実に対象とするため、マスクを少し収縮させる
contour_mask = cv2.erode(contour_mask, np.ones((15, 15), np.uint8))
return largest_contour, contour_mask, mask
# ====== OpenCV スレッド ======
def opencv_worker(state_queue_latest, stop_event, camera_index=0, show_window=True):
"""
カメラから映像を取得し、物体検出を行うワーカースレッド。
検出状態の変化をキューに送信し、最新フレームをグローバル変数に格納する。
Args:
state_queue_latest (queue.Queue): 検出状態を送信するためのキュー。
stop_event (threading.Event): スレッドを停止させるためのイベント。
camera_index (int): 使用するカメラのインデックス。
show_window (bool): 処理中の映像をウィンドウで表示するかどうか。
"""
global latest_frame
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():
print("カメラを開けません")
stop_event.set()
return
is_currently_detected = False # 現在のフレームで物体が検出されているか
while not stop_event.is_set():
ret, frame = cap.read()
if not ret:
print("フレーム取得に失敗。再試行します...")
time.sleep(0.1)
continue
# 白いトレイ領域を検出
tray_contour, tray_mask, _ = detect_tray_area(frame)
if tray_contour is not None and tray_mask is not None:
# トレイの輪郭を緑色で描画
cv2.drawContours(frame, [tray_contour], -1, (0, 255, 0), 2)
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
# 検出対象(うんこ)の色のHSV範囲を定義 (黒〜茶色)
lower_black = np.array([0, 0, 0])
upper_black = np.array([180, 255, 70])
lower_brown = np.array([5, 50, 20])
upper_brown = np.array([35, 255, 255])
# 色範囲に基づいてマスクを作成
mask_black = cv2.inRange(hsv, lower_black, upper_black)
mask_brown = cv2.inRange(hsv, lower_brown, upper_brown)
# 黒と茶色のマスクを合成
unko_mask = cv2.bitwise_or(mask_black, mask_brown)
# トレイ領域内のみを対象とする
unko_mask = cv2.bitwise_and(unko_mask, tray_mask)
# マスクのノイズ除去
kernel = np.ones((5, 5), np.uint8)
unko_mask = cv2.morphologyEx(unko_mask, cv2.MORPH_OPEN, kernel)
unko_mask = cv2.morphologyEx(unko_mask, cv2.MORPH_CLOSE, kernel)
# 検出領域を少し膨張させて検出しやすくする
unko_mask = cv2.dilate(unko_mask, kernel, iterations=1)
# 処理後のマスクから輪郭を検出
contours, _ = cv2.findContours(unko_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
detection_count = 0
for cnt in contours:
# 小さすぎる輪郭はノイズとして無視
if cv2.contourArea(cnt) > 50:
detection_count += 1
# 検出した物体の輪郭を赤色で描画
cv2.drawContours(frame, [cnt], -1, (0, 0, 255), 2)
x, y, w, h = cv2.boundingRect(cnt)
text_pos = (x + w + 5, y + 20)
# 検出したことを示すテキストを描画
cv2.putText(
frame,
"!!Unko Detected!!",
text_pos,
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
(0, 0, 255),
2,
cv2.LINE_AA
)
# 検出状態が変化したかチェック
object_found = detection_count > 0
if object_found != is_currently_detected:
is_currently_detected = object_found
# 検出状態を 1 (ON) または 0 (OFF) としてキューに送信
data_to_send = 0x01 if object_found else 0x00
try:
# キューにノンブロッキングで追加
state_queue_latest.put(data_to_send, block=False)
except queue.Full:
# キューが満杯の場合、古いデータを捨てて最新のデータのみを保持する
with state_queue_latest.mutex:
state_queue_latest.queue.clear()
state_queue_latest.put_nowait(data_to_send)
# 最新の処理済みフレームをグローバル変数に格納 (Flask配信用)
with latest_frame_lock:
latest_frame = frame
# ローカルPCでデバッグ用にウィンドウ表示
if show_window:
cv2.imshow('Camera', frame)
# 'q'キーが押されたらループを抜ける
if cv2.waitKey(1) & 0xFF == ord('q'):
stop_event.set()
break
# 終了処理
cap.release()
if show_window:
cv2.destroyAllWindows()
print("OpenCVを終了しました。")
# ====== Flask サーバー(MJPEG)======
app = Flask(__name__)
def generate_mjpeg():
"""
グローバル変数から最新フレームを取得し、MJPEGストリームとして配信するジェネレータ。
"""
global latest_frame
while True:
with latest_frame_lock:
frame = latest_frame
if frame is None:
# OpenCVスレッドがフレームを準備できるまで少し待つ
time.sleep(0.03)
continue
# フレームをJPEG形式にエンコード
ok, jpeg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
if not ok:
time.sleep(0.01)
continue
frame_bytes = jpeg.tobytes()
try:
# MJPEGストリームのフレームとして送信
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
except GeneratorExit:
# ブラウザが接続を閉じた場合
break
except Exception:
# その他の送信エラー (一時的なネットワークの問題など)
time.sleep(0.01)
@app.route('/')
def index():
"""WebページのUIを返す。"""
return '''
<html>
<head>
<title>うんこカメラ [LIVE]</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
margin: 0;
background-color: black;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100vh;
}
.video-container {
position: relative;
width: 100%;
max-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
background-color: black;
}
img {
width: 100%;
height: auto;
object-fit: contain;
display: block;
}
/* REC表示を動画の左上に重ねる */
.rec {
position: absolute;
top: 10px;
left: 10px;
color: red;
font-weight: bold;
font-size: 1.2em;
display: flex;
align-items: center;
animation: blink 1s step-start infinite;
background-color: rgba(0,0,0,0.4);
padding: 4px 8px;
border-radius: 4px;
}
.rec-dot {
width: 10px;
height: 10px;
background-color: red;
border-radius: 50%;
margin-right: 5px;
}
@keyframes blink {
50% { opacity: 0; }
}
h2 {
color: white;
margin: 5px 0;
font-size: 1.5em;
text-align: center;
}
</style>
</head>
<body>
<h2>うんこカメラ [LIVE]</h2>
<div class="video-container">
<div class="rec"><div class="rec-dot"></div>REC</div>
<img src="/video_feed">
</div>
</body>
</html>
'''
@app.route('/video_feed')
def video_feed():
"""MJPEGストリームを配信するエンドポイント。"""
return Response(generate_mjpeg(), mimetype='multipart/x-mixed-replace; boundary=frame')
def flask_worker(stop_event, host='0.0.0.0', port=5000):
"""
Flaskウェブサーバーを別スレッドで実行する。
デーモンスレッドとして実行されるため、メインスレッドが終了すると自動的に終了する。
"""
# flaskは明示的には止められないが、デーモンスレッドなのでプロセス終了で落ちる
print(f"[Flask] ライブ配信: http://{host}:{port}/")
# use_reloader=False は、Flaskが子プロセスを生成するのを防ぐために重要
app.run(host=host, port=port, debug=False, use_reloader=False, threaded=True)
# ====== HTTP通信タスク ======
async def http_post_task(arduino_ip, state_queue_latest, stop_event):
"""
キューから検出状態を取得し、Arduinoに非同期でHTTP POST送信するタスク。
状態が変化したとき、または定期的に再送を試みる。
Args:
arduino_ip (str): 送信先ArduinoのIPアドレス。
state_queue_latest (queue.Queue): 検出状態が格納されるキュー。
stop_event (threading.Event): タスクを停止させるためのイベント。
"""
url = f"http://{arduino_ip}/unko"
current_state = 0 # 0: OFF, 1: ON
last_sent_state = -1 # 前回送信した状態 (-1は未送信を示す)
async with httpx.AsyncClient(timeout=5.0) as client:
print(f"[HTTP] Arduino ({url}) への送信を開始します。")
while not stop_event.is_set():
# キューから最新の検出状態を取得する
try:
# キューに溜まっている古いデータをすべて読み飛ばし、最新の値のみを取得
while True:
state_from_queue = state_queue_latest.get_nowait()
current_state = state_from_queue
except queue.Empty:
# キューが空なら何もしない
pass
data_to_send = str(current_state).encode('utf-8')
try:
# 状態が変化した場合、または前回の送信に失敗した場合のみ送信を試みる
if current_state != last_sent_state:
print(f"[HTTP] 状態を送信します: {'ON' if current_state == 1 else 'OFF'}")
response = await client.post(url, content=data_to_send)
response.raise_for_status()
# 送信に成功した場合
if current_state != last_sent_state:
print(f"[HTTP] 送信成功。Arduinoの応答: {response.text.strip()}")
last_sent_state = current_state
except httpx.RequestError as exc:
# タイムアウトや接続エラーなど
print(f"[HTTP] 送信エラー: {exc}")
# 再送を試みるために last_sent_state をリセット
last_sent_state = -1
except Exception as e:
# その他の予期せぬエラー
print(f"[HTTP] 予期せぬエラー: {e}")
last_sent_state = -1
# 短い待機時間を入れてループの過負荷を防ぐ
await asyncio.sleep(0.1)
print("HTTPタスクを終了しました。")
# ====== メイン ======
async def main_async():
"""
メインの非同期関数。
各スレッド(OpenCV, Flask)と非同期タスク(HTTP)を起動し、管理する。
"""
# --- 設定値 ---
ARDUINO_IP_ADDRESS = "192.168.0.67" # ArduinoのIPアドレスを設定
CAMERA_INDEX = 0 # カメラ切替が必要なら変更
SHOW_WINDOW = False # サーバー側でGUIウィンドウを表示しない場合は False
# --- 共有リソースの初期化 ---
# OpenCVスレッドからHTTPタスクへ最新の検出状態を渡すためのキュー (サイズ1)
state_queue_latest = queue.Queue(maxsize=1)
# すべてのスレッドとタスクを安全に停止させるためのイベント
stop_event = threading.Event()
# --- OpenCVスレッドの起動 ---
t_cv = threading.Thread(
target=opencv_worker,
args=(state_queue_latest, stop_event, CAMERA_INDEX, SHOW_WINDOW),
daemon=True # メインスレッド終了時に自動で終了させる
)
t_cv.start()
# --- Flaskスレッドの起動 ---
t_flask = threading.Thread(
target=flask_worker,
args=(stop_event, '0.0.0.0', 5000),
daemon=True # メインスレッド終了時に自動で終了させる
)
t_flask.start()
# --- HTTP非同期タスクの起動 ---
http_task = asyncio.create_task(http_post_task(
ARDUINO_IP_ADDRESS, state_queue_latest, stop_event
))
try:
# メインループ: stop_eventがセットされるか、HTTPタスクが完了するまで待機
while not stop_event.is_set() and not http_task.done():
await asyncio.sleep(0.2)
except KeyboardInterrupt:
# Ctrl+C が押されたときの処理
print("\nキーボード割り込みを検出しました。終了処理を開始します...")
finally:
# --- クリーンアップ処理 ---
print("各タスク/スレッドに停止信号を送信します...")
stop_event.set()
print("HTTPタスクをキャンセルしています...")
http_task.cancel()
try:
# キャンセルが完了するのを待つ
await http_task
except asyncio.CancelledError:
# キャンセルは正常な終了パス
pass
# デーモンスレッド (t_cv, t_flask) はここで join() しなくても
# プロセス終了時に自動的に終了する
print("プログラムを終了しました。")
if __name__ == '__main__':
# 非同期のメイン関数を実行
asyncio.run(main_async())
このプログラム、やってることはシンプルです。 カメラで映した映像から「うんこ」を見つけたら―
- Webブラウザでライブ映像を配信して現場を実況中継
- 同時にArduinoへHTTPで「事件発生!」を通報
という、二刀流で外部に知らせます。
この全体を仕切るのがmain_async
関数。いわば現場監督兼司令塔。ここから各ブロックに指示を飛ばし、「検出 → 配信 → 通報」の流れを回しています。
では、このmain_async
がどんな段取りで動いているのか、ブロックごとに分解して見ていきましょう。
1. 設定値の定義
# --- 設定値 ---
ARDUINO_IP_ADDRESS = "192.168.0.67" # ArduinoのIPアドレスを設定
CAMERA_INDEX = 0 # カメラ切替が必要なら変更
SHOW_WINDOW = False # サーバー側でGUIウィンドウを表示しない場合は False
プログラムの冒頭には、いくつかの設定項目があります。
-
ARDUINO_IP_ADDRESS
犯行(うんち)を検知したら通報する先、ArduinoのIPアドレスです。
ここを間違えると、LEDは一生光らず、ベッドがうんちまみれになります。 -
CAMERA_INDEX
どのカメラを使うかを番号で指定します。
USBカメラが1台ならだいたい「0」でOK。 -
SHOW_WINDOW
Trueにすると、ラズパイのデスクトップにデバッグ用の映像ウィンドウが出ます。
犯行現場をリアルタイムで見たいときは便利ですが、
サーバーとして常時運用するならFalse推奨。
(ずっと映像が出っぱなしだと、無駄に消費電力が増えちゃいます。)
2. 共有リソースの初期化
複数のスレッドやタスクが同時に動く今回のシステムでは、 「誰がいつ何を受け取ったか」がぐちゃぐちゃにならないよう、 情報を安全にやり取りする仕組みをあらかじめ用意しています。
# --- 共有リソースの初期化 ---
# OpenCVスレッドからHTTPタスクへ最新の検出状態を渡すためのキュー (サイズ1)
state_queue_latest = queue.Queue(maxsize=1)
# すべてのスレッドとタスクを安全に停止させるためのイベント
stop_event = threading.Event()
-
state_queue_latest
画像処理スレッド(生産者)と、HTTP通知タスク(消費者)の間で
「うんち検出中かどうか(ON/OFF)」を受け渡すためのキューです。
maxsize=1
にしてあるので、常に最新の1件だけを保持。
新しいデータが来たら古いのは即ポイして、最新状態に上書きします。 -
stop_event
プログラム全体を安全に終了させるための“信号旗”です。
これがセットされると、各スレッドやタスクは「おっと、撤収だな」と察して
ループを抜け、終了処理に入ります。
強制終了でデータが壊れるのを防ぐための、いわば非常ベル。
3. 各機能の起動(スレッド/タスク)
今回のシステムは、主要な3つの役者――
①画像処理、②Webサーバー、③HTTP通知を、それぞれ別々のスレッドや非同期タスクとして走らせます。
こうすることで、
- 画像解析みたいな重たい処理
- HTTP通信みたいに待ち時間が発生する処理
が、お互いの足を引っ張らずに同時進行できます。
つまり「画像処理が終わるまで通知が遅れる」なんてことはなく、 現場(カメラ)と司令室(Arduino)と中継(Web配信)がそれぞれ自分のペースで動き続けられるわけです。
①OpenCVスレッドの起動
カメラからの映像取得と画像解析を担当するのが、このシステムの“現場班”です。
ここでは opencv_worker
関数を別スレッドで走らせて、ひたすら監視と解析を続けます。
この画像解析がキモですので、詳細はうんこ検出ロジックの詳細解説で解説します。
# --- OpenCVスレッドの起動 ---
t_cv = threading.Thread(
target=opencv_worker,
args=(state_queue_latest, stop_event, CAMERA_INDEX, SHOW_WINDOW),
daemon=True # メインスレッド終了時に自動で終了させる
)
t_cv.start()
-
opencv_worker
関数を別スレッドで実行
メインとは独立して動くので、他の処理に邪魔されずにカメラ監視に集中できます。 -
役割
- カメラからフレームを連続取得
- 取得したフレームを画像処理して、物体(うんこ)を検出
- 検出状態に変化があったら(例: 未検出→検出)、その状態(
0
or1
)をstate_queue_latest
に書き込み - Web配信用に、解析結果(検出範囲の枠線など)が描き込まれた最新フレームをグローバル変数に保存
-
daemon=True
設定
メインプログラムが終了すると、このスレッドも自動的に終了します。
つまり「現場班」も司令塔が帰れば一緒に撤収する仕組みです。
② Flaskスレッドの起動
カメラ映像のライブ配信を担当するのが、このシステムの“中継班”です。
ここでは flask_worker
関数を別スレッドで動かし、現場の様子をリアルタイムでお届けします。
# --- Flaskスレッドの起動 ---
t_flask = threading.Thread(
target=flask_worker,
args=(stop_event, '0.0.0.0', 5000),
daemon=True # メインスレッド終了時に自動で終了させる
)
t_flask.start()
クリックして該当コードを表示
# ====== Flask サーバー(MJPEG)======
app = Flask(__name__)
def generate_mjpeg():
"""
グローバル変数から最新フレームを取得し、MJPEGストリームとして配信するジェネレータ。
"""
global latest_frame
while True:
with latest_frame_lock:
frame = latest_frame
if frame is None:
# OpenCVスレッドがフレームを準備できるまで少し待つ
time.sleep(0.03)
continue
# フレームをJPEG形式にエンコード
ok, jpeg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
if not ok:
time.sleep(0.01)
continue
frame_bytes = jpeg.tobytes()
try:
# MJPEGストリームのフレームとして送信
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
except GeneratorExit:
# ブラウザが接続を閉じた場合
break
except Exception:
# その他の送信エラー (一時的なネットワークの問題など)
time.sleep(0.01)
@app.route('/')
def index():
"""WebページのUIを返す。"""
return '''
<html>
<head>
<title>うんこカメラ [LIVE]</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
margin: 0;
background-color: black;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100vh;
}
.video-container {
position: relative;
width: 100%;
max-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
background-color: black;
}
img {
width: 100%;
height: auto;
object-fit: contain;
display: block;
}
/* REC表示を動画の左上に重ねる */
.rec {
position: absolute;
top: 10px;
left: 10px;
color: red;
font-weight: bold;
font-size: 1.2em;
display: flex;
align-items: center;
animation: blink 1s step-start infinite;
background-color: rgba(0,0,0,0.4);
padding: 4px 8px;
border-radius: 4px;
}
.rec-dot {
width: 10px;
height: 10px;
background-color: red;
border-radius: 50%;
margin-right: 5px;
}
@keyframes blink {
50% { opacity: 0; }
}
h2 {
color: white;
margin: 5px 0;
font-size: 1.5em;
text-align: center;
}
</style>
</head>
<body>
<h2>うんこカメラ [LIVE]</h2>
<div class="video-container">
<div class="rec"><div class="rec-dot"></div>REC</div>
<img src="/video_feed">
</div>
</body>
</html>
'''
@app.route('/video_feed')
def video_feed():
"""MJPEGストリームを配信するエンドポイント。"""
return Response(generate_mjpeg(), mimetype='multipart/x-mixed-replace; boundary=frame')
def flask_worker(stop_event, host='0.0.0.0', port=5000):
"""
Flaskウェブサーバーを別スレッドで実行する。
デーモンスレッドとして実行されるため、メインスレッドが終了すると自動的に終了する。
"""
# flaskは明示的には止められないが、デーモンスレッドなのでプロセス終了で落ちる
print(f"[Flask] ライブ配信: http://{host}:{port}/")
# use_reloader=False は、Flaskが子プロセスを生成するのを防ぐために重要
app.run(host=host, port=port, debug=False, use_reloader=False, threaded=True)
-
flask_worker
関数を別スレッドで実行
メイン処理とは独立して動くので、画像解析や通知処理に邪魔されず、配信に専念できます。 -
役割
- Flaskウェブサーバーを起動(配信スタジオのオープン)
-
/video_feed
にアクセスがあると、OpenCVスレッドが保存した最新フレームを
MJPEG形式のストリームとして配信 - これにより、PCやスマホのブラウザから現場をリアルタイムで確認可能
-
daemon=True
設定
メインプログラムが終了すれば、このスレッドも自動で終了。
つまり「中継班」も現場が閉まれば一緒に撤収します。
③ HTTP非同期タスクの起動
Arduinoへの検出状態の通知を担当するのが、このシステムの“通報班”です。
ここでは http_post_task
関数を非同期タスクとして動かし、現場からの情報を即座に司令室(Arduino)へ届けます。
# --- HTTP非同期タスクの起動 ---
http_task = asyncio.create_task(http_post_task(
ARDUINO_IP_ADDRESS, state_queue_latest, stop_event
))
# ====== HTTP通信タスク ======
async def http_post_task(arduino_ip, state_queue_latest, stop_event):
"""
キューから検出状態を取得し、Arduinoに非同期でHTTP POST送信するタスク。
状態が変化したとき、または定期的に再送を試みる。
Args:
arduino_ip (str): 送信先ArduinoのIPアドレス。
state_queue_latest (queue.Queue): 検出状態が格納されるキュー。
stop_event (threading.Event): タスクを停止させるためのイベント。
"""
url = f"http://{arduino_ip}/unko"
current_state = 0 # 0: OFF, 1: ON
last_sent_state = -1 # 前回送信した状態 (-1は未送信を示す)
async with httpx.AsyncClient(timeout=5.0) as client:
print(f"[HTTP] Arduino ({url}) への送信を開始します。")
while not stop_event.is_set():
# キューから最新の検出状態を取得する
try:
# キューに溜まっている古いデータをすべて読み飛ばし、最新の値のみを取得
while True:
state_from_queue = state_queue_latest.get_nowait()
current_state = state_from_queue
except queue.Empty:
# キューが空なら何もしない
pass
data_to_send = str(current_state).encode('utf-8')
try:
# 状態が変化した場合、または前回の送信に失敗した場合のみ送信を試みる
if current_state != last_sent_state:
print(f"[HTTP] 状態を送信します: {'ON' if current_state == 1 else 'OFF'}")
response = await client.post(url, content=data_to_send)
response.raise_for_status()
# 送信に成功した場合
if current_state != last_sent_state:
print(f"[HTTP] 送信成功。Arduinoの応答: {response.text.strip()}")
last_sent_state = current_state
except httpx.RequestError as exc:
# タイムアウトや接続エラーなど
print(f"[HTTP] 送信エラー: {exc}")
# 再送を試みるために last_sent_state をリセット
last_sent_state = -1
except Exception as e:
# その他の予期せぬエラー
print(f"[HTTP] 予期せぬエラー: {e}")
last_sent_state = -1
# 短い待機時間を入れてループの過負荷を防ぐ
await asyncio.sleep(0.1)
print("HTTPタスクを終了しました。")
-
http_post_task
関数を非同期タスクとして実行
他の処理を止めずに、裏でコソコソ通報作業を続けます。 -
役割
-
state_queue_latest
キューを定期的に監視 - 新しい検出状態を取得
- 状態が前回送信したものから変わっていたら、ArduinoのIPアドレス
(http://{IP}/unko
)にHTTP POSTリクエストを送信して通知 - 通信エラーが出たら再送を試みる
-
-
asyncio
とhttpx
の活用
ネットワーク通信の待ち時間でプログラム全体が固まらないよう、
非同期処理でサクサク動かします。
これで「通報中に監視が止まる」なんてことはありません。
4. メインループと終了処理
すべてのスレッド/タスクを立ち上げたあとは、メインプログラムは“待機モード”に入ります。
ここからは、終了シグナル(Ctrl+Cなど)が来るまでじっと構えている状態です。
try:
# メインループ: stop_eventがセットされるか、HTTPタスクが完了するまで待機
while not stop_event.is_set() and not http_task.done():
await asyncio.sleep(0.2)
except KeyboardInterrupt:
# Ctrl+C が押されたときの処理
print("\nキーボード割り込みを検出しました。終了処理を開始します...")
finally:
# --- クリーンアップ処理 ---
print("各タスク/スレッドに停止信号を送信します...")
stop_event.set()
print("HTTPタスクをキャンセルしています...")
http_task.cancel()
try:
# キャンセルが完了するのを待つ
await http_task
except asyncio.CancelledError:
# キャンセルは正常な終了パス
pass
print("プログラムを終了しました。")
-
try
ブロック
while
ループでプログラムをアクティブに保ちます。
stop_event
がセットされるまでは、短いスリープを挟みつつ待機。 -
except KeyboardInterrupt
ブロック
ユーザーが Ctrl+C を押すとここが発動。
「はい撤収〜」というメッセージを出して、終了処理の準備に入ります。 -
finally
ブロック
try
を抜けると必ず実行されるクリーンアップ処理。-
stop_event.set()
— 全スレッド/タスクに「止まれ!」信号を送る -
http_task.cancel()
— HTTP通知タスクを明示的にキャンセル -
await http_task
— キャンセル完了を待つ - 最後に「正常終了しました」的なメッセージを表示して終了
(デーモンとして動いていた OpenCV と Flask のスレッドも、メイン終了とともに自動停止)
-
こうして、現場班・中継班・通報班の全員が、きれいに業務終了となります。
5. プログラムの実行
このスクリプトが「直接」実行されたときだけ、非同期の main_async
関数を起動します。
ここで asyncio.run()
がイベントループを立ち上げ、main_async
を走らせることで、
これまで説明してきた“現場班・中継班・通報班”が一斉に動き出します。
if __name__ == '__main__':
# 非同期のメイン関数を実行
asyncio.run(main_async())
いわば、ここが全システムのスタートボタン。
うんこ検出ロジックの詳細解説
このプログラムの物体検出は、ただ「それっぽい色」を探すだけじゃありません。
ポイントは 「どこで」 探すかをあらかじめ絞り込むこと。
これによって、検出の精度と信頼性がグッと上がります。
処理の流れは大きく2ステップ。
【ステップ1】監視エリアの特定
まず、カメラ映像から白色のトイレシーツ(トレイ)を見つけ出し、そこを「監視すべきエリア」として限定します。
(つまり、部屋全体じゃなく“現場”だけを見張る)
【ステップ2】ターゲットの検出
次に、ステップ1で特定した監視エリア内だけを対象に、ターゲットである「うんこ」の色(黒〜茶色)を探します。
この2段階アプローチのおかげで、家具や床、影といった無関係な背景は無視。
トイレシーツ上に現れた“本命”だけを正確にキャッチできます。
ステップ1: 白色のトイレシーツを特定する (detect_tray_area
関数)
まずは現場の“ステージ”を特定します。
カメラが映しているのは部屋全体ですが、うんこが出現するのは基本的にトイレシーツの上。
そこで、このステップでは映像の中から白色のトイレシーツ部分だけを切り出します。
こうすることで、後の検出処理が「部屋全体を探す大捜索」ではなく、
「現場だけを重点監視するピンポイント捜査」に変わります。
結果として、家具や床、影などのノイズを無視でき、検出の精度と信頼性がグッと向上します。
①HSV色空間への変換
まずは画像を、カメラが吐き出すBGR(青・緑・赤)形式から**HSV(色相・彩度・明度)**色空間に変換します。
# BGRからHSV色空間に変換
hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
なぜそんな面倒なことをするのかというと、HSVだと「色の種類(H)」「色の鮮やかさ(S)」「明るさ(V)」をそれぞれ独立して扱えるからです。
これにより、部屋の照明がちょっと暗くなったり明るくなったりしても、「白」という色をブレずに特定できます。(BGRのままだと、照明の加減で“白”が“灰色”や“黄色っぽい何か”に見えてしまうことも)
②白色の範囲を指定してマスクを作成
次は、HSV空間で「白」とみなす範囲を決めて、その条件に合うピクセルだけを抜き出します。
この抜き出し結果が“マスク”と呼ばれる二値画像(白=該当、黒=非該当)です。
# 白色のHSV範囲を定義
lower_white = np.array([0, 0, 150])
upper_white = np.array([180, 60, 255])
# 指定した範囲の色を抽出するマスクを作成
mask = cv2.inRange(hsv, lower_white, upper_white)
今回の設定では、
-
明度(V) が
150
以上 -
彩度(S) が
60
以下
という条件を「白」とみなしています。 この条件で抽出されるのは、色味のない明るい部分(=無彩色の明るい領域)です。
つまり、 「室内で最もまぶしい無彩色エリア=トイレシーツ」 をピンポイントで拾い出すためのフィルター、というわけです。
なお、この数値は私の環境(使用カメラや照明条件)で試行錯誤の末に落ち着いたものです。 機材や設置場所が変われば見え方も変わるので、実際に使う際はぜひ自分の環境で調整してみてください。
③ノイズ除去(モルフォロジー変換)
作成したマスクには、どうしても小さなゴミや光の反射によるチラつきノイズが混ざります。
このままだと「トイレシーツが点々と分裂して見える」みたいな状態になり、後の検出精度がガタ落ちします。
そこで登場するのが モルフォロジー変換。
画像処理界の“お掃除係”です。
# モルフォロジー変換でノイズを除去
kernel = np.ones((5, 5), np.uint8)
# オープニング処理: 小さなノイズ(白点)を除去
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
# クロージング処理: 領域内の小さな穴(黒点)を埋める
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
-
オープニング処理
画像上にポツポツ残っている小さな白点(ノイズ)を除去します。
これで「関係ないホコリ」をきれいに掃き出します。 -
クロージング処理
領域内部にある小さな黒い穴を埋めます。
これにより、トイレシーツの輪郭が途切れず、しっかり一枚の領域として認識されます。
④最大領域の特定と最終マスクの生成
ノイズ除去を終えたマスクから輪郭を検出し、その中で一番面積が大きい領域を「トイレシーツ」として確定します。
# 面積が最大の輪郭をトレイとして選択
largest_contour = max(contours, key=cv2.contourArea)
# ...
# 最大輪郭から内部を塗りつぶしたマスクを作成
contour_mask = np.zeros_like(mask)
cv2.drawContours(contour_mask, [largest_contour], -1, 255, -1)
# 輪郭の内側を確実に対象とするため、マスクを少し収縮させる
contour_mask = cv2.erode(contour_mask, np.ones((15, 15), np.uint8))
次に、その輪郭の内側を塗りつぶして最終的なマスクを作成。
さらに cv2.erode
(収縮処理)をかけて、領域をほんの少しだけ内側に縮めます。
これは、シーツの境界ギリギリにある微妙な色や影を監視範囲から外し、**「確実にシーツの内側だけを見る」**ための安全策です。
こうして出来上がった contour_mask
が、次のステップで使う監視エリアになります。
いわば「ここが現場」という立ち入り線を引いた状態です。
ステップ2: トイレシーツ内で「うんこ」を検出する (opencv_worker
関数)
ステップ1で作った「監視エリア」マスクの範囲内だけを対象にターゲット(うんこ)を探します。
①ターゲット(うんこ)の色の定義
次は、いよいよ本命ターゲットの色を決めます。 ここでいうターゲットとはもちろん――うんこです。
検出方法はシンプルで、HSV色空間で「黒〜茶色」の範囲を定義します。 黒は暗くて彩度が低い領域、茶色はやや明るくて赤みを帯びた低彩度領域として設定。 この2つの範囲をカバーすることで、新鮮な(?)ものからちょっと乾いたものまで、幅広く検出できます。
ちなみにここで紹介している数値は、あくまで私の環境(カメラ・照明条件)でうまくいった設定です。 カメラの種類や撮影場所の明るさによって見え方は変わるので、実際に使うときは自分の環境に合わせて微調整してみてください。
# 検出対象(うんこ)の色のHSV範囲を定義 (黒〜茶色)
lower_black = np.array([0, 0, 0])
upper_black = np.array([180, 255, 70])
lower_brown = np.array([5, 50, 20])
upper_brown = np.array([35, 255, 255])
②色マスクの作成と監視エリアの適用
まず、ステップ1で定義した「黒」と「茶色」の色範囲に基づいて、それぞれマスクを作成します。
この2つのマスクを合成すれば、「怪しい色ゾーン」が一枚の画像としてまとまります。
# 黒と茶色のマスクを合成
unko_mask = cv2.bitwise_or(mask_black, mask_brown)
# トレイ領域内のみを対象とする
unko_mask = cv2.bitwise_and(unko_mask, tray_mask)
そして、ここからが一番大事なポイント。
ステップ1で作ったトイレシーツのマスク(tray_mask
)と、この色マスクをAND演算で掛け合わせます。
cv2.bitwise_and
を使うと、両方のマスクで白(255)になっているピクセルだけが残ります。
つまり、
「トイレシーツの上にあって、なおかつ黒または茶色の部分」
だけを抽出できるわけです。
こうして出来上がったのが、最終的な検出候補 unko_mask
。
このマスクこそが、「現場に落ちているブツ」を特定するための決定打になります。
③最終的な輪郭検出
ここまでで作った unko_mask
に対して、もう一度ノイズ除去などの後処理を行います。
余計な点や細かいゴミを取り除いたら、残った領域の輪郭を検出します。
# 処理後のマスクから輪郭を検出
contours, _ = cv2.findContours(unko_mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
detection_count = 0
for cnt in contours:
# 小さすぎる輪郭はノイズとして無視
if cv2.contourArea(cnt) > 50:
detection_count += 1
# ... 輪郭を描画 ...
そして、その輪郭の中で一定以上の面積を持つものがあれば、それはもう「うんこ発見!」と判断します。
このように、
- 監視エリアを動的に特定し
- その中だけでターゲットを探す
という二段構えの賢い(自称)手法によって、
部屋の明るさや背景の変化にも強く、誤検出の少ない安定した物体検出システムが完成します。
ラズベリーパイ側のソフト動作
プログラムを起動したら、あとはラズベリーパイと同じWi‑FiにつながっているスマホやPCのブラウザで、ラズパイの IPアドレス:ポート番号 を叩くだけ。
すると――下図のように、リアルタイム映像がブラウザにドンと表示されます。
白いトイレシーツのフチが緑色のラインで囲まれているのが見えるはず。
これはステップ1で作った「監視エリアマスク」がちゃんと機能している証拠です。
つまり、カメラは部屋全体を見ているようでいて、実際には「白いシーツの範囲だけ」をしっかりロックオンしているわけです。
私のスマホからラズベリーパイのサーバーにアクセスしたときのブラウザ画面
下図は、うんこを検知したときのデモ映像です。
…とはいえ、本物をお見せするのはさすがにアレなので、
今回は代役として黒い財布にご登場いただきました。
映像では、この“うんこ(財布)”が赤い枠でしっかり囲われ、
画面上には 「Unko Detected」 の文字がドンと表示されます。
つまり、システム的には
「黒くて怪しい物体がシーツの上にある → これはうんこだ!」
という判断が下された状態です。
うんこを検知した時の画面
このコードにはちょっとした弱点もあります。
まず、うちの愛犬が白い毛並みのため、ステップ1で作った「白いトイレシーツ検出マスク」に愛犬の体も含まれてしまいます。
とはいえ、うんち中は愛犬の体でうんちが隠れてカメラに映らないので、実害は少ないと割り切っています。
もうひとつの問題は暗所。
夜間や電気を消した状態では、カメラ映像がほぼ真っ暗になり、当然ながら検知は不可能です。
これを解決するには、
- 専用のライトを設置する
- 暗所でも撮影できる高感度カメラに置き換える
といったハード面での対策が必要になります。
要するに、現状のシステムは明るい時間帯専用・白犬はシーツ扱いという制限付きで運用中です。
シーツとして検知されている愛犬
Arduino側のハードウェア
こちら側の設置は本当にシンプル。
特別な固定具や加工は不要で、よく視界に入る場所にポンと置くだけです。
今回は、アニメを見たりゲームをしているときでもすぐ気づけるように、テレビ台の上に設置しました。
こうしておけば、LEDが点灯した瞬間に視界に入り、「事件発生!」をリアルタイムでキャッチできます。
Arduino側のソフトウェア
まず、ソースコードをフルで載せます。その後ブロックごとに説明していきます。
#include <WiFiS3.h>
#include <Arduino_LED_Matrix.h>
// --- Wi-Fi Configuration ---
// ご自身のWi-Fiネットワーク情報に書き換えてください
char ssid[] = "********";
char pass[] = "*********";
int status = WL_IDLE_STATUS;
WiFiServer server(80);
// -------------------------
ArduinoLEDMatrix matrix;
uint8_t ALL_ON_BITMAP[8][12] PROGMEM = {
{1,1,1,1,1,1,1,1,1,1,1,1},
{1,1,1,1,1,1,1,1,1,1,1,1},
{1,1,1,1,1,1,1,1,1,1,1,1},
{1,1,1,1,1,1,1,1,1,1,1,1},
{1,1,1,1,1,1,1,1,1,1,1,1},
{1,1,1,1,1,1,1,1,1,1,1,1},
{1,1,1,1,1,1,1,1,1,1,1,1},
{1,1,1,1,1,1,1,1,1,1,1,1}
};
bool unkoIsPresent = false;
bool matrixIsOn = false;
unsigned long matrixBlinkPreviousMillis = 0;
const long matrixBlinkInterval = 100;
bool matrixIsCurrentlyRendered = false;
void printWifiStatus() {
Serial.print("SSID: ");
Serial.println(WiFi.SSID());
IPAddress ip = WiFi.localIP();
Serial.print("IP Address: ");
Serial.println(ip);
}
void setup() {
Serial.begin(115200);
matrix.begin();
Serial.println("LED Matrix Boot Test");
matrix.renderBitmap(ALL_ON_BITMAP, 8, 12);
delay(1000);
matrix.clear();
// Wi-Fiモジュールのチェック
if (WiFi.status() == WL_NO_MODULE) {
Serial.println("Communication with WiFi module failed!");
while (1) { delay(100); }
}
String fv = WiFi.firmwareVersion();
if (fv < WIFI_FIRMWARE_LATEST_VERSION) {
Serial.println("Please upgrade the firmware");
}
// Wi-Fiネットワークへの接続試行
Serial.print("Attempting to connect to SSID: ");
Serial.println(ssid);
while (status != WL_CONNECTED) {
status = WiFi.begin(ssid, pass);
delay(5000);
Serial.print(".");
}
Serial.println("\nConnected to wifi");
server.begin();
printWifiStatus();
Serial.println("HTTP server started. Waiting for requests on port 80.");
}
void loop() {
// HTTPサーバーのロジック
WiFiClient client = server.available();
if (client) {
Serial.println("New client");
String request_line = client.readStringUntil('\n');
if (request_line.startsWith("POST /unko")) {
// ヘッダーをスキップ
while (client.connected() && client.available()) {
String header_line = client.readStringUntil('\n');
if (header_line.length() <= 1) { // 空行を検出
break;
}
}
// ボディを読み取り
if (client.connected() && client.available()) {
char value = client.read();
Serial.print("HTTP Body received: ");
Serial.println(value);
if (value == '1') {
unkoIsPresent = true;
Serial.println(" -> ON");
} else if (value == '0') {
unkoIsPresent = false;
Serial.println(" -> OFF");
}
}
}
// レスポンスを送信
client.println("HTTP/1.1 200 OK");
client.println("Content-Type: text/plain");
client.println("Connection: close");
client.println();
client.println("OK");
delay(1);
client.stop();
Serial.println("Client disconnected");
}
// LED点滅ロジック
if (unkoIsPresent) {
if (!matrixIsOn) matrixIsOn = true;
unsigned long currentMillis = millis();
if (currentMillis - matrixBlinkPreviousMillis >= matrixBlinkInterval) {
matrixBlinkPreviousMillis = currentMillis;
if (matrixIsCurrentlyRendered) {
matrix.clear();
matrixIsCurrentlyRendered = false;
} else {
matrix.renderBitmap(ALL_ON_BITMAP, 8, 12);
matrixIsCurrentlyRendered = true;
}
}
} else if (matrixIsOn) {
matrix.clear();
matrixIsOn = false;
matrixIsCurrentlyRendered = false;
}
}
このプログラムは、Wi‑Fi機能を備えたArduinoボード上で動作し、
特定のHTTPリクエストを受け取ったときにLEDマトリックスを点滅させます。
想定シナリオはこうです。
カメラ付きRaspberry Piが「うんこ」を検知すると、
その瞬間にArduinoへHTTPリクエストを送信。
ArduinoはそれをトリガーとしてLEDマトリックスを点滅させ、
「事件発生!」を視覚的に知らせる警報機として機能します。
つまり、
- Raspberry Pi:監視&検知
- Arduino:通知&警告
という役割分担で、検知から警告をに実現しているわけです。
setup()
関数処理の流れ
setup()
関数は、Arduinoが電源投入またはリセットされた直後に一度だけ実行される初期化ルーチンです。
ここで、ハードウェアやネットワークの準備をすべて整え、プログラム本体(loop()
)が安定して動ける状態にします。
要するに setup()
は、
「電源ON → 機材チェック → ネット接続 → 待機OK」
というスタートアップシーケンスを一気にこなす場所です。
ここをきちんと整えておくことで、後のloop()
処理が安定して動作します。
1. シリアル通信とLEDマトリックスの初期化
Serial.begin(115200);
matrix.begin();
-
Serial.begin(115200);
ArduinoとPC間でテキストデータをやり取りするためのシリアル通信を開始します。
115200
は通信速度(ボーレート)で、比較的高速なデータ転送が可能です。
この通信は、Wi‑Fi接続状況や受信データ、エラー内容などを
シリアルモニタに出力して確認するためのデバッグ手段として不可欠です。 -
matrix.begin();
**LEDマトリックス(今回警告を表示するLED)**を制御するライブラリ(Arduino_LED_Matrix
)の初期化処理を実行します。
これにより、内部的に以下の準備が行われます。- 制御用ピンの設定
- タイマーや描画バッファの初期化
- LED点灯命令を受け付けられる状態への移行
この初期化が完了すると、以降のコードから
matrix.print()
やmatrix.drawBitmap()
などの命令でLEDマトリックスに自由に表示を行えるようになります。
2. LEDマトリックスの起動テスト
Serial.println("LED Matrix Boot Test");
matrix.renderBitmap(ALL_ON_BITMAP, 8, 12);
delay(1000);
matrix.clear();
この処理は、Arduinoの電源投入直後にLEDマトリックスが正常に動作しているかを目視で確認するためのセルフテストです。
手順としては――
-
全LEDを点灯
- すべてのドットを一斉に光らせることで、配線ミスやLEDの不点灯を一目で確認できます。
-
1秒間の待機 (
delay(1000)
)- 人間の目で確実に点灯状態を認識できるよう、プログラムを1秒間停止します。
-
全LEDを消灯
- 初期状態に戻し、以降の通常動作に備えます。
このテストがあることで、電源を入れた瞬間にハードが生きているかどうかを即座に判断でき、開発や運用時のトラブルシューティングが格段に楽になります。
3. Wi-Fiモジュールのチェック
if (WiFi.status() == WL_NO_MODULE) {
// ...
}
String fv = WiFi.firmwareVersion();
if (fv < WIFI_FIRMWARE_LATEST_VERSION) {
// ...
}
この処理は、ArduinoがWi‑Fi機能を利用できる状態かどうかを起動時に確認するための
重要なフェイルセーフ機構です。
具体的には、WiFi.status()
の戻り値が WL_NO_MODULE
でないかをチェックします。
WL_NO_MODULE
は、
- Wi‑Fiモジュールが物理的に搭載されていない
- またはハード的に認識されていない
ときに返されるエラーコードです。
もしこの状態だった場合、while(1);
という無限ループに入り、プログラムの実行をそこで完全に停止させます。
これにより、存在しないWi‑Fi機能を使おうとして予期せぬエラーや暴走が発生するのを未然に防ぎます。
要するにここは、
「通信機の電源が入っていないなら、出発しない」
という安全確認ステップです。
4. Wi-Fiネットワークへの接続
while (status != WL_CONNECTED) {
status = WiFi.begin(ssid, pass);
delay(5000);
Serial.print(".");
}
この処理では、Arduinoが指定されたWi‑Fiネットワークに接続できるまで
接続試行を繰り返すループを実行します。
-
WiFi.begin(ssid, password);
登録されたSSIDとパスワードを使って接続を開始します。 -
while (WiFi.status() != WL_CONNECTED)
接続状態がWL_CONNECTED
になるまでループを継続します。
つまり「つながるまで何度でも挑戦」する仕組みです。 -
delay(5000);
接続試行の間に5秒間の待機時間を設けます。
Wi‑Fi接続は即時ではなく数秒かかるため、
適度な間隔を空けることで無駄な再試行や処理負荷を防ぎます。 -
シリアルモニタへの「.」出力
待機中に「.」を連続表示することで、開発者は「プログラムが固まったのではなく、接続待ち中」であることを一目で確認できます。
5. HTTPサーバーの起動と情報表示
server.begin();
printWifiStatus();
Serial.println("HTTP server started. Waiting for requests on port 80.");
Wi‑Fi接続が無事に確立したら、いよいよ外部からのアクセスを受け付ける準備に入ります。
-
server.begin();
ArduinoをTCPサーバーとして動作させ、HTTP通信の標準ポートである80番ポートで待ち受けを開始します。
これにより、Raspberry Piなどの外部デバイスから送られるHTTPリクエストを受信できる状態になります。
(ここが開いていないと、検知通知が届かずシステムが機能しません) -
printWifiStatus();
接続中のWi‑FiネットワークのSSIDと、DHCPサーバーから割り当てられたIPアドレスをシリアルモニタに表示します。
このIPアドレスは、クライアント(Raspberry Piなど)がHTTPリクエストを送信する際の宛先となるため、非常に重要です。
開発・運用時には、この情報を控えておくことで、接続先の設定ミスを防ぎます。
要するにこのステップは、
「通信回線がつながったら、玄関(ポート80)を開けて住所(IP)を掲示する」
という、外部連携のための最終セットアップです。
loop()
関数処理の流れ
setup()
が完了すると、Arduinoは電源が切れるまで loop()
関数を延々と繰り返します。
このプログラムでは、loop()
が担う役割は大きく分けて 2つ。
-
HTTPリクエストの受付
-
server.available()
で新しいクライアント(接続要求)があるかをチェックします。 - 接続があれば、そのHTTPリクエストを読み取り、内容を解析します。
- もし特定のパス(例:
/unko
)がリクエストされていれば、LED点滅のトリガーとなるフラグをセットします。 - 処理が終わったらクライアント接続を閉じ、次のリクエストに備えます。
-
-
LEDの点滅制御
- 上記のフラグが立っていれば、LEDマトリックスを一定パターンで点滅させます。
- 点滅回数や間隔はプログラム内で定義されており、通知として十分に目立つように設計されています。
- 点滅が完了するとフラグをリセットし、待機状態に戻ります。
このように、loop()
は
「外部からの合図を待ち受け → 合図があれば即座にLEDで通知」
というサイクルを高速に回し続けています。
1. HTTPサーバーのロジック
WiFiClient client = server.available();
if (client) {
// ...
}
loop()
が1回まわるごとに、まず server.available()
を呼び出して
新しいクライアント(外部デバイス)からの接続要求があるかを確認します。
この関数はノンブロッキングなので、接続がなければ即座に処理を返し、
プログラム全体の動作を止めることはありません。
接続があった場合のみ、if (client)
ブロック内の処理が実行されます。
-
リクエストの解析
受信したHTTPリクエストのメソッドとパスを確認します。
ここではPOST /unko
がターゲット。
これ以外のリクエストは無視します。 -
HTTPボディの読み取り
リクエスト本文から'1'
または'0'
を取得。
'1'
→ うんこ検出あり
'0'
→ うんこ検出なし
この値をグローバル変数unkoIsPresent
に反映します。 -
レスポンスの送信
クライアントにHTTP/1.1 200 OK
を返し、
リクエストが正常に処理されたことを通知します。 -
接続の終了
client.stop()
で接続を閉じ、リソースを解放します。
2. LED点滅ロジック
if (unkoIsPresent) {
// ...
} else if (matrixIsOn) {
// ...
}
HTTPサーバーの処理とは独立して、グローバル変数 unkoIsPresent
の状態に応じて
LEDマトリックスの表示を制御します。
これにより、検知通知と表示制御が並行して動作し、どちらかがもう一方をブロックすることがありません。
-
if (unkoIsPresent)
このフラグがtrue
の間は、LEDマトリックスを点滅させます。
ここで重要なのは、delay()
を使わないこと。
delay()
は指定時間プログラム全体を停止させてしまうため、
HTTPサーバーのリクエスト受付が止まってしまいます。
代わりにmillis()
(起動からの経過ミリ秒)を利用し、
一定間隔(matrixBlinkInterval
)ごとに点灯/消灯を切り替える
ノンブロッキング方式で点滅を実現しています。 -
else if (matrixIsOn)
この条件は、unkoIsPresent
がfalse
に変わった瞬間、
かつ直前までLEDが点滅していた場合に成立します。
ここでmatrix.clear()
を呼び出してLEDを完全に消灯し、
点滅状態を管理していたフラグをリセット。
これにより、システムは静的な待機状態に戻ります。
Arduino側のソフト動作
やることはとてもシンプルです。
Raspberry Piから「うんこ検知!」のHTTPリクエストを受け取ると、LEDマトリックスを点滅させて知らせます。
これにより、テレビを見ていても、ゲームに夢中でも、視界の端でチカチカ光って「事件発生!」を即座に認識できます。
下図のように、検知中はLEDが一定間隔で点滅し、検知が解除されるとすぐに消灯して待機状態に戻ります。
シンプルかつ確実な警報装置です。
さいごに
ここまでお付き合いいただき、ありがとうございました。
今回ご紹介した「うんち検知警報システム」は、ラズベリーパイでの映像解析と、Arduinoによるシンプルな通知機構を組み合わせることで、「見ていない時でも確実に気づける」 という安心感を実現しました。
もちろん、まだ改善の余地はあります。
白い犬の毛色問題や暗所での検知不可など、ハード面の課題も残っていますが、
それらも含めて「作って試して改善する」プロセスこそが、この手のDIYの醍醐味です。
もしこの記事を読んで「自分も作ってみたい」と思っていただけたなら、それが何よりの喜びです。
ペットのいたずら対策や、ちょっとした監視システムの自作に興味がある方のヒントやきっかけになれば幸いです。
こういった電子工作は、夏休みや冬休みの自由研究・工作にもぴったりです。
身近な課題をテーマにすれば、作って楽しい・使って便利な作品になります。
最後まで読んでくださった皆さん、本当にありがとうございました。
あなたの愛犬(や愛猫)が、今日も平和に過ごせますように。
作者について
この記事を書いた人は、普段はRaspberry Pi用HAT基板や周辺アクセサリを設計・販売しています。
電子工作やIoTプロジェクトに役立つアイテムも取り扱っていますので、
興味のある方はぜひこちらも覗いてみてください。