0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SwitchBot + FastAPI + Flutter で家庭の電力消費を可視化するWebアプリ構築(その2)

Last updated at Posted at 2026-01-15

FastAPI で IoT データ収集バックエンドを構築する - 定期実行とエラーハンドリング

📝 はじめに

SwitchBot + FastAPI + Flutter で家庭の電力消費を可視化するWebアプリ構築(その1) #Python - Qiita
の続きになります。

:white_check_mark: この記事で書くこと

  • FastAPI を使った REST API の設計と実装
  • schedule ライブラリによる 5 分間隔のデータ収集
  • バックグラウンドスレッドでの並行処理

:white_check_mark: 対象読者

  • FastAPI を使って API サーバーを構築したい方
  • IoT データの定期収集を実装したい方
  • Python での並行処理に興味がある方

:white_check_mark: 前提

  • Python 3.9 以上
  • FastAPI / REST API の基本知識

🎯 背景・動機

なぜ FastAPI を選んだのか

:thinking: 課題
SwitchBot から 5 分ごとに電力データを自動収集し、フロントエンドに API として提供したい。

:bulb: Flask ではなく FastAPI を選んだ理由

項目 Flask FastAPI
速度 標準的 高速(3〜10倍)
型検証 手動実装 自動(Pydantic)
非同期 追加設定必要 ネイティブ対応
ドキュメント 別途作成 自動生成

:point_right: Swagger UI が自動生成されるのが決め手でした。


🛠️ 手順/解説

アプリケーションの基本設定

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI(
    title="SwitchBot Power Monitor API",
    version="1.0.0"
)

# CORS設定(フロントエンドからのアクセス許可)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],        # 本番は制限推奨
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

:warning: CORS について

  • フロントエンド(localhost:3000)からバックエンド(localhost:8000)はクロスオリジン
  • CORS 設定なしだとブラウザがリクエストをブロック
  • 本番環境では allow_origins を特定ドメインに制限

API エンドポイント設計

メソッド パス 説明 用途
GET / API 状態確認 死活監視
GET /health 詳細ヘルスチェック サービス状態確認
GET /devices デバイス一覧 デバイス管理
GET /data/latest 最新データ取得 グラフ表示
POST /data/collect 手動データ収集 テスト・デバッグ

エンドポイント実装

ルートエンドポイント

from datetime import datetime

@app.get("/")
async def root():
    return {
        "message": "SwitchBot Power Monitor API",
        "status": "running",
        "timestamp": datetime.now().isoformat()
    }

ヘルスチェック

@app.get("/health")
async def health_check():
    health_status = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": {
            "switchbot_api": "connected" if switchbot_client else "not_initialized",
            "supabase_db": "connected" if supabase_client else "not_initialized",
            "power_meter_devices": len(power_meter_devices)
        }
    }
    return health_status

:white_check_mark: ヘルスチェックで確認すること

  • 外部 API への接続状態
  • データベースへの接続状態
  • 検出されたデバイス数

データ取得エンドポイント

from fastapi import HTTPException

@app.get("/data/latest")
async def get_latest_data(limit: int = 10):
    """最新の電力データを取得"""
    if not supabase_client:
        raise HTTPException(
            status_code=500,
            detail="Supabaseクライアントが初期化されていません"
        )

    try:
        data = supabase_client.fetch_table_data(
            "power_monitor_data",
            limit=limit,
            order_by="created_at",
            ascending=False
        )
        return {"data": data}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

:point_right: limit パラメータでデータ件数を制御できます。

定期データ収集の実装

schedule ライブラリの使用

import schedule
import time

def schedule_data_collection():
    """定期的なデータ収集をスケジュール"""
    # 5分ごとに実行
    schedule.every(5).minutes.do(collect_and_store_power_data)

    while True:
        schedule.run_pending()
        time.sleep(1)

:white_check_mark: schedule の記法例

schedule.every(10).seconds.do(job)           # 10秒ごと
schedule.every(5).minutes.do(job)            # 5分ごと
schedule.every().hour.do(job)                # 1時間ごと
schedule.every().day.at("10:30").do(job)     # 毎日10:30

データ収集処理

def collect_and_store_power_data():
    """SwitchBotから電力データを収集してSupabaseに保存"""
    print(f"電力データ収集開始: {datetime.now()}")

    for device in power_meter_devices:
        try:
            # デバイスから電力データを取得
            power_data = switchbot_client.get_power_meter_data(device['deviceId'])

            if power_data:
                # レコード作成
                record = {
                    'device_id': device['deviceId'],
                    'device_name': device['deviceName'],
                    'power_consumption': power_data.get('power', 0),
                    'voltage': power_data.get('voltage', 0),
                    'created_at': datetime.now().isoformat(),
                }

                # Supabase に保存
                supabase_client.insert_data("power_monitor_data", record)
                print(f"保存成功: {device['deviceName']} - {power_data.get('power')}W")

        except Exception as e:
            # 個別デバイスのエラーは他に影響させない
            print(f"エラー: {device['deviceName']} - {e}")
            continue

:warning: 重要な設計ポイント

  • 1つのデバイスでエラーが発生しても、他のデバイスの処理を継続
  • continue で次のデバイスへ

バックグラウンドスレッドでの実行

import threading

def main():
    # クライアント初期化
    if not initialize_clients():
        return

    # 初回データ収集
    collect_and_store_power_data()

    # バックグラウンドでスケジュール実行
    schedule_thread = threading.Thread(
        target=schedule_data_collection,
        daemon=True  # ← 重要!
    )
    schedule_thread.start()

    # FastAPI サーバー起動
    uvicorn.run(app, host="0.0.0.0", port=8000)

:white_check_mark: daemon=True の重要性

  • デーモンスレッドはメインスレッド終了時に自動終了
  • これがないと Ctrl+C でサーバーを停止してもスレッドが残る

エラーハンドリング

HTTPException の使い分け

ステータス 意味 使用場面
400 Bad Request パラメータ不正
404 Not Found リソースが見つからない
500 Internal Server Error サーバー内部エラー
from fastapi import HTTPException

@app.get("/devices/{device_id}/power")
async def get_power_data(device_id: str):
    if not switchbot_client:
        raise HTTPException(status_code=500, detail="クライアント未初期化")

    power_data = switchbot_client.get_power_meter_data(device_id)
    if power_data is None:
        raise HTTPException(status_code=404, detail="デバイスが見つかりません")

    return power_data

📊 実行結果 & コツ

起動時のログ

SwitchBot Power Monitor API を開始しています...
クライアントの初期化を開始...
✅ SwitchBot API接続テスト成功
   検出されたデバイス数: 3
電力測定デバイス 2 台を発見
- SwitchBot サーバー (ID: XXXX, タイプ: Plug Mini (JP))
- SwitchBot ディスプレイ (ID: YYYY, タイプ: Plug Mini (JP))
初回データ収集を実行...
データ保存成功: SwitchBot サーバー - 45.2W
データ保存成功: SwitchBot ディスプレイ - 23.1W
定期データ収集スレッドを開始しました
サーバー起動完了: http://localhost:8000

Swagger UI でテスト

FastAPI は自動で API ドキュメントを生成します。

  • Swagger UI: http://localhost:8000/docs
  • ReDoc: http://localhost:8000/redoc

:white_check_mark: Swagger UI でできること

  • エンドポイント一覧の確認
  • リクエスト/レスポンス形式の確認
  • API の直接実行(Try it out)

つまずきポイントと回避策

:x: つまずき1: スレッドが終了しない

:white_check_mark: 回避策: daemon=True を設定

threading.Thread(target=..., daemon=True)

:x: つまずき2: 1つのデバイスエラーで全停止

:white_check_mark: 回避策: try-except で個別にハンドリング

for device in devices:
    try:
        process(device)
    except Exception as e:
        print(f"Error: {e}")
        continue  # 次のデバイスへ

:x: つまずき3: CORS エラー

:white_check_mark: 回避策: CORSMiddleware を追加

app.add_middleware(CORSMiddleware, allow_origins=["*"], ...)

📝 まとめ

学んだこと

:white_check_mark: FastAPI の利点

  • Swagger UI 自動生成で開発・テストが楽
  • async/await でスケーラブルな設計
  • HTTPException で適切なエラーレスポンス

:white_check_mark: 定期実行の設計

  • schedule + threading でシンプルに実現
  • daemon スレッドでメインプロセスと連動
  • 個別エラーを全体に波及させない設計
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?