Edited at

RaspberryPiでスマートメータの電力を取得する2

More than 1 year has passed since last update.

前回の続き

ソースがひとつ足りませんでした、、

公開ファイルも古かったので微妙に修正

次回はこのプログラムで作成された30分毎の積数から料金を計算シミュレーションしていきます。

だいぶ経ってしまいましたが。。

家のRaspberry Pi3とスマートメーターの通信が安定しないので、

近くに置けるようスイッチサイエンスさんからRaspberryPi Zeroを購入。

スイッチサイエンスさんのページを見てたら。。

Micro Dot pHATなるものまで発見してしまったので、

興味本位で購入してしまいました。

Micro Dot pHATに瞬時電力量を表示できるようにしたいと思います。

Micro Dot pHATをはんだ付けして、以下のページを参考にライブラリをインストール。

https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-micro-dot-phat

Micro Dot pHATのライブラリーがpython3でしか動かないので、

前回のプログラムを改造。

前回同様同じディレクトリに配置して、get-power.pyを実行すれば動きます。

phatがない場合はスレッド起動をコメントしないと動きません。。

要望があれば修正して公開します。

予期しない応答があった場合落ちるように作ってあります。

落ちた場合に再起動したい場合、cronになんちゃってプロセス監視入れると起動し直してくれます。

/home/hoge/get-power.pyは動作しているパスに書き換えてください。

今の所わが家はこれで、ほぼ!

無停止で瞬時電力を表示できています。

*/10 * * * * ps ax |grep -v grep | grep -q get-power.py|| python3 /home/hoge/get-power.py

pythonは素人なので、指摘等あればなんでも指摘してください。

下記のような感じで表示が、取得した時間、瞬時電力量で切り替わります。

公開したプログラムで下記のHPは電力量を取得しています。

PUMA'S HOME

IMG_5349.JPG

IMG_5350.JPG

コンフィグ


config.py

#シリアルポートデバイス名

#serialPortDev = 'COM3' # Windows の場合
serialPortDev = '/dev/ttyUSB0' # Linux(ラズパイなど)の場合
#serialPortDev = '/dev/cu.usbserial-A103BTPR' # Mac の場合

# Bルート認証ID(東京電力パワーグリッドから郵送で送られてくるヤツ)
rbid = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# Bルート認証パスワード(東京電力パワーグリッドからメールで送られてくるヤツ)
rbpwd = "XXXXXXXXXXXXX"

#ファイル出力設定
#瞬時電力量のファイル名
POWER_FILE_NAME = "power.log"
#積算電力量のファイル名
EACH30_FILE_NAME = "each30.log"
#瞬時電力量のファイルを出力するパス
WRITE_PATH="/home/hoge/data/"


エコネット電文とパーサー


echonet.py

#!/usr/bin/env python                                        # -*- coding: utf-8 -*-


from common import *

GET_NOW_POWER = "\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE7\x01\x01"
GET_EACH30 = "\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE2\x00"
GET_NOW_POWER_B = b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE7\x01\x01'
GET_EACH30_B = b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE2\x00\x03'
GET_STATUS_B = b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\x88'
GET_MF_B = b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE1'

from microdotphat import write_string, set_decimal, clear, show, scroll
from echonet import *
from config import *

import serial
import logging
import datetime

#ロガー取得
logger = logging.getLogger('echonet')

#瞬時電力量ファイル作成処理、表示用処理
def parthE7(line,ps_th) :
# 内容が瞬時電力計測値(E7)だったら
logger.info("pathE7 start")
hexPower = line[-8:] # 最後の4バイト(16進数で8文字)が瞬時電力計測値
power = str(int(hexPower, 16))
d = datetime.datetime.now()
#phatShow(power,d.strftime("%H:%M"))
ps_th.power = power
ps_th.hm = d.strftime("%H:%M")
filename = WRITE_PATH + POWER_FILE_NAME
body = "瞬時電力:"+power+"[W]"
body = body + "(" +d.strftime("%H:%M:%S") + ")"
writeFile(filename, body)
logger.info(body)
logger.info("pathE7 end")

#積算電力量ファイル作成処理
def parthE2(res,day) :
logger.info("pathE2 start")
offset = 8
pos= offset*48
line = res[32:32+pos]
flg = True
cnt = 0
body = day + ","
powerCSV = day + ","
while flg:
start = cnt*offset
intPower = int(line[start:start+offset],16)
strPower = str(intPower)
num = len(strPower)
power = strPower[0:num-1] + "." + strPower[-1:]
#power = format(strPower[:-1],".",strPower[-1:])
body = body + power + ","
cnt += 1
if 47 < cnt :
flg = False
logger.info(body)
filename = WRITE_PATH + day + "_" + EACH30_FILE_NAME
writeFile(filename, body)
logger.info(body)
logger.info("pathE2 end")


共通処理


common.py

#!/usr/bin/env python

# -*- coding: utf-8 -*-

from microdotphat import write_string, set_decimal, clear, show, scroll

import sys
import logging
import datetime
import locale
import atexit

#ロガー取得
logger = logging.getLogger('common')

def all_clear(ser) :
logger.error("all clear start")
ser.close()
clear()
write_string("Down" , kerning=False)
show()
logger.error("all clear end")

def writeFile(filename,msg) :
logger.info("writeFile start")
f = open(filename,'w')
f.write(msg)
f.close()
logger.info("writeFile end")

def str2byte(str):
return str.encode()

def byte2str(byte):
return byte.decode()


メイン処理


get-power.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

from echonet import *
from config import *
from common import *
from phatoutthread import *

import sys
import serial
import time
import logging
import logging.handlers
import datetime
import locale

#ロガー取得
logger = logging.getLogger('main')

logname = "/var/log/tools/b-route.log"
fmt = "%(asctime)s %(levelname)s %(name)s :%(message)s"
#logging.basicConfig(level=10, format=fmt)
logging.basicConfig(level=21, filename=logname, format=fmt)

# ログのファイル出力先を設定
#logger.addHandler(fh)

# シリアルポート初期化
ser = serial.Serial(serialPortDev, baudrate=115200)

ser.reset_input_buffer()

if ser.out_waiting > 0:
ser.reset_output_buffer()
atexit.register(all_clear,ser)
# Bルート認証パスワード設定
SKSETPWD ="SKSETPWD C " + rbpwd + "\r\n"
ser.write(str2byte(SKSETPWD))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# Bルート認証ID設定
SKSETRBID = "SKSETRBID " + rbid + "\r\n"
ser.write(str2byte(SKSETRBID))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

scanDuration = 4; # スキャン時間。
scanRes = {} # スキャン結果の入れ物
scanRes["Channel"]=""

# スキャンのリトライループ(何か見つかるまで)
while scanRes.get("Channel") == "" :
# アクティブスキャン(IE あり)を行う
# 時間かかります。10秒ぐらい?
SKSCAN = "SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"
ser.write(str2byte(SKSCAN))

# スキャン1回について、スキャン終了までのループ
scanEnd = False
while not scanEnd :
line = ser.readline()
lineStr = byte2str(line)
logger.info(lineStr)

if lineStr.startswith("EVENT 22") :
# スキャン終わったよ(見つかったかどうかは関係なく)
scanEnd = True
elif lineStr.startswith(" ") :
# スキャンして見つかったらスペース2個あけてデータがやってくる
# 例
# Channel:39
# Channel Page:09
# Pan ID:FFFF
# Addr:FFFFFFFFFFFFFFFF
# LQI:A7
# PairID:FFFFFFFF
cols = lineStr.strip().split(':')
scanRes[cols[0]] = cols[1]
scanDuration+=1

if 14 < scanDuration and not scanRes.has_key("Channel"):
# 引数としては14まで指定できるが、7で失敗したらそれ以上は無駄っぽい
logger.error("スキャンリトライオーバー")
ser.close()
sys.exit() #### 糸冬了 ####

# スキャン結果からChannelを設定。
SKSREGS2 = "SKSREG S2 " + scanRes["Channel"] + "\r\n"
ser.write(str2byte(SKSREGS2))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# スキャン結果からPan IDを設定
SKSREGS3 = "SKSREG S3 " + scanRes["Pan ID"] + "\r\n"
ser.write(str2byte(SKSREGS3))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# MACアドレス(64bit)をIPV6リンクローカルアドレスに変換。
# (BP35A1の機能を使って変換しているけど、単に文字列変換すればいいのではという話も??)
SKLL64 = "SKLL64 " + scanRes["Addr"] + "\r\n"
ser.write(str2byte(SKLL64))
logger.info(byte2str(ser.readline()))
ipv6Addr = byte2str(ser.readline()).strip()

# PANA 接続シーケンスを開始します。
SKJOIN = "SKJOIN " + ipv6Addr + "\r\n"
ser.write(str2byte(SKJOIN))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# PANA 接続完了待ち(10行ぐらいなんか返してくる)
bConnected = False
while not bConnected :
line = byte2str(ser.readline())
if line.startswith("EVENT 24") :
logger.error("PANA 接続失敗")
ser.close()
sys.exit() #### 糸冬了 ####
elif line.startswith("EVENT 25") :
# 接続完了!
bConnected = True

# シリアル通信のタイムアウトを設定
ser.timeout = 20

# スマートメーターがインスタンスリスト通知を投げてくる
# (ECHONET-Lite_Ver.1.12_02.pdf p.4-16)
logger.info(byte2str(ser.readline()))

DAILY_TASK = True
today = ""
task_cnt = 0
#Thread Start
ps_th = phatoutthread(10)
ps_th.start()
while True :

#いつもは即時値取得
if not DAILY_TASK :
command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_NOW_POWER_B))
# コマンド送信
ser.write(str2byte(command) + GET_NOW_POWER_B)
d = datetime.datetime.today()
if today != d.strftime("%d") :
DAILY_TASK = True
#初回起動時または、日付が変更されたら前日の30分値を取得
if DAILY_TASK :
if task_cnt == 0 :
logger.debug("task_cnt 0")
command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_EACH30_B))
# コマンド送信
ser.write(str2byte(command) + GET_EACH30_B)
#if task_cnt == 1 :
# logger.debug("task_cnt 1")
# command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_STATUS_B))
# # コマンド送信
# ser.write(str2byte(command) + GET_STATUS_B)
#if task_cnt == 2 :
# logger.debug("task_cnt 2")
# command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_MF_B))
# # コマンド送信
# ser.write(str2byte(command) + GET_MF_B)

logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

line = byte2str(ser.readline()) # ERXUDPが来るはず
logger.info(line)

# 受信データはたまに違うデータが来たり、
# 取りこぼしたりして変なデータを拾うことがあるので
# チェックを厳しめにしてます。
if line.startswith("ERXUDP") :
cols = line.strip().split(' ')
res = cols[8] # UDP受信データ部分
#tid = res[4:4+4];
seoj = res[8:8+6]
#deoj = res[14,14+6]
ESV = res[20:20+2]
#OPC = res[22,22+2]
logger.debug("EPC:" + seoj)
logger.debug("ESV:" + ESV)
#エラー処理ここに入ったら落ちる
if seoj != "028801" :
logger.error("seoj:" + seoj )
ser.close()
ps_th.flg = True
event.set()
ps_th.join()
sys.exit() #### 糸冬了 ####

if seoj == "028801" and ESV == "72" :
# スマートメーター(028801)から来た応答(72)なら
EPC = res[24:24+2]
logger.debug("EPC:" + EPC)
if EPC == "E7" :
# 内容が瞬時電力計測値(E7)だったら
logger.info("PARTH E7")
parthE7(line,ps_th)
event.set()
if EPC == "E2" :
# 内容が電力計測値(E2)だったら
logger.info("PARTH E2")
d = datetime.datetime.today()
today = d.strftime("%d")
d -= datetime.timedelta(days = 1)
logger.info(today)
parthE2(res,d.strftime("%Y%m%d"))
DAILY_TASK = False
#task_cnt += 1
#if EPC == "D7" :
# task_cnt += 1
#if EPC == "88" :
# DAILY_TASK = False
# task_cnt += 0
time.sleep(30)
ser.close()



phatoutthread.py

from microdotphat import write_string, set_decimal, clear, show, scroll

from threading import (Event,Thread)
import time
import datetime
import logging

event = Event()

#ロガー取得
logger = logging.getLogger('phatOut')

class phatoutthread(Thread):

def __init__(self, t):
super(phatoutthread, self).__init__()
self.t = t
self.flg = False
self.power = "start"
d = datetime.datetime.now()
self.hm = d.strftime("%H:%M")

def run(self):
while True :
#pHat出力
logger.debug("phatShow start")
event.wait()
event.clear()
if(self.flg):
logger.info("phatShow break")
break
clear()
write_string(self.hm , kerning=False)
show()
time.sleep(self.t)
clear()
set_decimal(1,1)
if len(self.power) < 4:
write_string("0" + self.power + "kW" , kerning=False)
else:
write_string(self.power + "kW" , kerning=False)
logger.info("phatShow end")
show()
#time.sleep(self.t)
'''