📝 はじめに
この記事は、
Tapo Matter 消費電力定期取得(準備編) #Python - Qiita
の 実践編 になります。
Tapo P110Mスマートプラグから取得した電力データを定期的に収集・分析し、アラート通知や電気代計算を自動化するシステムを構築していきます。
この記事の対象読者
- Tapo Matterの基本的な電力データ取得ができている方
- 定期的なデータ収集・監視システムを構築したい方
- 電力消費の異常検知やコスト管理を自動化したい方
- Pythonでの非同期プログラミングに興味がある方
前提条件
- Tapo P110Mの初期設定が完了していること
- Home AssistantでのMatter統合が完了していること
- Pythonでの電力データ取得が動作していること
- 基本的なTapoMatterMonitorクラスが実装済みであること
🎯 システム概要
アーキテクチャ
| コンポーネント | 役割 | 技術要素 |
|---|---|---|
| PeriodicTapoMonitor | メイン監視クラス | asyncio, schedule |
| データ収集 | 定期的な電力データ取得 | Matter Protocol |
| 履歴管理 | 30日分のデータ保持 | インメモリ / DB |
| アラートシステム | 閾値ベースの通知 | 条件判定ロジック |
| コスト計算 | 電気代の自動計算 | 料金レート適用 |
🛠️ 実装
必要なライブラリのインストール
# 定期実行用ライブラリ
pip install schedule
# 非同期処理
pip install asyncio
# Matter通信(既にインストール済みの場合は不要)
pip install matter-server-client
pip install python-matter-server
基本クラスの前提
以下の実装は、前記事で作成したTapoMatterMonitorクラスを前提としています:
# 前記事で実装済みのクラス(参考)
class TapoMatterMonitor:
async def connect(self): ...
async def discover_devices(self): ...
async def get_power_data(self, device_id): ...
async def get_all_power_data(self): ...
async def close(self): ...
定期監視システムの完全実装
import asyncio
import schedule
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
import json
import logging
# ログ設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class AlertConfig:
"""アラート設定"""
high_power_threshold: float = 1000.0 # ワット
standby_power_threshold: float = 5.0 # ワット
standby_duration_hours: float = 1.0 # 時間
custom_alerts: List[Dict] = field(default_factory=list)
@dataclass
class CostConfig:
"""電気料金設定"""
cost_per_kwh: float = 25.0 # 円/kWh(デフォルト)
peak_rate: float = 30.0 # ピーク時料金
off_peak_rate: float = 15.0 # オフピーク料金
peak_start_hour: int = 8
peak_end_hour: int = 22
class PeriodicTapoMonitor:
"""
Tapo Matterデバイスの定期監視システム
機能:
- 定期的な電力データ収集
- 履歴データの管理(30日保持)
- アラート通知
- 電気代計算
"""
def __init__(
self,
server_url: str = "ws://localhost:5580/ws",
alert_config: Optional[AlertConfig] = None,
cost_config: Optional[CostConfig] = None
):
self.monitor = TapoMatterMonitor(server_url)
self.data_history: List[Dict] = []
self.is_running: bool = False
self.alert_config = alert_config or AlertConfig()
self.cost_config = cost_config or CostConfig()
self.alert_callbacks: List[Callable] = []
async def setup(self) -> None:
"""
初期設定
- Matterサーバーへの接続
- デバイスの発見
"""
await self.monitor.connect()
await self.monitor.discover_devices()
device_count = len(self.monitor.devices)
logger.info(f"初期設定完了: {device_count}台のデバイスを発見")
def add_alert_callback(self, callback: Callable) -> None:
"""
アラート発生時のコールバックを追加
Args:
callback: アラート情報を受け取るコールバック関数
"""
self.alert_callbacks.append(callback)
async def _notify_alert(self, alert_type: str, message: str, data: Dict) -> None:
"""アラート通知を実行"""
alert_info = {
"type": alert_type,
"message": message,
"data": data,
"timestamp": datetime.now().isoformat()
}
for callback in self.alert_callbacks:
try:
if asyncio.iscoroutinefunction(callback):
await callback(alert_info)
else:
callback(alert_info)
except Exception as e:
logger.error(f"アラートコールバック実行エラー: {e}")
async def monitor_job(self) -> None:
"""
定期実行ジョブ
- 全デバイスの電力データを取得
- 履歴に追加
- 古いデータを削除
- アラートをチェック
"""
try:
power_data = await self.monitor.get_all_power_data()
for data in power_data:
# データを履歴に追加
self.data_history.append(data)
# 古いデータ(30日以上前)を削除
await self._cleanup_old_data()
# ログ出力
logger.info(
f"[{data['timestamp']}] {data['device_name']}: "
f"{data['power']:.1f}W, "
f"{data['voltage']:.1f}V, "
f"{data['current']:.3f}A, "
f"{data['energy']:.3f}kWh"
)
# アラートチェック
await self.check_alerts(data)
except Exception as e:
logger.error(f"監視エラー: {e}")
async def _cleanup_old_data(self) -> None:
"""古いデータを削除(30日以上前)"""
cutoff_date = datetime.now() - timedelta(days=30)
original_count = len(self.data_history)
self.data_history = [
d for d in self.data_history
if datetime.fromisoformat(d['timestamp']) > cutoff_date
]
removed_count = original_count - len(self.data_history)
if removed_count > 0:
logger.debug(f"古いデータを{removed_count}件削除しました")
async def check_alerts(self, data: Dict) -> None:
"""
アラートチェック
Args:
data: 電力データ
"""
power = data['power']
device_name = data['device_name']
# 高負荷アラート
if power > self.alert_config.high_power_threshold:
message = f"高負荷警告: {device_name}が{power:.1f}Wを消費中"
logger.warning(f"⚠️ {message}")
await self._notify_alert("high_power", message, data)
# スタンバイ電力アラート
if power <= self.alert_config.standby_power_threshold:
await self._check_standby_alert(data)
# カスタムアラート
for custom_alert in self.alert_config.custom_alerts:
await self._check_custom_alert(custom_alert, data)
async def _check_standby_alert(self, data: Dict) -> None:
"""スタンバイ電力アラートをチェック"""
duration = timedelta(hours=self.alert_config.standby_duration_hours)
threshold = 50 # 高負荷の閾値
# 最後の高負荷からの経過時間をチェック
recent_high = any(
d['power'] > threshold and
datetime.fromisoformat(d['timestamp']) > datetime.now() - duration
for d in self.data_history[-10:]
if d.get('device_id') == data.get('device_id')
)
if not recent_high:
message = f"スタンバイ電力注意: {data['device_name']}が低負荷継続"
logger.warning(f"⚠️ {message}")
await self._notify_alert("standby_power", message, data)
async def _check_custom_alert(self, alert_rule: Dict, data: Dict) -> None:
"""カスタムアラートをチェック"""
try:
condition = alert_rule.get("condition", {})
# 電力範囲チェック
min_power = condition.get("min_power", float('-inf'))
max_power = condition.get("max_power", float('inf'))
if min_power <= data['power'] <= max_power:
message = alert_rule.get("message", "カスタムアラート発生")
await self._notify_alert("custom", message, data)
except Exception as e:
logger.error(f"カスタムアラート評価エラー: {e}")
async def calculate_cost(self, cost_per_kwh: Optional[float] = None) -> Dict:
"""
電気代計算
Args:
cost_per_kwh: 1kWhあたりの料金(円)
Returns:
コスト情報の辞書
"""
rate = cost_per_kwh or self.cost_config.cost_per_kwh
if not self.data_history:
return {
"daily_cost": 0,
"daily_energy": 0,
"monthly_estimate": 0,
"message": "データがありません"
}
# 最新の1日分のエネルギー消費を集計
today = datetime.now().date()
daily_data = [
data for data in self.data_history
if datetime.fromisoformat(data['timestamp']).date() == today
]
if not daily_data:
return {
"daily_cost": 0,
"daily_energy": 0,
"monthly_estimate": 0,
"message": "本日のデータがありません"
}
# エネルギー消費の合計(差分計算)
daily_energy = self._calculate_daily_energy(daily_data)
daily_cost = daily_energy * rate
estimated_monthly_cost = daily_cost * 30
logger.info(f"💰 本日の電気代: {daily_cost:.0f}円")
logger.info(f"📊 推定月間電気代: {estimated_monthly_cost:.0f}円")
return {
"daily_cost": daily_cost,
"daily_energy": daily_energy,
"monthly_estimate": estimated_monthly_cost,
"rate_used": rate,
"data_points": len(daily_data)
}
def _calculate_daily_energy(self, daily_data: List[Dict]) -> float:
"""日次エネルギー消費を計算"""
if len(daily_data) < 2:
return 0
# デバイスごとにグループ化
devices = {}
for data in daily_data:
device_id = data.get('device_id', 'unknown')
if device_id not in devices:
devices[device_id] = []
devices[device_id].append(data)
total_energy = 0
for device_id, readings in devices.items():
# 時系列でソート
sorted_readings = sorted(
readings,
key=lambda x: datetime.fromisoformat(x['timestamp'])
)
if len(sorted_readings) >= 2:
# 最初と最後のenergyの差分
first_energy = sorted_readings[0].get('energy', 0)
last_energy = sorted_readings[-1].get('energy', 0)
total_energy += max(0, last_energy - first_energy)
return total_energy
async def calculate_time_based_cost(self) -> Dict:
"""時間帯別電気代計算"""
if not self.data_history:
return {"error": "データがありません"}
today = datetime.now().date()
peak_energy = 0
off_peak_energy = 0
daily_data = [
data for data in self.data_history
if datetime.fromisoformat(data['timestamp']).date() == today
]
for data in daily_data:
hour = datetime.fromisoformat(data['timestamp']).hour
energy_increment = data.get('energy', 0) / len(daily_data) # 簡易計算
if self.cost_config.peak_start_hour <= hour < self.cost_config.peak_end_hour:
peak_energy += energy_increment
else:
off_peak_energy += energy_increment
peak_cost = peak_energy * self.cost_config.peak_rate
off_peak_cost = off_peak_energy * self.cost_config.off_peak_rate
total_cost = peak_cost + off_peak_cost
return {
"peak": {
"energy_kwh": peak_energy,
"cost_yen": peak_cost,
"rate": self.cost_config.peak_rate
},
"off_peak": {
"energy_kwh": off_peak_energy,
"cost_yen": off_peak_cost,
"rate": self.cost_config.off_peak_rate
},
"total_cost": total_cost
}
async def start_periodic_monitoring(self, interval_minutes: int = 5) -> None:
"""
定期監視を開始
Args:
interval_minutes: 監視間隔(分)
"""
await self.setup()
self.is_running = True
# 初回実行
await self.monitor_job()
# 定期実行設定
schedule.every(interval_minutes).minutes.do(
lambda: asyncio.create_task(self.monitor_job())
)
# 1時間ごとにコスト計算
schedule.every(1).hours.do(
lambda: asyncio.create_task(self.calculate_cost())
)
logger.info(f"🚀 定期監視開始: {interval_minutes}分間隔")
while self.is_running:
schedule.run_pending()
await asyncio.sleep(60) # 1分待機
async def stop_monitoring(self) -> None:
"""監視を停止"""
self.is_running = False
await self.monitor.close()
logger.info("⏹️ 監視を停止しました")
def get_statistics(self) -> Dict:
"""統計情報を取得"""
if not self.data_history:
return {"error": "データがありません"}
powers = [d['power'] for d in self.data_history]
return {
"total_readings": len(self.data_history),
"avg_power": sum(powers) / len(powers),
"max_power": max(powers),
"min_power": min(powers),
"unique_devices": len(set(d.get('device_id') for d in self.data_history)),
"date_range": {
"start": min(d['timestamp'] for d in self.data_history),
"end": max(d['timestamp'] for d in self.data_history)
}
}
async def export_data(self, filepath: str, format: str = "json") -> None:
"""データをエクスポート"""
if format == "json":
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(self.data_history, f, ensure_ascii=False, indent=2)
logger.info(f"データを{filepath}にエクスポートしました")
else:
raise ValueError(f"未対応のフォーマット: {format}")
# 使用例
async def main():
# アラート設定
alert_config = AlertConfig(
high_power_threshold=1000, # 1000W超えで警告
standby_power_threshold=5, # 5W以下でスタンバイ判定
standby_duration_hours=1 # 1時間以上低負荷で通知
)
# 電気料金設定
cost_config = CostConfig(
cost_per_kwh=25, # 基本料金
peak_rate=30, # ピーク時
off_peak_rate=15, # オフピーク時
peak_start_hour=8,
peak_end_hour=22
)
# 監視システム初期化
periodic = PeriodicTapoMonitor(
alert_config=alert_config,
cost_config=cost_config
)
# アラートコールバック設定
def on_alert(alert_info):
print(f"🔔 アラート: {alert_info['message']}")
# ここでSlackやLINE通知などを実装可能
periodic.add_alert_callback(on_alert)
try:
# 5分間隔で監視開始
await periodic.start_periodic_monitoring(interval_minutes=5)
except KeyboardInterrupt:
await periodic.stop_monitoring()
if __name__ == "__main__":
asyncio.run(main())
📊 機能詳細
1. データ収集機能
定期的なポーリングで電力データを収集し、30日間保持します。
# データ構造
{
"power": 45.2, # 現在の消費電力(W)
"voltage": 119.8, # 電圧(V)
"current": 0.377, # 電流(A)
"energy": 12.5, # 累積エネルギー(kWh)
"timestamp": "2025-01-08T12:30:00",
"device_id": 12345,
"device_name": "リビングルーム"
}
2. アラートシステム
| アラート種類 | 条件 | デフォルト閾値 |
|---|---|---|
| 高負荷警告 | 消費電力が閾値超過 | 1000W |
| スタンバイ注意 | 低負荷が長時間継続 | 5W以下×1時間 |
| カスタム | 任意の条件設定 | - |
# カスタムアラートの追加例
alert_config = AlertConfig(
custom_alerts=[
{
"condition": {"min_power": 500, "max_power": 800},
"message": "中負荷範囲で運転中"
},
{
"condition": {"max_power": 0.5},
"message": "電源がOFFの可能性があります"
}
]
)
3. コスト計算機能
# 基本的なコスト計算
cost_info = await periodic.calculate_cost()
# 結果例:
# {
# "daily_cost": 150, # 本日の電気代
# "daily_energy": 6.0, # 本日の消費電力量
# "monthly_estimate": 4500 # 月間推定
# }
# 時間帯別コスト計算
time_cost = await periodic.calculate_time_based_cost()
# 結果例:
# {
# "peak": {"energy_kwh": 4.0, "cost_yen": 120},
# "off_peak": {"energy_kwh": 2.0, "cost_yen": 30},
# "total_cost": 150
# }
🚨 トラブルシューティング
よくある問題と解決策
| 問題 | 原因 | 解決方法 |
|---|---|---|
| スケジュールが実行されない | asyncio非対応 |
asyncio.create_task()でラップ |
| メモリ使用量が増加 | 履歴データの蓄積 |
_cleanup_old_data()の確認 |
| アラートが重複 | チェック頻度が高い | デバウンス処理の追加 |
| コスト計算が0 | データ不足 | データポイント数の確認 |
デバッグ用コマンド
# 統計情報の確認
stats = periodic.get_statistics()
print(json.dumps(stats, indent=2))
# データのエクスポート
await periodic.export_data("debug_data.json")
# 履歴データの確認
print(f"履歴データ数: {len(periodic.data_history)}")
print(f"最新データ: {periodic.data_history[-1] if periodic.data_history else 'なし'}")
📝 まとめ
この記事では、Tapo Matterデバイスの定期監視システムを構築しました。
実装した機能
- ✅ 定期的な電力データ収集(5分間隔)
- ✅ 30日間の履歴データ管理
- ✅ 高負荷・スタンバイ電力アラート
- ✅ カスタムアラート機能
- ✅ 日次・月次電気代計算
- ✅ 時間帯別料金計算