はじめに
こんにちは!LONです。
12月にも入り、ますます寒くなってきましたね~
今回は、ラズパイを使ったiotのお話がメインになります。
学生の時、触ったなぁ~くらいでずっとiotには手を出してませんでした。
なぜこのタイミングで手を出したかというと、セールスがしつこく毎週ピンポンするのでそのけん制として玄関にカメラを仕掛けることにしました!
モノ自体は結構簡単に作れそうだったので、重い腰を上げて引き出しの奥からラズパイを引いてきたのです。
目次
-
システム概要
-
ハードウェア
-
クラス図
-
実装
-
実行例
-
まとめ
🎛️システム概要
今回作成したシステム図は以下の通りです
┌─────────────┐
│ Picamera3 │
└──────┬──────┘
human detect(pir) │ always watch
┌───────────┴──────────┐
│ │
┌────▼────┐ ┌────▼────┐
│ main │ │ lores │
│ (RGB) │ │ (YUV) │
└────┬────┘ └────┬────┘
│ │
┌────▼────────┐ ┌───▼──────────┐
│ H.264 │ │ MJPEG │
│ Encoder │ │ Encoder │
└────┬────────┘ └───┬──────────┘
│ │
┌────▼────────┐ ┌───▼──────────┐
│ video.h264 │ │ HTTP Stream │
│ (file) │ │ (browser) │
└─────────────┘ └──────────────┘
二つの動画ファイルの違い
| 項目 | MJPEG | H.264 |
|---|---|---|
| 圧縮率 | 低い | 高い |
| 遅延 | 非常に低い | 高い |
| CPU負荷 | 低い | 高い(エンコード時) |
| 帯域幅 | 大量に必要 | 少なくて済む |
| 用途 | ライブ配信、監視 | 録画、保存、配信 |
⚙ハードウェア
| 部品 | 型番・仕様 | 価格目安 |
|---|---|---|
| Raspberry Pi | Raspberry Pi 4 Modelo B 2019 Quad Core 64 bit WiFi Bluetooth (4 GB) | 9000円 |
| カメラモジュール | InnoMaker IMX708 MIPIカメラモジュール Raspberry PIカメラモジュール3 | 3000円 |
| PIRセンサー | HC-SR501 | 400円 |
| その他 | ジャンパーワイヤー、ケースなど | 1000円 |
合計:13400円
📉クラス図
小さくてすみません。
ざっと全体の関係を絵にしてみました。私は動体検知した動画をディスコードのDMで送信されるようにしました。この辺りの媒体は適宜好きなものに差し替えてください。
🛠実装
以下のような順序で記載します。
- StreamingOutPut
- HTTPストリーミングハンドラ
- PIRセンサー
- 動体検知
- カメラ起動
- ストリーミングサーバ起動
- レコーディング機能
- カメラ停止
- 監視カメラシステム起動
StreamingOutPut
# ===== ストリーミング関連 =====
class StreamingOutput(io.BufferedIOBase):
def __init__(self, rotation=0):
self.frame = None
self.condition = Condition()
self.rotation = rotation # 90度回転を修正するため逆方向に回転
def write(self, buf):
with self.condition:
# フレームを逆方向に回転(transpose=2の補正)
try:
nparr = np.frombuffer(buf, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is not None and self.rotation == 90:
# 逆方向に回転(transpose=1で補正)
img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
ret, buffer = cv2.imencode('.jpg', img)
self.frame = buffer.tobytes()
else:
self.frame = buf #この処理が重要!
except:
self.frame = buf
self.condition.notify_all()
streaming_output = StreamingOutput()
概要
一言でいうと、StreamingHandlerとSurveillanceCameraの仲介役。
PicameraはFileOutput形式でしかデータの出力ができない+HTTPストリーミングは別スレッドで動作する都合上、ライブ配信するには仲介者が必要。
このクラスが無いと、ストリーミング配信ができないんだなぁと思っておいてください。
class StreamingOutput(io.BufferedIOBase)
継承元:io.BufferedIOBase
- Pythonの標準入出力クラス
- ファイルのように
write()メソッドで書き込める - Picamera2が「ファイルに書き込むように」データを渡せる
__init__
| 変数 | 型 | 説明 |
|---|---|---|
self.frame |
bytes of None | 最新のJPEG画像データ(バイナリ) |
self.condition |
Condition | スレッド間通信用 |
self.condition |
int | 回転角度(0.90,180,270) |
write
条件変数(Condition Variable)を使った排他制御
カメラスレッド HTTPスレッド
│ │
│ write()呼出 │
├─ ロック取得 │
│ frame更新中... │ wait()で待機
│ ↓ │
│ notify_all() ─────→│ 起床!
├─ ロック解放 │
│ ├─ ロック取得
│ │ frameを読取
│ └─ ロック解放
目的
- 同時に
frameを読み書きしないようにする - データの破損を防ぐ
HTTPストリーミングハンドラ
class StreamingHandler(BaseHTTPRequestHandler):
"""HTTPストリーミングハンドラ"""
def do_GET(self):
if self.path == '/':
self.send_response(200)
self.send_header('Content-Type', 'text/html; charset=utf-8')
self.end_headers()
html = '''
<html><head>
<meta charset="UTF-8">
<title>監視カメラ</title>
<style>
body { background: #1a1a1a; color: white; font-family: sans-serif;
text-align: center; padding: 20px; }
img { max-width: 100%; border: 2px solid #333; border-radius: 8px; }
h1 { color: #4CAF50; }
.status { padding: 10px; background: #333; border-radius: 5px;
display: inline-block; margin: 10px; }
</style>
</head>
<body>
<h1>🎥 監視カメラ ライブ配信</h1>
<div class="status">PIRセンサー + Picamera3</div>
<br>
<img src="/stream.mjpg" />
</body></html>
'''
self.wfile.write(html.encode('utf-8'))
elif self.path == '/stream.mjpg':
self.send_response(200)
self.send_header('Age', 0)
self.send_header('Cache-Control', 'no-cache, private')
self.send_header('Pragma', 'no-cache')
self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
try:
while True:
with streaming_output.condition:
streaming_output.condition.wait()
frame = streaming_output.frame
if frame:
self.wfile.write(b'--FRAME\r\n')
self.wfile.write(b'Content-Type: image/jpeg\r\n')
self.wfile.write(b'Content-Length: ' + str(len(frame)).encode() + b'\r\n\r\n')
self.wfile.write(frame)
self.wfile.write(b'\r\n')
except Exception as e:
pass
else:
self.send_error(404)
アクセスの流れ
- ➀パスが"/" → HTMLページを返す
- ②パスが"/stream.mjpg" → 動画ストリームを返す
- ③その他 → 404エラー
self.path == '/':トップページを指す
self.send_response(200):ブラウザに通信が成功したことを伝える
self.send_header:ヘッダー
self.end_headers():ヘッダー終了(この後にボディが続く)
メッセージボディ(HTMLのイメージ)
┌─────────────────────────┐
│ 🎥 監視カメラ ライブ配信 │ ← h1タイトル
├─────────────────────────┤
│ PIRセンサー + Picamera3 │ ← ステータス表示
├─────────────────────────┤
│ │
│ [カメラ映像] │ ← <img src="/stream.mjpg" />
│ │
└─────────────────────────┘
ストリーミング処理
while True: # ← 永遠に画像を送り続ける
新しいフレームを取得
ブラウザに送信
(また新しいフレームを取得)
(また送信)
... (無限に繰り返す)
ここまで、来ればストリーミングの仕組みが分かるかと思いますが、画像データをブラウザに送信し続けることで動画のように見せています。
PIRセンサー
class PIRSensor:
def __init__(self, pin=17):
self.pin = pin
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.pin, GPIO.IN)
time.sleep(60) # センサー安定化待機
print(f"PIRセンサー初期化完了 (GPIO{self.pin})")
def is_detected(self):
return GPIO.input(self.pin) == GPIO.HIGH
def cleanup(self):
GPIO.cleanup(self.pin)
def __init__(self, pin=17):pirセンサーを使うため、GPIO 17番をセット
def is_detected(self):設定したGPIOが立ち上がったら、True、そうでないとFalse
def cleanup(self):使用後のリソースを開放
動体検知
# ===== 動体検知クラス =====
class MotionDetector:
def __init__(self, threshold=500, blur_size=21, dilate_iter=2):
"""
動体検知クラスの初期化
"""
self.threshold = threshold
self.blur_size = blur_size
self.dilate_iter = dilate_iter
self.prev_frame = None
def process_frame(self, frame):
"""フレーム前処理(複数の色空間に対応)"""
# フレーム形式の判定
if len(frame.shape) == 2:
# 既にグレースケール
gray = frame
elif frame.shape[2] == 3:
# RGB または BGR(通常はBGR)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
elif frame.shape[2] == 4:
# RGBA
gray = cv2.cvtColor(frame, cv2.COLOR_RGBA2GRAY)
else:
# YUV420 など他の形式の場合
print(f"警告: 未対応の色空間(チャンネル数: {frame.shape[2]})")
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (self.blur_size, self.blur_size), 0)
return blurred
def calculate_frame_diff(self, current_blurred):
"""前フレームとの差分計算"""
if self.prev_frame is None:
self.prev_frame = current_blurred
return None
frame_diff = cv2.absdiff(self.prev_frame, current_blurred)
self.prev_frame = current_blurred
return frame_diff
def extract_motion_contours(self, frame_diff):
"""動体領域の輪郭抽出"""
_, thresh = cv2.threshold(frame_diff, 30, 255, cv2.THRESH_BINARY)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
dilated = cv2.dilate(thresh, kernel, iterations=self.dilate_iter)
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL,
cv2.CHAIN_APPROX_SIMPLE)
return contours
def is_motion_detected(self, contours):
"""動体検知判定(True/False を返す)"""
if contours is None:
return False
total_area = sum(cv2.contourArea(c) for c in contours)
return total_area > self.threshold
def detect(self, frame):
"""
統合検知処理(互換性のため)
Returns: (motion_detected:bool)
"""
try:
blurred = self.process_frame(frame)
frame_diff = self.calculate_frame_diff(blurred)
if frame_diff is None:
return False
contours = self.extract_motion_contours(frame_diff)
motion_detected = self.is_motion_detected(contours)
return motion_detected
except Exception as e:
print(f"動体検知エラー: {e}")
return False
概要
カメラ映像から動体(動いているもの)を検知するためのPythonクラス。OpenCVを使用して、フレーム間の差分を計算し、動きを検出する。
__init__
threshold:動体と判定する最小面積(ピクセル^2)。この値より大きい変化があれば動体と判定する。
blur_size:ノイズ除去に使用(奇数である必要があり)
dilate_iter:検出領域を拡大して穴を埋める。
process_frame
| 条件 | 意味 | 処理 |
|---|---|---|
len(frame.shape) == 2 |
グレースケール画像 | そのまま使用 |
frame.shape[2] == 3 |
RGB/BGR画像(通常はBGR) |
cv2.COLOR_BGR2GRAYで変換 |
frame.shape[2] == 4 |
RGBA画像(透明度あり) |
cv2.COLOR_RGBA2GRAYで変換 |
なぜ、グレースケールに変換するのか?
・色情報は動体検知に不要
・計算量を削減(1チャンネルに限定できる)
・メモリ使用量も1/3に
blurred:カメラのノイズや小さな変化を除去。21×21ピクセルサイズで周囲をぼかすことで、照明の微妙な変化や圧縮ノイズを無視することができる。
ノイズあり画像(細かい変動)→ガウシアンぼかし(ノイズ除去)→なめらかな画像(本質的な変化のみ)
calculate_frame_diff
1.初回フレームの処理
- 最初のフレームは比較対象がないため。現在のフレームを保存し、Noneで終了
2.フレーム間差分の計算
cv2.absdiffは絶対値差分を計算し、次回比較のために、現在のフレームを保存する。
計算例:
前フレーム: [100, 150, 200]
現フレーム: [120, 140, 200]
差分結果: [ 20, 10, 0] ← 絶対値なので常に正
extract_motion_contours
cv2.threshold:差分画像を白黒に変換する。
| ピクセル値 | 判定 | 変換後 |
|---|---|---|
| 0-30 | 変化なし | 0(黒) |
| 31-255 | 変化あり | 255(白) |
変換イメージ
差分画像(グレー) → 二値化 → 白黒画像
[5, 50, 200, 10] threshold [0, 255, 255, 0]
kernel・dilated(膨張処理):検出領域の小さな穴を埋め、断片化を防ぐ
楕円カーネル(5×5)を使用
2回反復で白い領域を拡大
cv2.findContours:輪郭抽出
| パラメータ | 説明 |
|---|---|
cv2.RETR_EXTERNAL |
最も外側の輪郭のみ取得(内側の穴は無視) |
cv2.CHAIN_APPROX_SIMPLE |
輪郭を圧縮して端点のみ保存(メモリ節約) |
is_motion_detected
1.全輪郭の面積をcv2.contourAreaで計算し、合算する。
contours = [輪郭A, 輪郭B, 輪郭C]
面積 = [100, 200, 50] = 合計350ピクセル^2
2.閾値との比較
350 > 500(threshold) → False(動体なし)
600 > 500(threshold) → True(動体あり)
detect
入力フレーム
↓
process_frame(前処理)
↓
calculate_frame_diff(差分計算)
↓
extract_motion_contours(輪郭抽出)
↓
is_motion_detected(判定)
↓
(True/False)
カメラ起動
def start(self):
self.picam2.start()
time.sleep(2)
# グローバルストリーミング出力に回転情報を設定
streaming_output.rotation = self.camera_rotation
self.picam2.start_encoder(self.mjpeg_encoder, FileOutput(streaming_output), name="lores")
self.start_streaming_server()
print("カメラ起動完了")
self.picam2.start():カメラ起動
self.mjpeg_encoder:MJPEGエンコーダー(動画を構成する各フレームを個別にJPEG画像として圧縮する動画形式)
FileOutput(streaming_output):エンコードされたデータの出力先
name="lores":低解像度ストリーム
self.start_streaming_server():ストリーミング用のサーバーを呼び出し
ストリーミングサーバ起動
def start_streaming_server(self):
self.http_server = HTTPServer(('0.0.0.0', self.stream_port), StreamingHandler)
thread = Thread(target=self.http_server.serve_forever, daemon=True)
thread.start()
print(f"ストリーミング開始: http://0.0.0.0:{self.stream_port}/")
Thread:httpサーバをスレッドで立てる
target:スレッド対象オブジェクト
daemon:Noneではない時、スレッドがdaemonかどうか明示的に設定する
thrad.start():threadを開始する
レコーディング機能
def start_recording(self):
if self.is_recording:
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.save_dir, f"pir_{timestamp}.h264")
self.current_output = filename
self.picam2.start_encoder(self.h264_encoder, filename, name="main")
self.is_recording = True
self.motion_start_time = time.time()
print(f"\n[録画開始] {filename}")
h264:Advanced Video Coding(AVC)とも呼ばれ、現在最も一般的に使用される動画圧縮規格。ブルーレイ・オンデマンド・ライブTVを含むストリーミングサービスでH.264が使用されている。
start_encoder:引数にエンコーダーファイル、アウトプットされるファイルなどが来る。
今回は、エンコーダファイルとして、h264_encoder・アウトプット先のファイルをfilename・名前をmainとした。
カメラ停止
def stop(self):
if self.is_recording:
self.stop_recording()
if self.http_server:
self.http_server.shutdown()
self.picam2.stop_encoder(self.mjpeg_encoder)
self.picam2.stop()
self.pir.cleanup()
print("カメラ停止")
self.stop_recording():録画で動いているなら、h.264の撮影をストップし、mp4に変換して録画終了する。
self.http_server.shutdown():ストリーミング用のウェブサーバーが動いている場合はシャットダウンする。
監視カメラシステム起動
def run(self):
print("=" * 70)
print("監視カメラシステム起動(別プロセス)")
print(f"動画保存先: {self.save_dir}")
print("=" * 70)
try:
self.start()
start_time = time.time()
while True:
current_time = time.time()
elapsed = current_time - start_time
pir_detected = self.pir.is_detected()
motion_detected = self.motion_detector.detect(self.picam2.capture_array("lores"))
if pir_detected and motion_detected:
print("pir_detected: ",pir_detected)
print("motion_detected: ",motion_detected)
self.last_detection_time = current_time
if not self.is_recording:
self.start_recording()
elif self.is_recording:
if self.last_detection_time and current_time - self.last_detection_time > self.recording_timeout:
self.stop_recording()
status = "録画中" if self.is_recording else "待機中"
pir_status = "検知" if pir_detected else "なし"
time.sleep(0.1)
except KeyboardInterrupt:
print("\n終了処理中...")
except Exception as e:
print(f"エラー: {e}")
finally:
self.stop()
self.start():class SurveillanceCamera内のstart()を呼び出し、カメラを起動
elapsed = current_time - start_time:経過時間を算出
pir_detected:pirセンサーの反応結果
motion_detected:動きを検知
🕹️実行例
camera_process = Process(
target=lambda: SurveillanceCamera(
save_dir="surveillance_videos",
pir_pin=18,
stream_port=8080,
rotation=90 # 時計回りに90度回転
).run()
)
camera_process.daemon = True
camera_process.start()
まとめ
今回は自分の理解+初心者向けに丁寧に説明しました。
ただ、課題として、ウェブアクセス時が遅いなど、人感センサーが室外機の回転にも反応したりがあるので、もう少しブラッシュアップできるところはありそうです。
次回は、自分の開発環境のインフラ整備について書こうと思います。では、また~
参考文献
