4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

*Python*ECHONET Lite対応家電(エミュレータ)を制御する

Last updated at Posted at 2020-09-09

はじめに

Flaskを使ったことがあり,ソケット通信のプログラムを書いたことがある方なら誰でも理解できる記事です.PythonのFlaskを使って家電の制御を試してみましょう.家電の制御はnode.jsを使用することが多いですが,Pythonを使う人が増えていることもあるため,Pythonを使って家電を制御しようと思います.まず,通信プロトコルとしては,家電を制御する際に標準的に用いられるECHONET Liteを使用します.ECHONET Liteに対応している家電は多いとは言えません.しかし,ECHONET Liteで制御できない場合でも,WebAPIなどが提供されている場合はそちらから制御できることがあります.制御する家電を見つける際は,ECHONET Liteだけでなく,WebAPIを探してみるということも試してみてください.次に,今回制御する家電ですが,ECHONET LITE対応家電を用意するのは大変なので,代わりにエミュレータを使用します.

前提

開発環境は以下の通りです.今回は2台のPCを使用します.

  • エミュレータ実行用PC

    • Windows 10 (64bit)
    • Javaがインストール済み
  • 家電制御プログラム実行用PC

    • Python 3.8.2
    • Ubuntu 18.04.5 LTS

エミュレータの実行

エミュレータはMoekadenRoomを使用するので,GitHubのページに移動して,README.mdDownload executablesより圧縮ファイルをダウンロードしてください.圧縮ファイルを展開し,MoekadenRoom.exeを実行してください.なお,実行するには,Javaがインストールされている必要があります.Javaがインストールされていない場合は,こちらからインストールしてください.MoekadenRoom.exeを実行すると,以下のようにエミュレータが表示されます.GUIで家電を操作できるので,試してみてください.

Pythonによる家電(エミュレータ)の制御(制御命令・応答なし)

Pythonで家電に制御命令を送ります.一般的なソケット通信のクライアントプログラムとほぼ変わらないので,先にサンプルコードを示します.MoekadenRoomを起動した状態でサンプルコードを実行すると,エミュレータの照明が点灯します.

サンプルコード

echonet_lite.py
import socket
import binascii


def send_command():
    # MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
    ip = '192.168.0.10'
    # ECHONET LiteはUDPの3610番ポートを使用する
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '60',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '01',
        format_echonet_lite[8]: '30'  # 31に変えると照明を消すことができる
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]

    # 16進数文字列をバイト列に変換する
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(msg, (ip, ECHONETport))
    
    
if __name__ == '__main__':
    send_command();

転送するフレーム

サンプルコードの解説になります.ECHONET Lite フレームのフォーマットに基づいて,転送するメッセージを作成します.フレームに関する説明は以下のページを参考にしてください.

上のページを参考に,EHD, TID, SEOJ, DEOJ, ESV, OPC, EPC, PDC, EDTに値を入れています.詳細については上のページを確認してください.値は16進数です.

電文構成要素 値(説明)
ETD 1081 (ECHONETのプロトコル種別で,1081はECHONET Liteである)
TID 0000 (要求と応答を紐づけるためのIDで,任意の数値でよい)
SEOJ 05FF01 (送信元の機器を示す.コントローラの値を入れている.
参考:機器オブジェクト詳細規定)
DEOJ 029001 (送信先の機器(照明)を示す.照明の値を入れている)
ESV 60 (60は送信先からの応答のない制御命令である)
OPC 01 (一度の電文で処理するプロパティ数を示す)
EPC 80 (プロパティを指定する.MoekadenRoomのReadme.mdから,照明(Lightning)のEPCを探す)
PDC 01 (EDTのバイト数を示す。制御内容が一つなので01)
EDT 30 (EPCで指定したプロパディ名によって、値を入れる.MoekadenRoomのReadme.mdから,照明をつけるためのEDTを探す)

Pythonによる家電(エミュレータ)の制御(制御命令・応答あり)

先ほどの制御命令は応答がないものでした.ここでは,制御命令を送り,家電から応答を受け取ってみます.先ほどと同じように,エミュレータのランプを点灯させます.先ほどと違うのは,応答を受け取るということです.つまり,先ほどはソケット通信のクライアントプログラムのみを作成しましたが,ここでは応答を受け取るためにソケット通信のサーバプログラムに関しても作成する必要があります.クライアントプログラムとサーバプログラムでファイルを分けて別々で実行する方法と,これらを1つのファイルにまとめる方法の2通りを解説します.まずは,簡単な前者から説明します.

クライアントとサーバを別々のファイルに分ける方法

クライアントプログラムは先ほどとほぼ変わりません.変更するのはECHONET Liteのフレームの値です.ESVの値を61にすることで,応答ありの制御命令を送ることができます.

echonet_lite.py
import socket
import binascii


def send_command():
    # MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
    ip = '192.168.0.10'
    # ECHONET LiteはUDPの3610番ポートを使用する
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '61', # 先ほどのプログラムを変更(60 → 61)
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '01',
        format_echonet_lite[8]: '30' 
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]

    # 16進数文字列をバイト列に変換する
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(msg, (ip, ECHONETport))
    
    
if __name__ == '__main__':
    send_command();

応答を受け取るプログラムは一般的なソケット通信のサーバプログラムと何ら変わりません.ただし,応答をバイト列で受け取るため,16進数文字列に変換する必要があります.応答の際に受け取るデータは,先ほど送信したフレームと同じフォーマットなので,16進数文字列に変換することで,中身の内容を読み取ることができます.

socket_server.py
import socket
import binascii

def receive_state():
    ECHONETport = 3610

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    data, addr = sock.recvfrom(4096)
    
    # バイト列を16進数文字列に変換する
    data = str(binascii.hexlify(data), 'utf-8')
    print(data)


if __name__ == '__main__':
    receive_state()

まず,MoekadenRoomを起動し,サーバプログラム,クライアントプログラムの順に実行すると,サーバプログラムの実行結果として,以下のような出力が得られます.

$ python3 socket_server.py
# 出力結果
1081000002900105ff0171018000

出力結果を読み取ると以下のようになります.EPCの値から,制御命令に対する応答であることがわかります.

電文構成要素 値(説明)
ETD 1081 (ECHONETのプロトコル種別で,1081はECHONET Liteである)
TID 0000 (要求と応答を紐づけるためのIDで,任意の数値でよい)
SEOJ 029001 (送信元の機器:照明)
DEOJ 05FF01 (送信先の機器:コントローラ)
ESV 71 (61に対する応答は71で返ってくる)
OPC 01 (一度の電文で処理するプロパティ数)
EPC 80 (プロパティの指定.照明(Lightning)のEPC)
PDC 00 (EDTのバイト数.制御ではないので00)
EDT 空 (制御ではないためEDTは指定しない)

クライアントとサーバを1つのファイルにまとめる方法

サーバプログラムでは,データを受け取るまで待ち受ける(ブロッキング)ため,処理がその部分で止まってしまいます.クライアントプログラムも同時に処理するには,threadを使うのが良いでしょう.サーバプログラムを別スレッドで処理します.

別スレッドに処理を渡すには,Threadクラスのインスタンスを作成し,引数targetに別スレッドで処理したい関数を指定します.startメソッドでthreadを開始します.

th = Thread(target=function)
th.start()

では,サンプルコードを示します.プログラムを実行して3秒経つと照明に制御命令を送信し,応答が返ってくるプログラムです.

echonet_lite.py
import socket
import binascii
from threading import Thread
import time

def send_command():
    # MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
    ip = '192.168.0.10'
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '61',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '01',
        format_echonet_lite[8]: '30',   # 31に変えると照明を消すことができる
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(msg, (ip, ECHONETport))
    print('send')


def receive_state():
    ECHONETport = 3610

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    data, addr = sock.recvfrom(4096)
    data = str(binascii.hexlify(data), 'utf-8')
    print('receive')
    print(data)


if __name__ == '__main__':
    th = Thread(target=receive_state)
    th.start()
    time.sleep(3)
    send_command()

Pythonによる家電(エミュレータ)の制御(状態要求)

家電の制御として,家電の現在の状態を取得するということをやってみます.上で説明した制御命令(応答あり・なし)に加え,ここで説明する状態要求が,家電制御のおもな種類です.状態要求に関しても応答があるため,ソケット通信のクライアントプログラムとサーバプログラムの両方を作成する必要があります.状態要求では,送信時のESVの値は62になり,応答時のESVの値は72になります.また,制御命令ではないため,PDCの値は00EDTの値はになります.これらを踏まえて,以下にサンプルコードを示します.サンプルコードでは,応答を5秒おきに出力します.サンプルコードを実行しつつ,MoekadenRoomの照明のボタンをクリックしてON/OFFを切り替えてみてください.出力されるEDTの値が変化するはずです.

echonet_lite.py
import socket
import binascii
from threading import Thread
import time

def receive_state():
    ECHONETport = 3610
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    while True:
        data, addr = sock.recvfrom(4096)
        data = str(binascii.hexlify(data), 'utf-8')
        print(data)

def confirm_state():
    ip = '192.168.0.10'
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '62',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '00',
        format_echonet_lite[8]: ''
    }

    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]
    msg = binascii.unhexlify(frame)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    while True:
        sock.sendto(msg, (ip, ECHONETport))
        time.sleep(5)
        
if __name__ == '__main__':
    th1 = Thread(target=receive_state)
    th1.start()
    confirm_state()

Flaskとの連携

最後に,Flaskを使用してHTTPサーバを立て,家電を制御します.HTTPサーバで家電の状態を管理することは,複数の家電を制御する際によく使う手法です.これを応用して,Androidから家電を制御することに利用したり,IoTのデータ収集にも利用することができます.まずは,そのための一歩として,Flaskと家電を制御するシンプルなプログラムを見てみましょう.ここでは,複数のthreadを使用するため,queueを用います.queueとは,入れた通りの順番に、データを取り出すことができる構造のことを言います.queueの基本的な使い方は以下のようになります.

# キューの作成
q = Queue();
# キューへのデータの挿入は「put」、取り出しは「get」で行う
q_msg = {"name": "foge"}
q.put(q_msg)
q_result = q.get()
print(q_result["name"])

では,少し複雑になりますが,サンプルコードを示します.プログラムを実行する手順としては,はじめにMoekadenRoomを起動し,次にプログラムを実行し,最後にhttp://192.168.0.10:5000にアクセスします.5秒おきに家電の状態を取得して出力,10秒おきに家電にON/OFF命令をするプログラムです.

app.py
import socket
import binascii
from flask import Flask
from queue import Queue
from threading import Thread
import time

app = Flask(__name__)

q = Queue()
state_q = Queue()

@app.route('/', methods=['GET'])
def home():
    th1 = Thread(target=receive_state)
    th1.start()
    th2 = Thread(target=confirm_state)
    th2.start()
    th3 = Thread(target=send_command)
    th3.start()
    return 'This is a Sample Webpage'
    
def send_command():
    while True:
        time.sleep(10)
        if state_q.empty():
            state = '30'
        else:
            q_state_msg = state_q.get()
            q_state = q_state_msg['state']
            if q_state == '30':
                state = '31'
            else:
                state = '30'
        # MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
        ip = '192.168.0.10'
        ECHONETport = 3610
        format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
        data_format = {
            format_echonet_lite[0]: '1081',
            format_echonet_lite[1]: '0000',
            format_echonet_lite[2]: '05FF01',
            format_echonet_lite[3]: '029001',
            format_echonet_lite[4]: '60',
            format_echonet_lite[5]: '01',
            format_echonet_lite[6]: '80',
            format_echonet_lite[7]: '01',
            format_echonet_lite[8]: state  # state: 30 or 31 (ON or OFF)
        }
        frame = ''
        for key in format_echonet_lite:
            frame += data_format[key]
        msg = binascii.unhexlify(frame)
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(msg, (ip, ECHONETport))
        print('send_command')

def receive_state():
    ECHONETport = 3610
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('', ECHONETport))
    while True:
        data, addr = sock.recvfrom(4096)
        data = str(binascii.hexlify(data), 'utf-8')
        queue_msg = q.get()
        if state_q.empty():
            data_format = queue_msg['format']
            state = get_state(data, data_format)
            q_state_msg = {
                'state': state
            }
            state_q.put(q_state_msg)
        print(data)

def confirm_state():
    ip = '192.168.0.10'
    ECHONETport = 3610
    format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
    data_format = {
        format_echonet_lite[0]: '1081',
        format_echonet_lite[1]: '0000',
        format_echonet_lite[2]: '05FF01',
        format_echonet_lite[3]: '029001',
        format_echonet_lite[4]: '62',
        format_echonet_lite[5]: '01',
        format_echonet_lite[6]: '80',
        format_echonet_lite[7]: '00',
        format_echonet_lite[8]: ''
    }
    frame = ''
    for key in format_echonet_lite:
        frame += data_format[key]
    msg = binascii.unhexlify(frame)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    while True:
        sock.sendto(msg, (ip, ECHONETport))
        print('send_confirm_state')
        queue_msg = {
            'format': data_format
        }
        q.put(queue_msg)
        time.sleep(5)

# 16進数文字列のフレームからEDTの値を取得するための関数
def get_state(data, data_format):
    data_list = []
    data_pos = 0
    for value in data_format.values():
        element_size = len(value)
        if element_size == 0:
            element_size = 2
        data_list.append(data[data_pos: data_pos + element_size])
        data_pos += element_size
    state = data_list[8]
    return state

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

終わりに

PythonでECHONET Liteを用いて家電(エミュレータ)を制御しました.今回は最もシンプルに制御可能である照明を対象としました.今回のサンプルコードを元に,家電の様々な制御を実現してみてください.Pythonを使った家電の制御に関する情報は少ないので,Qiitaの記事を少しずつ増やしていきましょう.(家電を制御する最も適切なインタフェースが見つかるまで…)

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?