はじめに
前回の記事、Worker Thread デザインパターンでスマートメーター計測ロガーでは、計測を担うオブジェクト(スレッド)を再起動することで、長期間の運用を可能にするというアプローチでした。しかしそれは最後の手段であり、あくまでもスレッドが安定して動作することが前提です。また、スレッドは動いていても、なにかの処理でブロックされてしまっているような状態では対策になりません。
特に、Bルート経由でスマートメーターと通信をする場合には、スキャンや認証といったいくつかの手順を踏む必要があり、ある手順で失敗したり、タイムアウトすることもあります。それに応じてリトライを行ったり、リセットして最初からやり直すなどの処理を考えなければいけませんが、状況に応じてやるべきことが変わってくるため、よく整理して実装しないと、永遠に待ち続けたり予想外のデータを受け取ってクラッシュしたりする可能性が高まります。
今回は、Bルートのスマートメータとの通信を、状態遷移を管理して実装してみたのでご紹介します。
ソースコード より keilog/broute.py
状態遷移図を書く
まずは、どのようなシチュエーションで何をやるべきかをきちんと整理したうえで、なるべくわかりやすくプログラムしてゆく必要があります。BrouteReaderクラスのrun()スレッドループでは大まかには以下のような状態遷移で処理を実行しています。青の四角が状態を表していて、その状態で呼び出す関数の実行結果によって、リトライしたり状態を遷移させたりしています。必ずしもこれが正解というわけではなく他の方法も考えられますが、瞬時電力や積算電力量を取得する目的ではうまく動作しています。
実装方法
これを実装するコードは以下のような流れになります。(実際はもう少し複雑)
class BrouteReader ( Worker ):
# <省略>
def run( self ):
while not self.stopEvent.is_set():
if self.state == self._STATE_INIT:
self._open()
elif self.state == self._STATE_OPEN:
self._setup()
elif self.state == self._STATE_SETUP:
self._scan()
elif self.state == self._STATE_SCAN:
self._join()
elif self.state == self._STATE_JOIN:
if time_has_come(): # 実際は前回更新時間と現在の時間を比較します
self._sendto('プロパティ値要求電文')
dataframe = self._receive() # 電文を受け取る receive()は1秒でタイムアウトする
if dataframe:
self._accept(dataframe) # 受け取った電文を処理
if long_time_has_gone():
logger.error('長い間電文が来てない')
self._term()
self._close()
self.state = self._STATE_INIT
time.sleep(5)
self._term() # スマートメーターの接続を終了
self._close() # WiSunデバイスの開放
logger.info('[STOP]')
前回の記事に紹介したように、run()内では、stopEventがセットされていない限りはループを繰り返します。
ループの中では、self.state で表される現在の状態によってどれか一つの状態に属するコードが実行されます。ここでは関数を呼び出しているだけですが関数の中で必要な処理を行い self.state を書き換えて状態を更新しています。すると次回のループでは遷移した次の状態のコードが実行されることになります。これが基本の流れです。scan() や join() の中ではリトライ処理も含めて定義してあります。状態による場合分けは長くなりがちなので、なるべくシンプルに見通しよくしておくことが大切だと思います。
少し動作の流れを説明しておきます。
sendto()で送信した電文に対するスマートメータからの応答には1秒程度かかるようです。あるループにおいて time_has_come() となり、スマートメーターのプロパティ値を要求する電文を送信したとすると、その要求電文に対する応答はすぐには届かないため、そのループ内ではなく次かその次くらいのループで処理することになるでしょう。したがってreceiveで受け取った応答あるいは通知電文は、直前に送信した要求のものとは一致するとは限らず、中を見てみないと何が入っているかわかりません。accept()はどんな電文が来ても適切な処理ができるように対応しておく必要があります。
それから注意点としては、どの関数も必ずタイムアウトするように定義しておくことです。そうでないとループがそこでブロックされてしまい、先に進まなくなってしまう可能性があります。ブロックの根源の一つとして serial.readline() があります。この関数にタイムアウトを設定(1秒程度)し、続く処理はその前提でプログラミングする必要があります。それから、リトライループや"OK"を待つだけの処理であっても永遠に待ち続けることがないようにタイムアウトを設定します。
以下に、open()とsetup()の例をあげておきます。
def _open( self ):
if self.wisundev.open():
self.state = self._STATE_OPEN
else:
# エラーの場合は5秒停止。高速無限ループがCPUを専有しないように。(以下同様)
time.sleep(5)
def _setup( self ):
# リセット
if self.wisundev.reset():
pass
else:
time.sleep(5)
return False
# BルートIDとパスワードをデバイスに設定(レジスタに登録される)
if self.wisundev.setup( self.broute_id, self.broute_pwd ):
self.state = self._STATE_SETUP
else:
time.sleep(5)
関数の中では WiSun デバイスドライバの同名の関数を呼び出していますが、実行が成功したときには同時に状態遷移を行っています。エラーが起きたときは、必ずスリープしてから関数を終了するようにします。さもなければ、デバイスが未接続であったときに高速でリトライを続け、エラーをログに吐き続けてCPUを専有してしまうことになります。同様に前述の serial.readline() のタイムアウトについてもあまり短く設定すると、ループが高速に回ってしまう恐れがあるので注意します。
安定性向上のために
状態遷移を管理して実行を制御することで、やるべきことが明確になりロジックを間違えにくくなるように思います。そしてたとえ問題が生じても修正や対策しやすいコードになるのではないでしょうか。安定動作のためには一つ一つの関数の精度を上げてゆく必要は依然としてありますが、問題の切り分けが容易なので、的確に対策のポイントを突けると思います。
精度ということでは、対応外の意図していないイベントや電文を受け取ったときの処理については十分対策しておく必要があります。他のイベントや電文である可能性を考慮せず決め打ちで配列のインデックスなどを指定するとエラーになるリスクが高まります。また、処理するデータのほとんどは16進HEX文字列なので、16進文字列として正しいかどうかチェックしています。しかし環境にもよるのかもしれませんが、この手のエラーはあまり起きないようです。
一方、通信が途絶えてしまうことは比較的よくあります。上記コード run() の終わりの方で、一定時間電文を受け取らなかった場合、INITに戻るという処理を行ってこの問題に対処しています。PANAのセッションには有効期限があり、定期的に再認証して鍵をリフレッシュする必要があるのですが、ここで使っているRL7023は自動的に再認証する機能を持っています。ログを見るとその形跡があり再認証も成功しているのですが、その後通信が途絶えているケースがありました。これに限らず、何らかの原因で通信が途絶えることは大いに有り得るものとして、この対策は必須です。他のWiSunデバイスはどうなのかわかりませんが、もし自分で再認証する必要があるならば、定期的にrejoin()を実行すれば良いでしょう。
おわりに
今回、状態遷移を管理するプログラムを作成してみて、適切なタイムアウトが必要であるとか、高速リトライ無限ループに注意など、様々なポイントがあることがわかりました。もしプログラムのミスやエラーでバースト的に電波を発信してしまうようなことになると、同じ帯域を使っている周囲の機器に迷惑をかけてしまいますし、最悪電波法に違反してしまう恐れもあります。不用意に電波を出さないように注意しなければならないと思いました。他にも注意点があればご指摘いただければ幸いです。
それから状態の名前付けに案外悩みました。結局関数名と同じにしていますが、よい命名法などあればぜひ教えてください。