1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Tapo Matter 消費電力定期取得 (実践編)

Last updated at Posted at 2026-01-13

📝 はじめに

この記事は、
Tapo Matter 消費電力定期取得(準備編) #Python - Qiita
実践編 になります。

Tapo P110Mスマートプラグから取得した電力データを定期的に収集・分析し、アラート通知や電気代計算を自動化するシステムを構築していきます。

この記事の対象読者

  • Tapo Matterの基本的な電力データ取得ができている方
  • 定期的なデータ収集・監視システムを構築したい方
  • 電力消費の異常検知やコスト管理を自動化したい方
  • Pythonでの非同期プログラミングに興味がある方

前提条件

  • Tapo P110Mの初期設定が完了していること
  • Home AssistantでのMatter統合が完了していること
  • Pythonでの電力データ取得が動作していること
  • 基本的なTapoMatterMonitorクラスが実装済みであること

🎯 システム概要

アーキテクチャ

コンポーネント 役割 技術要素
PeriodicTapoMonitor メイン監視クラス asyncio, schedule
データ収集 定期的な電力データ取得 Matter Protocol
履歴管理 30日分のデータ保持 インメモリ / DB
アラートシステム 閾値ベースの通知 条件判定ロジック
コスト計算 電気代の自動計算 料金レート適用

🛠️ 実装

必要なライブラリのインストール

# 定期実行用ライブラリ
pip install schedule

# 非同期処理
pip install asyncio

# Matter通信(既にインストール済みの場合は不要)
pip install matter-server-client
pip install python-matter-server

基本クラスの前提

以下の実装は、前記事で作成したTapoMatterMonitorクラスを前提としています:

# 前記事で実装済みのクラス(参考)
class TapoMatterMonitor:
    async def connect(self): ...
    async def discover_devices(self): ...
    async def get_power_data(self, device_id): ...
    async def get_all_power_data(self): ...
    async def close(self): ...

定期監視システムの完全実装

import asyncio
import schedule
import time
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, field
import json
import logging

# ログ設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@dataclass
class AlertConfig:
    """アラート設定"""
    high_power_threshold: float = 1000.0  # ワット
    standby_power_threshold: float = 5.0  # ワット
    standby_duration_hours: float = 1.0   # 時間
    custom_alerts: List[Dict] = field(default_factory=list)


@dataclass
class CostConfig:
    """電気料金設定"""
    cost_per_kwh: float = 25.0  # 円/kWh(デフォルト)
    peak_rate: float = 30.0     # ピーク時料金
    off_peak_rate: float = 15.0  # オフピーク料金
    peak_start_hour: int = 8
    peak_end_hour: int = 22


class PeriodicTapoMonitor:
    """
    Tapo Matterデバイスの定期監視システム

    機能:
    - 定期的な電力データ収集
    - 履歴データの管理(30日保持)
    - アラート通知
    - 電気代計算
    """

    def __init__(
        self,
        server_url: str = "ws://localhost:5580/ws",
        alert_config: Optional[AlertConfig] = None,
        cost_config: Optional[CostConfig] = None
    ):
        self.monitor = TapoMatterMonitor(server_url)
        self.data_history: List[Dict] = []
        self.is_running: bool = False
        self.alert_config = alert_config or AlertConfig()
        self.cost_config = cost_config or CostConfig()
        self.alert_callbacks: List[Callable] = []

    async def setup(self) -> None:
        """
        初期設定
        - Matterサーバーへの接続
        - デバイスの発見
        """
        await self.monitor.connect()
        await self.monitor.discover_devices()
        device_count = len(self.monitor.devices)
        logger.info(f"初期設定完了: {device_count}台のデバイスを発見")

    def add_alert_callback(self, callback: Callable) -> None:
        """
        アラート発生時のコールバックを追加

        Args:
            callback: アラート情報を受け取るコールバック関数
        """
        self.alert_callbacks.append(callback)

    async def _notify_alert(self, alert_type: str, message: str, data: Dict) -> None:
        """アラート通知を実行"""
        alert_info = {
            "type": alert_type,
            "message": message,
            "data": data,
            "timestamp": datetime.now().isoformat()
        }

        for callback in self.alert_callbacks:
            try:
                if asyncio.iscoroutinefunction(callback):
                    await callback(alert_info)
                else:
                    callback(alert_info)
            except Exception as e:
                logger.error(f"アラートコールバック実行エラー: {e}")

    async def monitor_job(self) -> None:
        """
        定期実行ジョブ
        - 全デバイスの電力データを取得
        - 履歴に追加
        - 古いデータを削除
        - アラートをチェック
        """
        try:
            power_data = await self.monitor.get_all_power_data()

            for data in power_data:
                # データを履歴に追加
                self.data_history.append(data)

                # 古いデータ(30日以上前)を削除
                await self._cleanup_old_data()

                # ログ出力
                logger.info(
                    f"[{data['timestamp']}] {data['device_name']}: "
                    f"{data['power']:.1f}W, "
                    f"{data['voltage']:.1f}V, "
                    f"{data['current']:.3f}A, "
                    f"{data['energy']:.3f}kWh"
                )

                # アラートチェック
                await self.check_alerts(data)

        except Exception as e:
            logger.error(f"監視エラー: {e}")

    async def _cleanup_old_data(self) -> None:
        """古いデータを削除(30日以上前)"""
        cutoff_date = datetime.now() - timedelta(days=30)
        original_count = len(self.data_history)

        self.data_history = [
            d for d in self.data_history
            if datetime.fromisoformat(d['timestamp']) > cutoff_date
        ]

        removed_count = original_count - len(self.data_history)
        if removed_count > 0:
            logger.debug(f"古いデータを{removed_count}件削除しました")

    async def check_alerts(self, data: Dict) -> None:
        """
        アラートチェック

        Args:
            data: 電力データ
        """
        power = data['power']
        device_name = data['device_name']

        # 高負荷アラート
        if power > self.alert_config.high_power_threshold:
            message = f"高負荷警告: {device_name}{power:.1f}Wを消費中"
            logger.warning(f"⚠️ {message}")
            await self._notify_alert("high_power", message, data)

        # スタンバイ電力アラート
        if power <= self.alert_config.standby_power_threshold:
            await self._check_standby_alert(data)

        # カスタムアラート
        for custom_alert in self.alert_config.custom_alerts:
            await self._check_custom_alert(custom_alert, data)

    async def _check_standby_alert(self, data: Dict) -> None:
        """スタンバイ電力アラートをチェック"""
        duration = timedelta(hours=self.alert_config.standby_duration_hours)
        threshold = 50  # 高負荷の閾値

        # 最後の高負荷からの経過時間をチェック
        recent_high = any(
            d['power'] > threshold and
            datetime.fromisoformat(d['timestamp']) > datetime.now() - duration
            for d in self.data_history[-10:]
            if d.get('device_id') == data.get('device_id')
        )

        if not recent_high:
            message = f"スタンバイ電力注意: {data['device_name']}が低負荷継続"
            logger.warning(f"⚠️ {message}")
            await self._notify_alert("standby_power", message, data)

    async def _check_custom_alert(self, alert_rule: Dict, data: Dict) -> None:
        """カスタムアラートをチェック"""
        try:
            condition = alert_rule.get("condition", {})

            # 電力範囲チェック
            min_power = condition.get("min_power", float('-inf'))
            max_power = condition.get("max_power", float('inf'))

            if min_power <= data['power'] <= max_power:
                message = alert_rule.get("message", "カスタムアラート発生")
                await self._notify_alert("custom", message, data)

        except Exception as e:
            logger.error(f"カスタムアラート評価エラー: {e}")

    async def calculate_cost(self, cost_per_kwh: Optional[float] = None) -> Dict:
        """
        電気代計算

        Args:
            cost_per_kwh: 1kWhあたりの料金(円)

        Returns:
            コスト情報の辞書
        """
        rate = cost_per_kwh or self.cost_config.cost_per_kwh

        if not self.data_history:
            return {
                "daily_cost": 0,
                "daily_energy": 0,
                "monthly_estimate": 0,
                "message": "データがありません"
            }

        # 最新の1日分のエネルギー消費を集計
        today = datetime.now().date()
        daily_data = [
            data for data in self.data_history
            if datetime.fromisoformat(data['timestamp']).date() == today
        ]

        if not daily_data:
            return {
                "daily_cost": 0,
                "daily_energy": 0,
                "monthly_estimate": 0,
                "message": "本日のデータがありません"
            }

        # エネルギー消費の合計(差分計算)
        daily_energy = self._calculate_daily_energy(daily_data)

        daily_cost = daily_energy * rate
        estimated_monthly_cost = daily_cost * 30

        logger.info(f"💰 本日の電気代: {daily_cost:.0f}")
        logger.info(f"📊 推定月間電気代: {estimated_monthly_cost:.0f}")

        return {
            "daily_cost": daily_cost,
            "daily_energy": daily_energy,
            "monthly_estimate": estimated_monthly_cost,
            "rate_used": rate,
            "data_points": len(daily_data)
        }

    def _calculate_daily_energy(self, daily_data: List[Dict]) -> float:
        """日次エネルギー消費を計算"""
        if len(daily_data) < 2:
            return 0

        # デバイスごとにグループ化
        devices = {}
        for data in daily_data:
            device_id = data.get('device_id', 'unknown')
            if device_id not in devices:
                devices[device_id] = []
            devices[device_id].append(data)

        total_energy = 0
        for device_id, readings in devices.items():
            # 時系列でソート
            sorted_readings = sorted(
                readings,
                key=lambda x: datetime.fromisoformat(x['timestamp'])
            )

            if len(sorted_readings) >= 2:
                # 最初と最後のenergyの差分
                first_energy = sorted_readings[0].get('energy', 0)
                last_energy = sorted_readings[-1].get('energy', 0)
                total_energy += max(0, last_energy - first_energy)

        return total_energy

    async def calculate_time_based_cost(self) -> Dict:
        """時間帯別電気代計算"""
        if not self.data_history:
            return {"error": "データがありません"}

        today = datetime.now().date()
        peak_energy = 0
        off_peak_energy = 0

        daily_data = [
            data for data in self.data_history
            if datetime.fromisoformat(data['timestamp']).date() == today
        ]

        for data in daily_data:
            hour = datetime.fromisoformat(data['timestamp']).hour
            energy_increment = data.get('energy', 0) / len(daily_data)  # 簡易計算

            if self.cost_config.peak_start_hour <= hour < self.cost_config.peak_end_hour:
                peak_energy += energy_increment
            else:
                off_peak_energy += energy_increment

        peak_cost = peak_energy * self.cost_config.peak_rate
        off_peak_cost = off_peak_energy * self.cost_config.off_peak_rate
        total_cost = peak_cost + off_peak_cost

        return {
            "peak": {
                "energy_kwh": peak_energy,
                "cost_yen": peak_cost,
                "rate": self.cost_config.peak_rate
            },
            "off_peak": {
                "energy_kwh": off_peak_energy,
                "cost_yen": off_peak_cost,
                "rate": self.cost_config.off_peak_rate
            },
            "total_cost": total_cost
        }

    async def start_periodic_monitoring(self, interval_minutes: int = 5) -> None:
        """
        定期監視を開始

        Args:
            interval_minutes: 監視間隔(分)
        """
        await self.setup()
        self.is_running = True

        # 初回実行
        await self.monitor_job()

        # 定期実行設定
        schedule.every(interval_minutes).minutes.do(
            lambda: asyncio.create_task(self.monitor_job())
        )

        # 1時間ごとにコスト計算
        schedule.every(1).hours.do(
            lambda: asyncio.create_task(self.calculate_cost())
        )

        logger.info(f"🚀 定期監視開始: {interval_minutes}分間隔")

        while self.is_running:
            schedule.run_pending()
            await asyncio.sleep(60)  # 1分待機

    async def stop_monitoring(self) -> None:
        """監視を停止"""
        self.is_running = False
        await self.monitor.close()
        logger.info("⏹️ 監視を停止しました")

    def get_statistics(self) -> Dict:
        """統計情報を取得"""
        if not self.data_history:
            return {"error": "データがありません"}

        powers = [d['power'] for d in self.data_history]

        return {
            "total_readings": len(self.data_history),
            "avg_power": sum(powers) / len(powers),
            "max_power": max(powers),
            "min_power": min(powers),
            "unique_devices": len(set(d.get('device_id') for d in self.data_history)),
            "date_range": {
                "start": min(d['timestamp'] for d in self.data_history),
                "end": max(d['timestamp'] for d in self.data_history)
            }
        }

    async def export_data(self, filepath: str, format: str = "json") -> None:
        """データをエクスポート"""
        if format == "json":
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(self.data_history, f, ensure_ascii=False, indent=2)
            logger.info(f"データを{filepath}にエクスポートしました")
        else:
            raise ValueError(f"未対応のフォーマット: {format}")


# 使用例
async def main():
    # アラート設定
    alert_config = AlertConfig(
        high_power_threshold=1000,  # 1000W超えで警告
        standby_power_threshold=5,   # 5W以下でスタンバイ判定
        standby_duration_hours=1     # 1時間以上低負荷で通知
    )

    # 電気料金設定
    cost_config = CostConfig(
        cost_per_kwh=25,      # 基本料金
        peak_rate=30,         # ピーク時
        off_peak_rate=15,     # オフピーク時
        peak_start_hour=8,
        peak_end_hour=22
    )

    # 監視システム初期化
    periodic = PeriodicTapoMonitor(
        alert_config=alert_config,
        cost_config=cost_config
    )

    # アラートコールバック設定
    def on_alert(alert_info):
        print(f"🔔 アラート: {alert_info['message']}")
        # ここでSlackやLINE通知などを実装可能

    periodic.add_alert_callback(on_alert)

    try:
        # 5分間隔で監視開始
        await periodic.start_periodic_monitoring(interval_minutes=5)
    except KeyboardInterrupt:
        await periodic.stop_monitoring()


if __name__ == "__main__":
    asyncio.run(main())

📊 機能詳細

1. データ収集機能

定期的なポーリングで電力データを収集し、30日間保持します。

# データ構造
{
    "power": 45.2,          # 現在の消費電力(W)
    "voltage": 119.8,       # 電圧(V)
    "current": 0.377,       # 電流(A)
    "energy": 12.5,         # 累積エネルギー(kWh)
    "timestamp": "2025-01-08T12:30:00",
    "device_id": 12345,
    "device_name": "リビングルーム"
}

2. アラートシステム

アラート種類 条件 デフォルト閾値
高負荷警告 消費電力が閾値超過 1000W
スタンバイ注意 低負荷が長時間継続 5W以下×1時間
カスタム 任意の条件設定 -
# カスタムアラートの追加例
alert_config = AlertConfig(
    custom_alerts=[
        {
            "condition": {"min_power": 500, "max_power": 800},
            "message": "中負荷範囲で運転中"
        },
        {
            "condition": {"max_power": 0.5},
            "message": "電源がOFFの可能性があります"
        }
    ]
)

3. コスト計算機能

# 基本的なコスト計算
cost_info = await periodic.calculate_cost()
# 結果例:
# {
#     "daily_cost": 150,      # 本日の電気代
#     "daily_energy": 6.0,    # 本日の消費電力量
#     "monthly_estimate": 4500  # 月間推定
# }

# 時間帯別コスト計算
time_cost = await periodic.calculate_time_based_cost()
# 結果例:
# {
#     "peak": {"energy_kwh": 4.0, "cost_yen": 120},
#     "off_peak": {"energy_kwh": 2.0, "cost_yen": 30},
#     "total_cost": 150
# }

🚨 トラブルシューティング

よくある問題と解決策

問題 原因 解決方法
スケジュールが実行されない asyncio非対応 asyncio.create_task()でラップ
メモリ使用量が増加 履歴データの蓄積 _cleanup_old_data()の確認
アラートが重複 チェック頻度が高い デバウンス処理の追加
コスト計算が0 データ不足 データポイント数の確認

デバッグ用コマンド

# 統計情報の確認
stats = periodic.get_statistics()
print(json.dumps(stats, indent=2))

# データのエクスポート
await periodic.export_data("debug_data.json")

# 履歴データの確認
print(f"履歴データ数: {len(periodic.data_history)}")
print(f"最新データ: {periodic.data_history[-1] if periodic.data_history else 'なし'}")

📝 まとめ

この記事では、Tapo Matterデバイスの定期監視システムを構築しました。

実装した機能

  • ✅ 定期的な電力データ収集(5分間隔)
  • ✅ 30日間の履歴データ管理
  • ✅ 高負荷・スタンバイ電力アラート
  • ✅ カスタムアラート機能
  • ✅ 日次・月次電気代計算
  • ✅ 時間帯別料金計算
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?