3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

家にインターフォンカメラが無いので自作した話

Posted at

はじめに

こんにちは!LONです。
12月にも入り、ますます寒くなってきましたね~

今回は、ラズパイを使ったiotのお話がメインになります。
学生の時、触ったなぁ~くらいでずっとiotには手を出してませんでした。
 なぜこのタイミングで手を出したかというと、セールスがしつこく毎週ピンポンするのでそのけん制として玄関にカメラを仕掛けることにしました!
 モノ自体は結構簡単に作れそうだったので、重い腰を上げて引き出しの奥からラズパイを引いてきたのです。

目次

  1. システム概要

  2. ハードウェア

  3. クラス図

  4. 実装

  5. 実行例

  6. まとめ

🎛️システム概要

今回作成したシステム図は以下の通りです

              ┌─────────────┐
              │  Picamera3  │
              └──────┬──────┘
 human detect(pir)   │      always watch
         ┌───────────┴──────────┐
         │                      │
    ┌────▼────┐            ┌────▼────┐
    │  main   │            │  lores  │
    │ (RGB)   │            │ (YUV)   │
    └────┬────┘            └────┬────┘
         │                      │
    ┌────▼────────┐         ┌───▼──────────┐
    │ H.264       │         │ MJPEG        │
    │ Encoder     │         │ Encoder      │
    └────┬────────┘         └───┬──────────┘
         │                      │
    ┌────▼────────┐         ┌───▼──────────┐
    │ video.h264  │         │ HTTP Stream  │
    │ (file)      │         │ (browser)    │
    └─────────────┘         └──────────────┘

二つの動画ファイルの違い

項目 MJPEG H.264
圧縮率 低い 高い
遅延 非常に低い 高い
CPU負荷 低い 高い(エンコード時)
帯域幅 大量に必要 少なくて済む
用途 ライブ配信、監視 録画、保存、配信

⚙ハードウェア

部品 型番・仕様 価格目安
Raspberry Pi Raspberry Pi 4 Modelo B 2019 Quad Core 64 bit WiFi Bluetooth (4 GB) 9000円
カメラモジュール InnoMaker IMX708 MIPIカメラモジュール Raspberry PIカメラモジュール3 3000円
PIRセンサー HC-SR501 400円
その他 ジャンパーワイヤー、ケースなど 1000円

合計:13400円

📉クラス図

小さくてすみません。
ざっと全体の関係を絵にしてみました。私は動体検知した動画をディスコードのDMで送信されるようにしました。この辺りの媒体は適宜好きなものに差し替えてください。

クラス図.png

🛠実装

以下のような順序で記載します。

  • StreamingOutPut
  • HTTPストリーミングハンドラ
  • PIRセンサー
  • 動体検知
  • カメラ起動
  • ストリーミングサーバ起動
  • レコーディング機能
  • カメラ停止
  • 監視カメラシステム起動

StreamingOutPut

# ===== ストリーミング関連 =====
class StreamingOutput(io.BufferedIOBase):
    def __init__(self, rotation=0):
        self.frame = None
        self.condition = Condition()
        self.rotation = rotation  # 90度回転を修正するため逆方向に回転

    def write(self, buf):
        with self.condition:
            # フレームを逆方向に回転(transpose=2の補正)
            try:
                nparr = np.frombuffer(buf, np.uint8)
                img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                if img is not None and self.rotation == 90:
                    # 逆方向に回転(transpose=1で補正)
                    img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
                    ret, buffer = cv2.imencode('.jpg', img)
                    self.frame = buffer.tobytes()
                else:
                    self.frame = buf #この処理が重要!
            except:
                self.frame = buf
            self.condition.notify_all()

streaming_output = StreamingOutput()
概要

一言でいうと、StreamingHandlerSurveillanceCameraの仲介役。
PicameraはFileOutput形式でしかデータの出力ができない+HTTPストリーミングは別スレッドで動作する都合上、ライブ配信するには仲介者が必要。

このクラスが無いと、ストリーミング配信ができないんだなぁと思っておいてください。

class StreamingOutput(io.BufferedIOBase)

継承元:io.BufferedIOBase

  • Pythonの標準入出力クラス
  • ファイルのようにwrite()メソッドで書き込める
  • Picamera2が「ファイルに書き込むように」データを渡せる

__init__

変数 説明
self.frame bytes of None 最新のJPEG画像データ(バイナリ)
self.condition Condition スレッド間通信用
self.condition int 回転角度(0.90,180,270)

write

条件変数(Condition Variable)を使った排他制御

カメラスレッド          HTTPスレッド
     │                      │
     │ write()呼出           │
     ├─ ロック取得           │
     │  frame更新中...       │ wait()で待機
     │  ↓                   │
     │  notify_all()  ─────→│ 起床!
     ├─ ロック解放           │
     │                      ├─ ロック取得
     │                      │  frameを読取
     │                      └─ ロック解放

目的

  • 同時にframeを読み書きしないようにする
  • データの破損を防ぐ

HTTPストリーミングハンドラ

class StreamingHandler(BaseHTTPRequestHandler):
    """HTTPストリーミングハンドラ"""
    
    def do_GET(self):
        if self.path == '/':
            self.send_response(200)
            self.send_header('Content-Type', 'text/html; charset=utf-8')
            self.end_headers()
            html = '''
            <html><head>
            <meta charset="UTF-8">
            <title>監視カメラ</title>
            <style>
                body { background: #1a1a1a; color: white; font-family: sans-serif; 
                       text-align: center; padding: 20px; }
                img { max-width: 100%; border: 2px solid #333; border-radius: 8px; }
                h1 { color: #4CAF50; }
                .status { padding: 10px; background: #333; border-radius: 5px; 
                          display: inline-block; margin: 10px; }
            </style>
            </head>
            <body>
                <h1>🎥 監視カメラ ライブ配信</h1>
                <div class="status">PIRセンサー + Picamera3</div>
                <br>
                <img src="/stream.mjpg" />
            </body></html>
            '''
            self.wfile.write(html.encode('utf-8'))
        
        elif self.path == '/stream.mjpg':
            self.send_response(200)
            self.send_header('Age', 0)
            self.send_header('Cache-Control', 'no-cache, private')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
            self.end_headers()
            
            try:
                while True:
                    with streaming_output.condition:
                        streaming_output.condition.wait()
                        frame = streaming_output.frame
                    
                    if frame:
                        self.wfile.write(b'--FRAME\r\n')
                        self.wfile.write(b'Content-Type: image/jpeg\r\n')
                        self.wfile.write(b'Content-Length: ' + str(len(frame)).encode() + b'\r\n\r\n')
                        self.wfile.write(frame)
                        self.wfile.write(b'\r\n')
            except Exception as e:
                pass
        
        else:
            self.send_error(404)

アクセスの流れ

  • ➀パスが"/" → HTMLページを返す
  • ②パスが"/stream.mjpg" → 動画ストリームを返す
  • ③その他 → 404エラー

self.path == '/':トップページを指す
self.send_response(200):ブラウザに通信が成功したことを伝える
self.send_header:ヘッダー

self.end_headers():ヘッダー終了(この後にボディが続く)

メッセージボディ(HTMLのイメージ)

┌─────────────────────────┐
│ 🎥 監視カメラ ライブ配信 │ ←  h1タイトル
├─────────────────────────┤
│ PIRセンサー + Picamera3  │ ← ステータス表示
├─────────────────────────┤
│                         │
│   [カメラ映像]           │ ← <img src="/stream.mjpg" />
│                         │
└─────────────────────────┘

ストリーミング処理

while True:  # ← 永遠に画像を送り続ける
    新しいフレームを取得
    ブラウザに送信
    (また新しいフレームを取得)
    (また送信)
    ... (無限に繰り返す)

ここまで、来ればストリーミングの仕組みが分かるかと思いますが、画像データをブラウザに送信し続けることで動画のように見せています。

PIRセンサー

class PIRSensor:
    def __init__(self, pin=17):
        self.pin = pin
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.pin, GPIO.IN)
        time.sleep(60)  # センサー安定化待機
        print(f"PIRセンサー初期化完了 (GPIO{self.pin})")
    
    def is_detected(self):
        return GPIO.input(self.pin) == GPIO.HIGH
    
    def cleanup(self):
        GPIO.cleanup(self.pin)

def __init__(self, pin=17):pirセンサーを使うため、GPIO 17番をセット

def is_detected(self):設定したGPIOが立ち上がったら、True、そうでないとFalse

def cleanup(self):使用後のリソースを開放

動体検知

# ===== 動体検知クラス =====
class MotionDetector:
    def __init__(self, threshold=500, blur_size=21, dilate_iter=2):
        """
        動体検知クラスの初期化
        """
        self.threshold = threshold
        self.blur_size = blur_size
        self.dilate_iter = dilate_iter
        self.prev_frame = None
        
    def process_frame(self, frame):
        """フレーム前処理(複数の色空間に対応)"""
        # フレーム形式の判定
        if len(frame.shape) == 2:
            # 既にグレースケール
            gray = frame
        elif frame.shape[2] == 3:
            # RGB または BGR(通常はBGR)
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        elif frame.shape[2] == 4:
            # RGBA
            gray = cv2.cvtColor(frame, cv2.COLOR_RGBA2GRAY)
        else:
            # YUV420 など他の形式の場合
            print(f"警告: 未対応の色空間(チャンネル数: {frame.shape[2]}")
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        
        blurred = cv2.GaussianBlur(gray, (self.blur_size, self.blur_size), 0)
        return blurred
    
    def calculate_frame_diff(self, current_blurred):
        """前フレームとの差分計算"""
        if self.prev_frame is None:
            self.prev_frame = current_blurred
            return None
        
        frame_diff = cv2.absdiff(self.prev_frame, current_blurred)
        self.prev_frame = current_blurred
        return frame_diff
    
    def extract_motion_contours(self, frame_diff):
        """動体領域の輪郭抽出"""
        _, thresh = cv2.threshold(frame_diff, 30, 255, cv2.THRESH_BINARY)
        
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        dilated = cv2.dilate(thresh, kernel, iterations=self.dilate_iter)
        
        contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, 
                                        cv2.CHAIN_APPROX_SIMPLE)
        return contours
    
    def is_motion_detected(self, contours):
        """動体検知判定(True/False を返す)"""
        if contours is None:
            return False
        
        total_area = sum(cv2.contourArea(c) for c in contours)
        return total_area > self.threshold
    
    def detect(self, frame):
        """
        統合検知処理(互換性のため)
        Returns: (motion_detected:bool)
        """
        try:
            blurred = self.process_frame(frame)
            frame_diff = self.calculate_frame_diff(blurred)
            
            if frame_diff is None:
                return False
            
            contours = self.extract_motion_contours(frame_diff)
            motion_detected = self.is_motion_detected(contours)
            
            return motion_detected
        except Exception as e:
            print(f"動体検知エラー: {e}")
            return False

概要

カメラ映像から動体(動いているもの)を検知するためのPythonクラス。OpenCVを使用して、フレーム間の差分を計算し、動きを検出する。

__init__

threshold:動体と判定する最小面積(ピクセル^2)。この値より大きい変化があれば動体と判定する。
blur_size:ノイズ除去に使用(奇数である必要があり)
dilate_iter:検出領域を拡大して穴を埋める。

process_frame

条件 意味 処理
len(frame.shape) == 2 グレースケール画像 そのまま使用
frame.shape[2] == 3 RGB/BGR画像(通常はBGR) cv2.COLOR_BGR2GRAYで変換
frame.shape[2] == 4 RGBA画像(透明度あり) cv2.COLOR_RGBA2GRAYで変換

なぜ、グレースケールに変換するのか?
・色情報は動体検知に不要
・計算量を削減(1チャンネルに限定できる)
・メモリ使用量も1/3に

blurred:カメラのノイズや小さな変化を除去。21×21ピクセルサイズで周囲をぼかすことで、照明の微妙な変化や圧縮ノイズを無視することができる。

ノイズあり画像(細かい変動)→ガウシアンぼかし(ノイズ除去)→なめらかな画像(本質的な変化のみ)

calculate_frame_diff

1.初回フレームの処理

  • 最初のフレームは比較対象がないため。現在のフレームを保存し、Noneで終了

2.フレーム間差分の計算
cv2.absdiffは絶対値差分を計算し、次回比較のために、現在のフレームを保存する。

計算例:

前フレーム: [100, 150, 200]
現フレーム: [120, 140, 200]
差分結果:   [ 20,  10,   0]  ← 絶対値なので常に正

extract_motion_contours

cv2.threshold:差分画像を白黒に変換する。

ピクセル値 判定 変換後
0-30 変化なし 0(黒)
31-255 変化あり 255(白)

変換イメージ

差分画像(グレー)  →  二値化  →  白黒画像
[5, 50, 200, 10]    threshold    [0, 255, 255, 0]

kerneldilated(膨張処理):検出領域の小さな穴を埋め、断片化を防ぐ

楕円カーネル(5×5)を使用
2回反復で白い領域を拡大

cv2.findContours:輪郭抽出

パラメータ 説明
cv2.RETR_EXTERNAL 最も外側の輪郭のみ取得(内側の穴は無視)
cv2.CHAIN_APPROX_SIMPLE 輪郭を圧縮して端点のみ保存(メモリ節約)

is_motion_detected

1.全輪郭の面積をcv2.contourAreaで計算し、合算する。

contours = [輪郭A, 輪郭B, 輪郭C]
面積 = [100, 200, 50] = 合計350ピクセル^2

2.閾値との比較

350 > 500(threshold) → False(動体なし)
600 > 500(threshold) → True(動体あり)

detect

入力フレーム
    ↓
process_frame(前処理)
    ↓
calculate_frame_diff(差分計算)
    ↓
extract_motion_contours(輪郭抽出)
    ↓
is_motion_detected(判定)
    ↓
(True/False)

カメラ起動

def start(self):
    self.picam2.start()
    time.sleep(2)
    # グローバルストリーミング出力に回転情報を設定
    streaming_output.rotation = self.camera_rotation
    self.picam2.start_encoder(self.mjpeg_encoder, FileOutput(streaming_output), name="lores")
    self.start_streaming_server()
    print("カメラ起動完了")

self.picam2.start():カメラ起動
self.mjpeg_encoder:MJPEGエンコーダー(動画を構成する各フレームを個別にJPEG画像として圧縮する動画形式)
FileOutput(streaming_output):エンコードされたデータの出力先
name="lores":低解像度ストリーム
self.start_streaming_server():ストリーミング用のサーバーを呼び出し

ストリーミングサーバ起動

def start_streaming_server(self):
    self.http_server = HTTPServer(('0.0.0.0', self.stream_port), StreamingHandler)
    thread = Thread(target=self.http_server.serve_forever, daemon=True)
    thread.start()
    print(f"ストリーミング開始: http://0.0.0.0:{self.stream_port}/")

Thread:httpサーバをスレッドで立てる
target:スレッド対象オブジェクト
daemon:Noneではない時、スレッドがdaemonかどうか明示的に設定する
thrad.start():threadを開始する

レコーディング機能

def start_recording(self):
    if self.is_recording:
        return
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = os.path.join(self.save_dir, f"pir_{timestamp}.h264")
    self.current_output = filename
    self.picam2.start_encoder(self.h264_encoder, filename, name="main")
    self.is_recording = True
    self.motion_start_time = time.time()
    print(f"\n[録画開始] {filename}")

h264:Advanced Video Coding(AVC)とも呼ばれ、現在最も一般的に使用される動画圧縮規格。ブルーレイ・オンデマンド・ライブTVを含むストリーミングサービスでH.264が使用されている。

start_encoder:引数にエンコーダーファイル、アウトプットされるファイルなどが来る。

今回は、エンコーダファイルとして、h264_encoder・アウトプット先のファイルをfilename・名前をmainとした。

カメラ停止

def stop(self):
    if self.is_recording:
        self.stop_recording()
    if self.http_server:
        self.http_server.shutdown()
    self.picam2.stop_encoder(self.mjpeg_encoder)
    self.picam2.stop()
    self.pir.cleanup()
    print("カメラ停止")

self.stop_recording():録画で動いているなら、h.264の撮影をストップし、mp4に変換して録画終了する。
self.http_server.shutdown():ストリーミング用のウェブサーバーが動いている場合はシャットダウンする。

監視カメラシステム起動

def run(self):
    print("=" * 70)
    print("監視カメラシステム起動(別プロセス)")     
    print(f"動画保存先: {self.save_dir}")
    print("=" * 70)
        
    try:
        self.start()

        start_time = time.time()
            
        while True:
            current_time = time.time()
            elapsed = current_time - start_time
                
            pir_detected = self.pir.is_detected()
            motion_detected = self.motion_detector.detect(self.picam2.capture_array("lores"))
            
            if pir_detected and motion_detected:
                print("pir_detected: ",pir_detected)
                print("motion_detected: ",motion_detected)
                    
                self.last_detection_time = current_time
                if not self.is_recording:
                    self.start_recording()
            elif self.is_recording:
                if self.last_detection_time and current_time - self.last_detection_time > self.recording_timeout:
                    self.stop_recording()
                
            status = "録画中" if self.is_recording else "待機中"
            pir_status = "検知" if pir_detected else "なし"
                
            time.sleep(0.1)

    except KeyboardInterrupt:
        print("\n終了処理中...")
    except Exception as e:
        print(f"エラー: {e}")
    finally:
        self.stop()

self.start()class SurveillanceCamera内のstart()を呼び出し、カメラを起動
elapsed = current_time - start_time:経過時間を算出
pir_detected:pirセンサーの反応結果
motion_detected:動きを検知

🕹️実行例

camera_process = Process(
    target=lambda: SurveillanceCamera(
        save_dir="surveillance_videos",
        pir_pin=18,
        stream_port=8080,
        rotation=90  # 時計回りに90度回転
    ).run()
)
camera_process.daemon = True
camera_process.start()

まとめ

今回は自分の理解+初心者向けに丁寧に説明しました。
ただ、課題として、ウェブアクセス時が遅いなど、人感センサーが室外機の回転にも反応したりがあるので、もう少しブラッシュアップできるところはありそうです。

次回は、自分の開発環境のインフラ整備について書こうと思います。では、また~

参考文献

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?