📝 はじめに
この記事は、
SwitchBotプラグミニ 消費電力定期取得 (準備編) #Python - Qiita
の 実践編 になります。
基本的なAPI連携ができた後、本格的な監視システムとして運用するための実装方法を解説します。
この記事の対象読者
- SwitchBot APIでの電力データ取得が動作している方
- 定期的なデータ収集・監視システムを構築したい方
- 電力データの可視化やアラート通知を実装したい方
前提条件
- SwitchBotのトークン・シークレットを取得済み
- デバイスIDを把握している
- 基本的な
SwitchBotPowerMonitorクラスが実装済み
🎯 システム概要
アーキテクチャ
| コンポーネント | 役割 | 技術要素 |
|---|---|---|
| PeriodicPowerMonitor | メイン監視クラス | schedule, threading |
| データ収集 | 定期的な電力データ取得 | SwitchBot API v1.1 |
| データストア | データの保存・管理 | メモリ |
🛠️ 実装
必要なライブラリのインストール
# 定期実行用ライブラリ
pip install schedule
# HTTP通信
pip install requests
基本クラスの前提
以下の実装は、前記事で作成したSwitchBotPowerMonitorクラスを前提としています:
# 前記事で実装済みのクラス(参考)
class SwitchBotPowerMonitor:
def __init__(self, token, secret): ...
def generate_auth_headers(self): ...
def get_device_status(self, device_id): ...
def get_power_data(self, device_id): ...
def get_device_list(self): ...
定期監視システムの基本実装
import schedule
import time
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
# ログ設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class AlertConfig:
"""アラート設定"""
high_power_threshold: float = 1000.0 # W
daily_cost_threshold: float = 500.0 # 円
cost_per_kwh: float = 25.0 # 円/kWh
@dataclass
class MonitorConfig:
"""監視設定"""
interval_minutes: int = 5
max_retries: int = 3
retry_delay_seconds: int = 10
data_retention_days: int = 30
class PeriodicPowerMonitor:
"""
SwitchBotプラグミニの定期監視システム
機能:
- 定期的な電力データ収集
- 履歴データの管理
- アラート通知
- 電気代計算
"""
def __init__(
self,
token: str,
secret: str,
device_id: str,
alert_config: Optional[AlertConfig] = None,
monitor_config: Optional[MonitorConfig] = None
):
self.monitor = SwitchBotPowerMonitor(token, secret)
self.device_id = device_id
self.data_store: List[Dict] = []
self.is_running: bool = False
self.alert_config = alert_config or AlertConfig()
self.monitor_config = monitor_config or MonitorConfig()
self.alert_callbacks: List[Callable] = []
self.error_count: int = 0
def add_alert_callback(self, callback: Callable) -> None:
"""
アラート発生時のコールバックを追加
Args:
callback: アラート情報を受け取るコールバック関数
"""
self.alert_callbacks.append(callback)
def _notify_alert(self, alert_type: str, message: str, data: Dict) -> None:
"""アラート通知を実行"""
alert_info = {
"type": alert_type,
"message": message,
"data": data,
"timestamp": datetime.now().isoformat()
}
logger.warning(f"🚨 アラート: {message}")
for callback in self.alert_callbacks:
try:
callback(alert_info)
except Exception as e:
logger.error(f"アラートコールバック実行エラー: {e}")
def job(self) -> None:
"""
定期実行ジョブ
- 電力データを取得
- データストアに保存
- アラートをチェック
"""
retries = 0
while retries < self.monitor_config.max_retries:
try:
data = self.monitor.get_power_data(self.device_id)
if data:
# データストアに追加
self.data_store.append(data)
# 古いデータを削除
self._cleanup_old_data()
# ログ出力
logger.info(
f"[{data['timestamp']}] "
f"Power: {data['power']}W, "
f"Voltage: {data['voltage']}V, "
f"Current: {data['electric_current']}mA, "
f"Daily: {data['electricity_of_day']}kWh"
)
# アラートチェック
self._check_alerts(data)
# エラーカウントリセット
self.error_count = 0
return
else:
logger.warning("データ取得失敗 - 空のレスポンス")
retries += 1
except Exception as e:
logger.error(f"エラー発生: {e}")
retries += 1
self.error_count += 1
if retries < self.monitor_config.max_retries:
logger.info(f"リトライ中... ({retries}/{self.monitor_config.max_retries})")
time.sleep(self.monitor_config.retry_delay_seconds)
# 全リトライ失敗
logger.error("データ取得に失敗しました(最大リトライ回数超過)")
if self.error_count >= 5:
self._notify_alert(
"connection_error",
"連続してデータ取得に失敗しています",
{"error_count": self.error_count}
)
def _cleanup_old_data(self) -> None:
"""古いデータを削除"""
cutoff_date = datetime.now() - timedelta(
days=self.monitor_config.data_retention_days
)
original_count = len(self.data_store)
self.data_store = [
d for d in self.data_store
if datetime.fromisoformat(d['timestamp']) > cutoff_date
]
removed_count = original_count - len(self.data_store)
if removed_count > 0:
logger.debug(f"古いデータを{removed_count}件削除しました")
def _check_alerts(self, data: Dict) -> None:
"""アラートをチェック"""
power = data.get('power', 0)
daily_kwh = data.get('electricity_of_day', 0)
# 高負荷アラート
if power > self.alert_config.high_power_threshold:
self._notify_alert(
"high_power",
f"高負荷警告: {power}Wを消費中",
data
)
# 電気代アラート
daily_cost = daily_kwh * self.alert_config.cost_per_kwh
if daily_cost > self.alert_config.daily_cost_threshold:
self._notify_alert(
"high_cost",
f"電気代警告: 本日{daily_cost:.0f}円",
{"daily_cost": daily_cost, **data}
)
def run(self, interval_minutes: Optional[int] = None) -> None:
"""
定期実行を開始
Args:
interval_minutes: 監視間隔(分)
"""
interval = interval_minutes or self.monitor_config.interval_minutes
self.is_running = True
# スケジュール設定
schedule.every(interval).minutes.do(self.job)
# 初回即座に実行
self.job()
logger.info(f"🚀 電力監視開始({interval}分間隔)")
while self.is_running:
schedule.run_pending()
time.sleep(1)
def stop(self) -> None:
"""監視を停止"""
self.is_running = False
schedule.clear()
logger.info("⏹️ 電力監視を停止しました")
def get_statistics(self) -> Dict:
"""統計情報を取得"""
if not self.data_store:
return {"error": "データがありません"}
powers = [d['power'] for d in self.data_store]
daily_consumptions = [d['electricity_of_day'] for d in self.data_store]
return {
"total_readings": len(self.data_store),
"power": {
"avg": sum(powers) / len(powers),
"max": max(powers),
"min": min(powers)
},
"daily_consumption": {
"latest": daily_consumptions[-1] if daily_consumptions else 0,
"max": max(daily_consumptions) if daily_consumptions else 0
},
"date_range": {
"start": self.data_store[0]['timestamp'],
"end": self.data_store[-1]['timestamp']
}
}
def calculate_cost(self) -> Dict:
"""電気代を計算"""
if not self.data_store:
return {"error": "データがありません"}
latest = self.data_store[-1]
daily_kwh = latest.get('electricity_of_day', 0)
daily_cost = daily_kwh * self.alert_config.cost_per_kwh
monthly_estimate = daily_cost * 30
return {
"daily_kwh": daily_kwh,
"daily_cost": daily_cost,
"monthly_estimate": monthly_estimate,
"cost_per_kwh": self.alert_config.cost_per_kwh
}
def export_data(self, filepath: str, format: str = "json") -> None:
"""データをエクスポート"""
if format == "json":
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(self.data_store, f, ensure_ascii=False, indent=2)
logger.info(f"データを{filepath}にエクスポートしました")
elif format == "csv":
import csv
if self.data_store:
with open(filepath, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=self.data_store[0].keys())
writer.writeheader()
writer.writerows(self.data_store)
logger.info(f"データを{filepath}にエクスポートしました")
else:
raise ValueError(f"未対応のフォーマット: {format}")
# 使用例
if __name__ == "__main__":
# 設定
TOKEN = "your_token"
SECRET = "your_secret"
DEVICE_ID = "your_device_id"
# アラート設定
alert_config = AlertConfig(
high_power_threshold=1000, # 1000W超えで警告
daily_cost_threshold=500, # 500円/日超えで警告
cost_per_kwh=25 # 電気料金
)
# 監視設定
monitor_config = MonitorConfig(
interval_minutes=5, # 5分間隔
max_retries=3, # 最大3回リトライ
retry_delay_seconds=10, # リトライ間隔10秒
data_retention_days=30 # 30日間保持
)
# 監視システム初期化
periodic = PeriodicPowerMonitor(
TOKEN, SECRET, DEVICE_ID,
alert_config=alert_config,
monitor_config=monitor_config
)
# アラートコールバック設定
def on_alert(alert_info):
print(f"🔔 アラート: {alert_info['message']}")
periodic.add_alert_callback(on_alert)
try:
# 監視開始
periodic.run(interval_minutes=5)
except KeyboardInterrupt:
periodic.stop()
📊 実行結果と可視化
データ収集結果の例
{
"power": 45.2,
"voltage": 119.8,
"electric_current": 377,
"electricity_of_day": 0.324,
"timestamp": "2025-01-07T12:30:00.123456"
}
コンソール出力例
2025-01-08 10:00:00 - INFO - 🚀 電力監視開始(5分間隔)
2025-01-08 10:00:01 - INFO - [2025-01-08T10:00:01] Power: 45.2W, Voltage: 119.8V, Current: 377mA, Daily: 0.324kWh
2025-01-08 10:05:01 - INFO - [2025-01-08T10:05:01] Power: 52.1W, Voltage: 120.1V, Current: 434mA, Daily: 0.328kWh
2025-01-08 10:10:01 - WARNING - 🚨 アラート: 高負荷警告: 1250Wを消費中
📈 コスト計算機能
時間帯別料金計算
@dataclass
class TimeBasedRateConfig:
"""時間帯別電気料金設定"""
peak_rate: float = 30.0 # ピーク時料金(円/kWh)
off_peak_rate: float = 15.0 # オフピーク料金
peak_start_hour: int = 8 # ピーク開始時刻
peak_end_hour: int = 22 # ピーク終了時刻
def calculate_time_based_cost(
data_store: List[Dict],
rate_config: TimeBasedRateConfig
) -> Dict:
"""時間帯別電気代計算"""
peak_consumption = 0
off_peak_consumption = 0
for data in data_store:
hour = datetime.fromisoformat(data['timestamp']).hour
# 簡易的にpower×interval(5min)で消費量を推定
consumption_wh = data['power'] * (5 / 60) # Wh
consumption_kwh = consumption_wh / 1000
if rate_config.peak_start_hour <= hour < rate_config.peak_end_hour:
peak_consumption += consumption_kwh
else:
off_peak_consumption += consumption_kwh
peak_cost = peak_consumption * rate_config.peak_rate
off_peak_cost = off_peak_consumption * rate_config.off_peak_rate
total_cost = peak_cost + off_peak_cost
return {
"peak": {
"consumption_kwh": peak_consumption,
"cost_yen": peak_cost,
"rate": rate_config.peak_rate
},
"off_peak": {
"consumption_kwh": off_peak_consumption,
"cost_yen": off_peak_cost,
"rate": rate_config.off_peak_rate
},
"total_cost": total_cost
}
月間レポート生成
def generate_monthly_report(data_store: List[Dict], cost_per_kwh: float = 25.0) -> str:
"""月間レポートを生成"""
if not data_store:
return "データがありません"
# 統計計算
powers = [d['power'] for d in data_store]
avg_power = sum(powers) / len(powers)
max_power = max(powers)
min_power = min(powers)
# 最新の日次消費量
latest_daily = data_store[-1].get('electricity_of_day', 0)
estimated_monthly = latest_daily * 30
estimated_cost = estimated_monthly * cost_per_kwh
report = f"""
╔══════════════════════════════════════════════════════╗
║ 📊 SwitchBot 電力監視 月間レポート ║
╠══════════════════════════════════════════════════════╣
║ 📈 消費電力統計 ║
║ 平均: {avg_power:>8.1f} W ║
║ 最大: {max_power:>8.1f} W ║
║ 最小: {min_power:>8.1f} W ║
╠══════════════════════════════════════════════════════╣
║ 💰 電気代推定 ║
║ 本日消費量: {latest_daily:>8.3f} kWh ║
║ 月間推定消費量: {estimated_monthly:>8.1f} kWh ║
║ 月間推定電気代: {estimated_cost:>8.0f} 円 ║
╠══════════════════════════════════════════════════════╣
║ 📅 データ期間 ║
║ 開始: {data_store[0]['timestamp'][:19]} ║
║ 終了: {data_store[-1]['timestamp'][:19]} ║
║ データ数: {len(data_store)} 件 ║
╚══════════════════════════════════════════════════════╝
"""
return report
🚨 トラブルシューティング
よくある問題と解決策
| エラー | 原因 | 解決方法 |
|---|---|---|
| 401 Unauthorized | トークンが間違っている | 再度トークンを取得し直す |
| 403 Forbidden | API制限超過 | 1日10,000回以内に収める |
| デバイスが見つからない | device_idが間違っている | デバイス一覧を再度取得する |
| データがnull | デバイスがオフライン | Wi-Fi接続を確認する |
| スケジュールが動かない | メインループがブロック | time.sleep(1)を確認 |
| メモリ増加 | データ保持期間が長い | data_retention_daysを調整 |
実践的なコツ
- キャッシュの活用: 同じデータの重複取得を防ぐ
- エラーハンドリング: ネットワークエラーを適切に処理
- レート制限の考慮: 1分間隔程度が推奨(APIは1日10,000回制限)
- ログ記録: 問題発生時の調査を容易にする
デバッグ用コード
# 統計情報の確認
stats = periodic.get_statistics()
print(json.dumps(stats, indent=2, ensure_ascii=False))
# コスト計算
cost = periodic.calculate_cost()
print(f"本日の電気代: {cost['daily_cost']:.0f}円")
print(f"月間推定: {cost['monthly_estimate']:.0f}円")
# データエクスポート
periodic.export_data("debug_data.json")
periodic.export_data("debug_data.csv", format="csv")
# 月間レポート
report = generate_monthly_report(periodic.data_store)
print(report)
📝 まとめ
この記事では、SwitchBotプラグミニの定期監視システムを構築しました。
実装した機能
- ✅ 定期的な電力データ収集(スケジュール実行)
- ✅ 履歴データの管理(自動削除)
- ✅ データエクスポート(JSON、CSV)
- ✅ 時間帯別料金計算
- ✅ 月間レポート生成