LoginSignup
7
6

Worker Threadデザインパターンでスマートメーター計測ロガー

Last updated at Posted at 2019-12-11

はじめに

かなり前からスマートメーターの電力情報を取得することができていますが、UDP通信であることや微弱な電波であるせいか長期間の安定的な運用が難しいとの指摘もあります。

参考:スマートメーターの情報を最安ハードウェアで引っこ抜く

長期間運用できる安定性と、様々な情報ソースに対応する拡張性をもった計測ロガーについて考えてみました。考え方としては、

  • Worker Thread デザインパターンによって、機能ごとの処理を互いに分離する
  • これにより、機能ごとの再起動など柔軟に対処でき、安定性が高まる
  • また、さまざまな機能を持ったクライアント、ワーカーを追加でき、拡張性が高まる

というものです。

注1) このプログラムは現状 WiSun モジュール RL7023 Stick-D/DSS(デュアルスタック)にしか対応していません。ただ、デバイスドライバ部を少し修正すれば対応できると思います。 、RL7023 Stick-D/IPS(Bルート専用)に対応しています(Ver 0.1.1)。同時に設定ファイルの記述方法を少し変更しました。(2019/12/14追記)

注2) Python3.9では__file__の値が絶対パスとなり、また、threading.Thread.isAlive()メソッドが廃止となったため、githubで公開していたソースコードではエラーが生じました。修正版をコミットしました。
ソースコード https://github.com/kjmat/keilog
(2023/9/22追記)

Worker Thread デザインパターンによる実装

やりたいことは、

  • スマートメータのデータを取得する
  • データをファイルに保存する

の2つなので、前者をクライアントスレッド、後者をワーカースレッド、保存するデータをリクエストとして捉えれば、Worker Thread デザインパターンで設計できそうです。Pythonのスレッドは並列処理によるパフォーマンス改善は望めないそうですが、今回の場合処理のほとんどがIOなので、機能を整理してわかりやすくすることと、拡張性、安定性の向上が目的となります。

なお、Arduinoからのセンサ情報を記録したいということが開発のきっかけとなっているので、クライアントとしてシリアルポートからのデータを読み取るクラスも実装しています。

今のところ、クライアントもワーカーも管理上区別する必要がないので、共通の Worker クラスを定義し、これを継承してスマートメーター通信とファイル保存のクラスを定義することにしました。
Workerクラスで定義されているのは、管理スレッドから停止の命令をうけつけ、stop イベントをセットする stop() 関数だけです。スレッドループでは stop イベントがセットされていない限りループを繰り返し、stop イベントがセットされていたら、ループを抜けて終了処理を行い、スレッドを終了する流れになります。stop() 関数はメインスレッドから呼び出されるので、join() を実行してWorkerのスレッドが終了するまでここで待機します。

worker.py
import threading

class Worker ( threading.Thread ):
    """Woker はスレッドを停止するためのイベントを設定するための抽象クラス。
    """
    def __init__( self ):
        super().__init__()
        self.stopEvent = threading.Event()

    def stop ( self ):
        """ストップイベントをセットする。
        """
        self.stopEvent.set()
        # スレッドが終了するまで待機
        self.join()

    def run(self):
        """ストップイベントがセットされていない限り、繰り返し実行する。
        この関数はオーバーライドする。
        """
        while not self.stopEvent.is_set():
            # do something
            pass

        # ここに終了処理を書く

Worker を継承した BrouteReader、FileRecorder、SerialReader クラスについての解説はここでは省略します。機能の概要は後述しますが、詳細はソースコードのコメントを参照してください。

メインの処理は2部構成で、

  • 動作する Worker を定義する設定部
  • 設定に従ってオブジェクトを作成しスレッドを開始する実行部

から成り立っています。

設定部 keiconf.py は以下のようなコードになります。

keiconf.py
import queue
from keilib.recorder import FileRecorder
from keilib.broute   import BrouteReader, WiSunRL7023

# オブジェクト(スレッド)間で通信を行うための Queue
record_que = queue.Queue(50)

# WiSunデバイスドライバオブジェクトを作成 (2019/12/15変更)
wisundev = WiSunRL7023 (
                port='/dev/serial/by-id/xxxxx', # シリアルポート
                baud=115200,                    # ボーレート
                type=WiSunRL7023.IPS            # Bルート専用タイプ
                # type=WiSunRL7023.DSS            # デュアルスタックタイプ
            )

# 動作させるワーカオブジェクトの構成
worker_def = [
    {
        'class': FileRecorder,                  # FileRecorderオブジェクトを作成
        'args': {                               # 引数
            'fname_base': 'mydatafile',         # 記録ファイルの名前に使われる文字列
            'record_que': record_que            # 記録するデータをやり取りする Queue
        }
    },
    {
        'class': BrouteReader,                  # BrouteReaderオブジェクトを作成
        'args': {                               # 引数
            'wisundev': wisundev,               # WiSunデバイスドライバオブジェクトを指定
            'broute_id': 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx', # BルートID(電力会社に申請)
            'broute_pwd': 'xxxxxxxxxxxx',       # Bルートパスワード(電力会社に申請)
            'requests':[                        # 取得するプロパティ値の定義
                { 'epc':['D3','D7','E1'], 'cycle': 3600 }, 
                                                #   積算電力量用 係数(D3),有効桁数(D7),単位(E1),3600秒ごと
                { 'epc':['E7'], 'cycle': 10 },  #   瞬時電力(E7),10秒ごと
                { 'epc':['E0','E3'], 'cycle': 300 },
                                                #   積算電力量(E0,E3),300秒ごと
            ],
            'record_que': record_que            # 記録するデータをやり取りする Queue
        }
    },
]

ここでは、worker_defリストに FileRecorderとBrouteReaderの2つのクラスがコンストラクタへの引数の構成とともに定義されています。 実行部ではここに定義されているクラスのインスタンスを作成し管理します。作成されたオブジェクトはそれぞれ別のスレッドで動作を開始しますが、スレッド間でデータを受け渡すためのチャンネルとしてqueue.Queueオブジェクトを利用しています。record_queは2つのオブジェクト間でファイルに保存するデータを受け渡すために共有されています。BrouteReaderはスマートメーターから情報を得ると体裁を整えて record_queに投入(put)します。一方FileRecorderはrecoed_queに流れてきたデータを取得(get)してファイルに保存します。Queue オブジェクトはスレッドセーフであるため、排他制御を意識せずに使用できます。
(WiSunデバイスドライバはBrouteReaderの中で作成していましたが、設定部で作成してBrouteReaderのコンストラクタにわたす形に変更しました。2019/12/15更新)

実行部 kei.py の核心部は以下のコードです。

kei.py
# Launch each worker instance(各ワーカーインスタンスの起動)
for wdef in worker_def:
    wdef['instance'] = wdef['class']( **wdef['args'] ) # インスタンスの作成
    wdef['instance'].start() # スレッドの開始

# Check if the threads have stopped at intervals, and restart them if stopped.
# 一定間隔でスレッドが停止したかどうかを確認し、停止していた場合は再起動
while True:
    for wdef in worker_def:
        if not wdef['instance'].isAlive():
            wdef['instance'] = wdef['class']( ** wdef['args'])
            wdef['instance'].start()

    time.sleep(10)

このように、ワーカースレッドが不慮のエラーでストールした場合、メインスレッド内で再起動する仕組みを入れておくことによって長期間の安定動作が可能になります。
このほか kei.py にはログファイルの処理や、終了処理のためのシグナルハンドラの設定などが含まれています。

kei.py
def exit_handler(signal, frame):
    for wdef in worker_def:
        wdef['instance'].stop()
    sys.exit(0)

signal.signal( signal.SIGHUP, exit_handler )
signal.signal( signal.SIGINT, exit_handler )
signal.signal( signal.SIGTERM, exit_handler )

詳細はソースコードを確認してください。

ソースコード https://github.com/kjmat/keilog

特徴

  • Raspberry Pi + Python3 で動作

    • Raspbian Lite (デスクトップが入ってない。ヘッドレスで運用)
    • SDカード 4G以上(データ量に応じて。16Gもあれば十分)
    • ネットワーク接続(時刻同期のためとかいろいろ)
    • python3, python3-serial, python3-requests
    • (最新ディストリビューションだと serial だけ別途インストール)
    • ラズパイでなくても大丈夫だと思いますが、前提としてつけっぱなしになります。
  • スマートメーターBルートデータ取得機能

    • WiSun モジュール RL7023 Stick-D/DSS(デュアルスタック)、D/IPS(Bルート用)に対応
    • (シングルスタックの RL7023 Stick-D/IPS を使うには修正が必要。持ってないので未実装です)
    • 取得したいスマートメーターのプロパティと取得間隔を定義できる。
      対応プロパティ = D3,D7,E0,E1,E3,E7,E8,(EA,EB)
      • D3: 係数「積算電力量計測値」を実使用量に換算する係数
      • E7: 積算電力量計測値の有効桁数
      • E1: 単位「積算電力量計測値」の単位 (乗率)
      • E0: 積算電力量 計測値 (正方向計測値)
      • E3: 積算電力量 計測値 (逆方向計測値)
      • E7: 瞬時電力計測値(逆潮流のときは負の値)
      • E8: 瞬時電流計測値(T/R相別の電流)
      • EA: 定時 積算電力量 計測値 (正方向計測値)
      • EB: 定時 積算電力量 計測値 (逆方向計測値)
    • 状態遷移による振る舞いの管理 → 接続が切れても自動的に再接続
  • シリアルポートからのデータ取得機能

    • USBに接続した Arduino などの周辺機器から入力されたデータも記録可能
    • Arduinoはセンサのライブラリや作例が豊富で使いやすいし消費電力も少ない
    • XbeeやTWELiteDIPなどで無線化すれば、離れた場所のセンサも記録できる
    • TWELiteDIPにセンサを直結した場合、電池で数年持つセンサノードが作れる
  • リモートの Http サーバーにデータを POST する機能

    • 遠隔地に置いたラズパイのデータを(そこそこ)リアルタイムに取得するため
    • Raspi + Soracom(SIM) + AK020(3Gモデム) で安定動作
  • マルチスレッド(シンプルなフレームワーク)

    • 不慮のエラーによる停止からの回復、長期連続運用が可能
    • 機能の追加が容易

起動方法

Raspbian Lite にあらかじめ pyserial をインストールしておきます。

$ sudo apt install python3-serial

適当な作業ディレクトリを作成し、以下のようにファイルを配置し、keiconf.py には構成を定義しておきます。

workdir/
    |-- keilib/
    |   |-- __init__.py
    |   |-- broute.py
    |   +-- ....
    |
    |-- keiconf.py
    +-- kei.py

プログラムの実行は次のように行います。

$ python3 kei.py

実行すると workdir/ ディレクトリ内に、計測データを保存するファイルが2つと、プログラムの実行時の情報を出力するログファイルkei.logが作成されます。ログについては、

$ DEBUG=0 python3 kei.py

のように起動すると、ログレベル = DEBUG となりログの出力先が標準出力になります。ログレベルは USR1 シグナルを受け取ると INFO <-> DEBUG で反転します。
プログラムの終了については、HUP, INT, TERM シグナルによって各オブジェクトにストップイベントを送っています。ストップイベントを受けとったオブジェクトのスレッドはリソースを開放してから終了します。

出力ファイルの形式

2つのファイルが作られます。

1. [YYYYMMDD]-[mydatafile].txt
2. sum[YYYYMMDD]-[mydatafile].txt

1.は1行につき1件のデータが記録されており、行のフォーマットは以下の形をとります。

[YYYY/MM/DD hh:mm:ss],[UnitID],[SensorID],[Value],[DataID]<改行>

なお BrouteReader の場合、出力するデータは以下の通りです。

[UnitID]    = BR(固定)
[SensorID]  = スマートメーターのプロパティコード(EPC): E7, E0 等
[Value]     = 測定値(数値)
[DataID]    = x(固定)

2.は1行に各センサーの10分ごとの平均値が記録されます。

[YYYY/MM/DD hh:m0],[UnitID],[SensorID],[AverageValue]<改行>

日付のフォーマットに秒がない点に注意です。

参考

7
6
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
6