Help us understand the problem. What is going on with this article?

RaspberryPiでスマートメータの電力を取得する2

More than 1 year has passed since last update.

前回の続き

ソースがひとつ足りませんでした、、
公開ファイルも古かったので微妙に修正

次回はこのプログラムで作成された30分毎の積数から料金を計算シミュレーションしていきます。

だいぶ経ってしまいましたが。。
家のRaspberry Pi3とスマートメーターの通信が安定しないので、
近くに置けるようスイッチサイエンスさんからRaspberryPi Zeroを購入。

スイッチサイエンスさんのページを見てたら。。
Micro Dot pHATなるものまで発見してしまったので、
興味本位で購入してしまいました。

Micro Dot pHATに瞬時電力量を表示できるようにしたいと思います。

Micro Dot pHATをはんだ付けして、以下のページを参考にライブラリをインストール。
https://learn.pimoroni.com/tutorial/sandyj/getting-started-with-micro-dot-phat

Micro Dot pHATのライブラリーがpython3でしか動かないので、
前回のプログラムを改造。

前回同様同じディレクトリに配置して、get-power.pyを実行すれば動きます。
phatがない場合はスレッド起動をコメントしないと動きません。。
要望があれば修正して公開します。

予期しない応答があった場合落ちるように作ってあります。
落ちた場合に再起動したい場合、cronになんちゃってプロセス監視入れると起動し直してくれます。
/home/hoge/get-power.pyは動作しているパスに書き換えてください。
今の所わが家はこれで、ほぼ!
無停止で瞬時電力を表示できています。

*/10 * * * * ps ax |grep -v grep | grep -q get-power.py|| python3 /home/hoge/get-power.py

pythonは素人なので、指摘等あればなんでも指摘してください。
下記のような感じで表示が、取得した時間、瞬時電力量で切り替わります。

公開したプログラムで下記のHPは電力量を取得しています。
PUMA'S HOME

IMG_5349.JPG
IMG_5350.JPG

コンフィグ

config.py
#シリアルポートデバイス名
#serialPortDev = 'COM3'  # Windows の場合
serialPortDev = '/dev/ttyUSB0'  # Linux(ラズパイなど)の場合
#serialPortDev = '/dev/cu.usbserial-A103BTPR'    # Mac の場合

# Bルート認証ID(東京電力パワーグリッドから郵送で送られてくるヤツ)
rbid  = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# Bルート認証パスワード(東京電力パワーグリッドからメールで送られてくるヤツ)
rbpwd = "XXXXXXXXXXXXX"

#ファイル出力設定
#瞬時電力量のファイル名
POWER_FILE_NAME = "power.log"
#積算電力量のファイル名
EACH30_FILE_NAME = "each30.log"
#瞬時電力量のファイルを出力するパス
WRITE_PATH="/home/hoge/data/"

エコネット電文とパーサー

echonet.py
#!/usr/bin/env python                                        # -*- coding: utf-8 -*-

from common import *

GET_NOW_POWER = "\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE7\x01\x01"
GET_EACH30 =   "\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE2\x00"
GET_NOW_POWER_B = b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE7\x01\x01'
GET_EACH30_B =    b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE2\x00\x03'
GET_STATUS_B =   b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\x88'
GET_MF_B =   b'\x10\x81\x12\x34\x05\xFF\x01\x02\x88\x01\x62\x01\xE1'


from microdotphat import write_string, set_decimal, clear, show, scroll
from echonet import *
from config import *

import serial
import logging
import datetime

#ロガー取得
logger = logging.getLogger('echonet')

#瞬時電力量ファイル作成処理、表示用処理
def parthE7(line,ps_th) :
    # 内容が瞬時電力計測値(E7)だったら
    logger.info("pathE7 start")
    hexPower = line[-8:]    # 最後の4バイト(16進数で8文字)が瞬時電力計測値
    power = str(int(hexPower, 16))
    d = datetime.datetime.now()
    #phatShow(power,d.strftime("%H:%M"))
    ps_th.power = power
    ps_th.hm = d.strftime("%H:%M")
    filename = WRITE_PATH + POWER_FILE_NAME
    body = "瞬時電力:"+power+"[W]"
    body = body + "(" +d.strftime("%H:%M:%S") + ")"
    writeFile(filename, body)
    logger.info(body)
    logger.info("pathE7 end")

#積算電力量ファイル作成処理
def parthE2(res,day) :
    logger.info("pathE2 start")
    offset = 8
    pos= offset*48
    line = res[32:32+pos]
    flg = True
    cnt = 0
    body = day + ","
    powerCSV = day + ","
    while flg:
        start = cnt*offset
        intPower = int(line[start:start+offset],16)
        strPower = str(intPower)
        num = len(strPower)
        power = strPower[0:num-1] + "." + strPower[-1:]
        #power = format(strPower[:-1],".",strPower[-1:])
        body = body +  power + ","
        cnt += 1
        if 47 < cnt :
            flg = False
    logger.info(body)
    filename = WRITE_PATH + day + "_" + EACH30_FILE_NAME
    writeFile(filename, body)
    logger.info(body)
    logger.info("pathE2 end")

共通処理

common.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from microdotphat import write_string, set_decimal, clear, show, scroll

import sys
import logging
import datetime
import locale
import atexit

#ロガー取得
logger = logging.getLogger('common')

def all_clear(ser) :
    logger.error("all clear start")
    ser.close()
    clear()
    write_string("Down" , kerning=False)
    show()
    logger.error("all clear end")

def writeFile(filename,msg) :
    logger.info("writeFile start")
    f = open(filename,'w')
    f.write(msg)
    f.close()
    logger.info("writeFile end")

def str2byte(str):
    return str.encode()

def byte2str(byte):
    return byte.decode()

メイン処理

get-power.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from echonet import *
from config import *
from common import *
from phatoutthread import *

import sys
import serial
import time
import logging
import logging.handlers
import datetime
import locale


#ロガー取得
logger = logging.getLogger('main')

logname = "/var/log/tools/b-route.log"
fmt = "%(asctime)s %(levelname)s %(name)s :%(message)s"
#logging.basicConfig(level=10, format=fmt)
logging.basicConfig(level=21, filename=logname, format=fmt)


# ログのファイル出力先を設定
#logger.addHandler(fh)


# シリアルポート初期化
ser = serial.Serial(serialPortDev, baudrate=115200)

ser.reset_input_buffer()

if ser.out_waiting > 0:
        ser.reset_output_buffer()
atexit.register(all_clear,ser)
# Bルート認証パスワード設定
SKSETPWD ="SKSETPWD C " + rbpwd + "\r\n"
ser.write(str2byte(SKSETPWD))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# Bルート認証ID設定
SKSETRBID = "SKSETRBID " + rbid + "\r\n"
ser.write(str2byte(SKSETRBID))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

scanDuration = 4;   # スキャン時間。
scanRes = {} # スキャン結果の入れ物
scanRes["Channel"]=""

# スキャンのリトライループ(何か見つかるまで)
while scanRes.get("Channel") == "" :
    # アクティブスキャン(IE あり)を行う
    # 時間かかります。10秒ぐらい?
    SKSCAN = "SKSCAN 2 FFFFFFFF " + str(scanDuration) + "\r\n"
    ser.write(str2byte(SKSCAN))

    # スキャン1回について、スキャン終了までのループ
    scanEnd = False
    while not scanEnd :
        line = ser.readline()
        lineStr = byte2str(line)
        logger.info(lineStr)

        if lineStr.startswith("EVENT 22") :
            # スキャン終わったよ(見つかったかどうかは関係なく)
            scanEnd = True
        elif lineStr.startswith("  ") :
            # スキャンして見つかったらスペース2個あけてデータがやってくる
            # 例
            #  Channel:39
            #  Channel Page:09
            #  Pan ID:FFFF
            #  Addr:FFFFFFFFFFFFFFFF
            #  LQI:A7
            #  PairID:FFFFFFFF
            cols = lineStr.strip().split(':')
            scanRes[cols[0]] = cols[1]
    scanDuration+=1

    if 14 < scanDuration and not scanRes.has_key("Channel"):
        # 引数としては14まで指定できるが、7で失敗したらそれ以上は無駄っぽい
        logger.error("スキャンリトライオーバー")
        ser.close()
        sys.exit()  #### 糸冬了 ####

# スキャン結果からChannelを設定。
SKSREGS2 = "SKSREG S2 " + scanRes["Channel"] + "\r\n"
ser.write(str2byte(SKSREGS2))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# スキャン結果からPan IDを設定
SKSREGS3 = "SKSREG S3 " + scanRes["Pan ID"] + "\r\n"
ser.write(str2byte(SKSREGS3))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# MACアドレス(64bit)をIPV6リンクローカルアドレスに変換。
# (BP35A1の機能を使って変換しているけど、単に文字列変換すればいいのではという話も??)
SKLL64 = "SKLL64 " + scanRes["Addr"] + "\r\n"
ser.write(str2byte(SKLL64))
logger.info(byte2str(ser.readline()))
ipv6Addr = byte2str(ser.readline()).strip()

# PANA 接続シーケンスを開始します。
SKJOIN = "SKJOIN " + ipv6Addr + "\r\n"
ser.write(str2byte(SKJOIN))
logger.info(byte2str(ser.readline()))
logger.info(byte2str(ser.readline()))

# PANA 接続完了待ち(10行ぐらいなんか返してくる)
bConnected = False
while not bConnected :
    line = byte2str(ser.readline())
    if line.startswith("EVENT 24") :
        logger.error("PANA 接続失敗")
        ser.close()
        sys.exit()  #### 糸冬了 ####
    elif line.startswith("EVENT 25") :
        # 接続完了!
        bConnected = True

# シリアル通信のタイムアウトを設定
ser.timeout = 20

# スマートメーターがインスタンスリスト通知を投げてくる
# (ECHONET-Lite_Ver.1.12_02.pdf p.4-16)
logger.info(byte2str(ser.readline()))

DAILY_TASK = True
today = ""
task_cnt = 0
#Thread Start
ps_th = phatoutthread(10)
ps_th.start()
while True :

    #いつもは即時値取得
    if not DAILY_TASK :
        command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_NOW_POWER_B))
        # コマンド送信
        ser.write(str2byte(command) + GET_NOW_POWER_B)
        d = datetime.datetime.today()
        if today != d.strftime("%d") :
            DAILY_TASK = True
    #初回起動時または、日付が変更されたら前日の30分値を取得
    if DAILY_TASK :
        if task_cnt == 0 :
            logger.debug("task_cnt 0")
            command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_EACH30_B))
            # コマンド送信
            ser.write(str2byte(command) + GET_EACH30_B)
        #if task_cnt == 1 :
        #    logger.debug("task_cnt 1")
        #    command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_STATUS_B))
        #    # コマンド送信
        #    ser.write(str2byte(command) + GET_STATUS_B)
        #if task_cnt == 2 :
        #    logger.debug("task_cnt 2")
        #    command = "SKSENDTO 1 {0} 0E1A 1 {1:04X} ".format(ipv6Addr, len(GET_MF_B))
        #    # コマンド送信
        #    ser.write(str2byte(command) + GET_MF_B)

    logger.info(byte2str(ser.readline()))
    logger.info(byte2str(ser.readline()))
    logger.info(byte2str(ser.readline()))

    line = byte2str(ser.readline())         # ERXUDPが来るはず
    logger.info(line)

    # 受信データはたまに違うデータが来たり、
    # 取りこぼしたりして変なデータを拾うことがあるので
    # チェックを厳しめにしてます。
    if line.startswith("ERXUDP") :
        cols = line.strip().split(' ')
        res = cols[8]   # UDP受信データ部分
        #tid = res[4:4+4];
        seoj = res[8:8+6]
        #deoj = res[14,14+6]
        ESV = res[20:20+2]
        #OPC = res[22,22+2]
        logger.debug("EPC:" + seoj)
        logger.debug("ESV:" + ESV)
        #エラー処理ここに入ったら落ちる
        if seoj != "028801" :
            logger.error("seoj:" + seoj )
            ser.close()
            ps_th.flg = True
            event.set()
            ps_th.join()
            sys.exit()  #### 糸冬了 ####

        if seoj == "028801" and ESV == "72" :
            # スマートメーター(028801)から来た応答(72)なら
            EPC = res[24:24+2]
            logger.debug("EPC:" + EPC)
            if EPC == "E7" :
                # 内容が瞬時電力計測値(E7)だったら
                logger.info("PARTH E7")
                parthE7(line,ps_th)
                event.set()
            if EPC == "E2" :
                # 内容が電力計測値(E2)だったら
                logger.info("PARTH E2")
                d = datetime.datetime.today()
                today = d.strftime("%d")
                d -= datetime.timedelta(days = 1)
                logger.info(today)
                parthE2(res,d.strftime("%Y%m%d"))
                DAILY_TASK = False
                #task_cnt += 1
            #if EPC == "D7" :
            #    task_cnt += 1
            #if EPC == "88" :
            #    DAILY_TASK = False
            #    task_cnt += 0
        time.sleep(30)
ser.close()
phatoutthread.py
from microdotphat import write_string, set_decimal, clear, show, scroll
from threading import (Event,Thread)
import time
import datetime
import logging

event = Event()

#ロガー取得
logger = logging.getLogger('phatOut')

class phatoutthread(Thread):


    def __init__(self, t):
        super(phatoutthread, self).__init__()
        self.t = t
        self.flg = False
        self.power = "start"
        d = datetime.datetime.now()
        self.hm = d.strftime("%H:%M")

    def run(self):
        while True :
            #pHat出力
            logger.debug("phatShow start")
            event.wait()
            event.clear()
            if(self.flg):
                logger.info("phatShow break")
                break
            clear()
            write_string(self.hm , kerning=False)
            show()
            time.sleep(self.t)
            clear()
            set_decimal(1,1)
            if len(self.power) < 4:
                write_string("0" + self.power + "kW" , kerning=False)
            else:
                write_string(self.power + "kW" , kerning=False)
            logger.info("phatShow end")
            show()
            #time.sleep(self.t)
'''
puma_46
システム開発やってます
http://puma46.mydns.jp
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away