FastAPI で IoT データ収集バックエンドを構築する - 定期実行とエラーハンドリング
📝 はじめに
SwitchBot + FastAPI + Flutter で家庭の電力消費を可視化するWebアプリ構築(その1) #Python - Qiita
の続きになります。
この記事で書くこと
- FastAPI を使った REST API の設計と実装
-
scheduleライブラリによる 5 分間隔のデータ収集 - バックグラウンドスレッドでの並行処理
対象読者
- FastAPI を使って API サーバーを構築したい方
- IoT データの定期収集を実装したい方
- Python での並行処理に興味がある方
前提
- Python 3.9 以上
- FastAPI / REST API の基本知識
🎯 背景・動機
なぜ FastAPI を選んだのか
課題
SwitchBot から 5 分ごとに電力データを自動収集し、フロントエンドに API として提供したい。
Flask ではなく FastAPI を選んだ理由
| 項目 | Flask | FastAPI |
|---|---|---|
| 速度 | 標準的 | 高速(3〜10倍) |
| 型検証 | 手動実装 | 自動(Pydantic) |
| 非同期 | 追加設定必要 | ネイティブ対応 |
| ドキュメント | 別途作成 | 自動生成 |
Swagger UI が自動生成されるのが決め手でした。
🛠️ 手順/解説
アプリケーションの基本設定
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
title="SwitchBot Power Monitor API",
version="1.0.0"
)
# CORS設定(フロントエンドからのアクセス許可)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 本番は制限推奨
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
CORS について
- フロントエンド(
localhost:3000)からバックエンド(localhost:8000)はクロスオリジン - CORS 設定なしだとブラウザがリクエストをブロック
- 本番環境では
allow_originsを特定ドメインに制限
API エンドポイント設計
| メソッド | パス | 説明 | 用途 |
|---|---|---|---|
| GET | / |
API 状態確認 | 死活監視 |
| GET | /health |
詳細ヘルスチェック | サービス状態確認 |
| GET | /devices |
デバイス一覧 | デバイス管理 |
| GET | /data/latest |
最新データ取得 | グラフ表示 |
| POST | /data/collect |
手動データ収集 | テスト・デバッグ |
エンドポイント実装
ルートエンドポイント
from datetime import datetime
@app.get("/")
async def root():
return {
"message": "SwitchBot Power Monitor API",
"status": "running",
"timestamp": datetime.now().isoformat()
}
ヘルスチェック
@app.get("/health")
async def health_check():
health_status = {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"services": {
"switchbot_api": "connected" if switchbot_client else "not_initialized",
"supabase_db": "connected" if supabase_client else "not_initialized",
"power_meter_devices": len(power_meter_devices)
}
}
return health_status
ヘルスチェックで確認すること
- 外部 API への接続状態
- データベースへの接続状態
- 検出されたデバイス数
データ取得エンドポイント
from fastapi import HTTPException
@app.get("/data/latest")
async def get_latest_data(limit: int = 10):
"""最新の電力データを取得"""
if not supabase_client:
raise HTTPException(
status_code=500,
detail="Supabaseクライアントが初期化されていません"
)
try:
data = supabase_client.fetch_table_data(
"power_monitor_data",
limit=limit,
order_by="created_at",
ascending=False
)
return {"data": data}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
limit パラメータでデータ件数を制御できます。
定期データ収集の実装
schedule ライブラリの使用
import schedule
import time
def schedule_data_collection():
"""定期的なデータ収集をスケジュール"""
# 5分ごとに実行
schedule.every(5).minutes.do(collect_and_store_power_data)
while True:
schedule.run_pending()
time.sleep(1)
schedule の記法例
schedule.every(10).seconds.do(job) # 10秒ごと
schedule.every(5).minutes.do(job) # 5分ごと
schedule.every().hour.do(job) # 1時間ごと
schedule.every().day.at("10:30").do(job) # 毎日10:30
データ収集処理
def collect_and_store_power_data():
"""SwitchBotから電力データを収集してSupabaseに保存"""
print(f"電力データ収集開始: {datetime.now()}")
for device in power_meter_devices:
try:
# デバイスから電力データを取得
power_data = switchbot_client.get_power_meter_data(device['deviceId'])
if power_data:
# レコード作成
record = {
'device_id': device['deviceId'],
'device_name': device['deviceName'],
'power_consumption': power_data.get('power', 0),
'voltage': power_data.get('voltage', 0),
'created_at': datetime.now().isoformat(),
}
# Supabase に保存
supabase_client.insert_data("power_monitor_data", record)
print(f"保存成功: {device['deviceName']} - {power_data.get('power')}W")
except Exception as e:
# 個別デバイスのエラーは他に影響させない
print(f"エラー: {device['deviceName']} - {e}")
continue
重要な設計ポイント
- 1つのデバイスでエラーが発生しても、他のデバイスの処理を継続
-
continueで次のデバイスへ
バックグラウンドスレッドでの実行
import threading
def main():
# クライアント初期化
if not initialize_clients():
return
# 初回データ収集
collect_and_store_power_data()
# バックグラウンドでスケジュール実行
schedule_thread = threading.Thread(
target=schedule_data_collection,
daemon=True # ← 重要!
)
schedule_thread.start()
# FastAPI サーバー起動
uvicorn.run(app, host="0.0.0.0", port=8000)
daemon=True の重要性
- デーモンスレッドはメインスレッド終了時に自動終了
- これがないと Ctrl+C でサーバーを停止してもスレッドが残る
エラーハンドリング
HTTPException の使い分け
| ステータス | 意味 | 使用場面 |
|---|---|---|
| 400 | Bad Request | パラメータ不正 |
| 404 | Not Found | リソースが見つからない |
| 500 | Internal Server Error | サーバー内部エラー |
from fastapi import HTTPException
@app.get("/devices/{device_id}/power")
async def get_power_data(device_id: str):
if not switchbot_client:
raise HTTPException(status_code=500, detail="クライアント未初期化")
power_data = switchbot_client.get_power_meter_data(device_id)
if power_data is None:
raise HTTPException(status_code=404, detail="デバイスが見つかりません")
return power_data
📊 実行結果 & コツ
起動時のログ
SwitchBot Power Monitor API を開始しています...
クライアントの初期化を開始...
✅ SwitchBot API接続テスト成功
検出されたデバイス数: 3
電力測定デバイス 2 台を発見
- SwitchBot サーバー (ID: XXXX, タイプ: Plug Mini (JP))
- SwitchBot ディスプレイ (ID: YYYY, タイプ: Plug Mini (JP))
初回データ収集を実行...
データ保存成功: SwitchBot サーバー - 45.2W
データ保存成功: SwitchBot ディスプレイ - 23.1W
定期データ収集スレッドを開始しました
サーバー起動完了: http://localhost:8000
Swagger UI でテスト
FastAPI は自動で API ドキュメントを生成します。
-
Swagger UI:
http://localhost:8000/docs -
ReDoc:
http://localhost:8000/redoc
Swagger UI でできること
- エンドポイント一覧の確認
- リクエスト/レスポンス形式の確認
- API の直接実行(Try it out)
つまずきポイントと回避策
つまずき1: スレッドが終了しない
回避策: daemon=True を設定
threading.Thread(target=..., daemon=True)
つまずき2: 1つのデバイスエラーで全停止
回避策: try-except で個別にハンドリング
for device in devices:
try:
process(device)
except Exception as e:
print(f"Error: {e}")
continue # 次のデバイスへ
つまずき3: CORS エラー
回避策: CORSMiddleware を追加
app.add_middleware(CORSMiddleware, allow_origins=["*"], ...)
📝 まとめ
学んだこと
FastAPI の利点
- Swagger UI 自動生成で開発・テストが楽
- async/await でスケーラブルな設計
- HTTPException で適切なエラーレスポンス
定期実行の設計
- schedule + threading でシンプルに実現
- daemon スレッドでメインプロセスと連動
- 個別エラーを全体に波及させない設計