はじめに
Flaskを使ったことがあり,ソケット通信のプログラムを書いたことがある方なら誰でも理解できる記事です.PythonのFlaskを使って家電の制御を試してみましょう.家電の制御はnode.jsを使用することが多いですが,Pythonを使う人が増えていることもあるため,Pythonを使って家電を制御しようと思います.まず,通信プロトコルとしては,家電を制御する際に標準的に用いられるECHONET Liteを使用します.ECHONET Liteに対応している家電は多いとは言えません.しかし,ECHONET Liteで制御できない場合でも,WebAPIなどが提供されている場合はそちらから制御できることがあります.制御する家電を見つける際は,ECHONET Liteだけでなく,WebAPIを探してみるということも試してみてください.次に,今回制御する家電ですが,ECHONET LITE対応家電を用意するのは大変なので,代わりにエミュレータを使用します.
前提
開発環境は以下の通りです.今回は2台のPCを使用します.
-
エミュレータ実行用PC
- Windows 10 (64bit)
- Javaがインストール済み
-
家電制御プログラム実行用PC
- Python 3.8.2
- Ubuntu 18.04.5 LTS
エミュレータの実行
エミュレータはMoekadenRoomを使用するので,GitHubのページに移動して,README.md
のDownload executables
より圧縮ファイルをダウンロードしてください.圧縮ファイルを展開し,MoekadenRoom.exe
を実行してください.なお,実行するには,Javaがインストールされている必要があります.Javaがインストールされていない場合は,こちらからインストールしてください.MoekadenRoom.exe
を実行すると,以下のようにエミュレータが表示されます.GUIで家電を操作できるので,試してみてください.
Pythonによる家電(エミュレータ)の制御(制御命令・応答なし)
Pythonで家電に制御命令を送ります.一般的なソケット通信のクライアントプログラムとほぼ変わらないので,先にサンプルコードを示します.MoekadenRoomを起動した状態でサンプルコードを実行すると,エミュレータの照明が点灯します.
サンプルコード
import socket
import binascii
def send_command():
# MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
ip = '192.168.0.10'
# ECHONET LiteはUDPの3610番ポートを使用する
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '60',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: '30' # 31に変えると照明を消すことができる
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
# 16進数文字列をバイト列に変換する
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
if __name__ == '__main__':
send_command();
転送するフレーム
サンプルコードの解説になります.ECHONET Lite フレームのフォーマットに基づいて,転送するメッセージを作成します.フレームに関する説明は以下のページを参考にしてください.
上のページを参考に,EHD, TID, SEOJ, DEOJ, ESV, OPC, EPC, PDC, EDT
に値を入れています.詳細については上のページを確認してください.値は16進数です.
電文構成要素 | 値(説明) |
---|---|
ETD | 1081 (ECHONETのプロトコル種別で,1081はECHONET Liteである) |
TID | 0000 (要求と応答を紐づけるためのIDで,任意の数値でよい) |
SEOJ | 05FF01 (送信元の機器を示す.コントローラの値を入れている. 参考:機器オブジェクト詳細規定) |
DEOJ | 029001 (送信先の機器(照明)を示す.照明の値を入れている) |
ESV | 60 (60は送信先からの応答のない制御命令である) |
OPC | 01 (一度の電文で処理するプロパティ数を示す) |
EPC | 80 (プロパティを指定する.MoekadenRoomのReadme.mdから,照明(Lightning)のEPCを探す) |
PDC | 01 (EDTのバイト数を示す。制御内容が一つなので01) |
EDT | 30 (EPCで指定したプロパディ名によって、値を入れる.MoekadenRoomのReadme.mdから,照明をつけるためのEDTを探す) |
Pythonによる家電(エミュレータ)の制御(制御命令・応答あり)
先ほどの制御命令は応答がないものでした.ここでは,制御命令を送り,家電から応答を受け取ってみます.先ほどと同じように,エミュレータのランプを点灯させます.先ほどと違うのは,応答を受け取るということです.つまり,先ほどはソケット通信のクライアントプログラムのみを作成しましたが,ここでは応答を受け取るためにソケット通信のサーバプログラムに関しても作成する必要があります.クライアントプログラムとサーバプログラムでファイルを分けて別々で実行する方法と,これらを1つのファイルにまとめる方法の2通りを解説します.まずは,簡単な前者から説明します.
クライアントとサーバを別々のファイルに分ける方法
クライアントプログラムは先ほどとほぼ変わりません.変更するのはECHONET Liteのフレームの値です.ESV
の値を61
にすることで,応答ありの制御命令を送ることができます.
import socket
import binascii
def send_command():
# MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
ip = '192.168.0.10'
# ECHONET LiteはUDPの3610番ポートを使用する
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '61', # 先ほどのプログラムを変更(60 → 61)
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: '30'
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
# 16進数文字列をバイト列に変換する
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
if __name__ == '__main__':
send_command();
応答を受け取るプログラムは一般的なソケット通信のサーバプログラムと何ら変わりません.ただし,応答をバイト列で受け取るため,16進数文字列に変換する必要があります.応答の際に受け取るデータは,先ほど送信したフレームと同じフォーマットなので,16進数文字列に変換することで,中身の内容を読み取ることができます.
import socket
import binascii
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
data, addr = sock.recvfrom(4096)
# バイト列を16進数文字列に変換する
data = str(binascii.hexlify(data), 'utf-8')
print(data)
if __name__ == '__main__':
receive_state()
まず,MoekadenRoomを起動し,サーバプログラム,クライアントプログラムの順に実行すると,サーバプログラムの実行結果として,以下のような出力が得られます.
$ python3 socket_server.py
# 出力結果
1081000002900105ff0171018000
出力結果を読み取ると以下のようになります.EPC
の値から,制御命令に対する応答であることがわかります.
電文構成要素 | 値(説明) |
---|---|
ETD | 1081 (ECHONETのプロトコル種別で,1081はECHONET Liteである) |
TID | 0000 (要求と応答を紐づけるためのIDで,任意の数値でよい) |
SEOJ | 029001 (送信元の機器:照明) |
DEOJ | 05FF01 (送信先の機器:コントローラ) |
ESV | 71 (61に対する応答は71で返ってくる) |
OPC | 01 (一度の電文で処理するプロパティ数) |
EPC | 80 (プロパティの指定.照明(Lightning)のEPC) |
PDC | 00 (EDTのバイト数.制御ではないので00) |
EDT | 空 (制御ではないためEDTは指定しない) |
クライアントとサーバを1つのファイルにまとめる方法
サーバプログラムでは,データを受け取るまで待ち受ける(ブロッキング)ため,処理がその部分で止まってしまいます.クライアントプログラムも同時に処理するには,thread
を使うのが良いでしょう.サーバプログラムを別スレッドで処理します.
別スレッドに処理を渡すには,Thread
クラスのインスタンスを作成し,引数target
に別スレッドで処理したい関数を指定します.start
メソッドでthread
を開始します.
th = Thread(target=function)
th.start()
では,サンプルコードを示します.プログラムを実行して3秒経つと照明に制御命令を送信し,応答が返ってくるプログラムです.
import socket
import binascii
from threading import Thread
import time
def send_command():
# MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '61',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: '30', # 31に変えると照明を消すことができる
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
print('send')
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
data, addr = sock.recvfrom(4096)
data = str(binascii.hexlify(data), 'utf-8')
print('receive')
print(data)
if __name__ == '__main__':
th = Thread(target=receive_state)
th.start()
time.sleep(3)
send_command()
Pythonによる家電(エミュレータ)の制御(状態要求)
家電の制御として,家電の現在の状態を取得するということをやってみます.上で説明した制御命令(応答あり・なし)に加え,ここで説明する状態要求が,家電制御のおもな種類です.状態要求に関しても応答があるため,ソケット通信のクライアントプログラムとサーバプログラムの両方を作成する必要があります.状態要求では,送信時のESV
の値は62
になり,応答時のESV
の値は72
になります.また,制御命令ではないため,PDC
の値は00
,EDT
の値は空
になります.これらを踏まえて,以下にサンプルコードを示します.サンプルコードでは,応答を5秒おきに出力します.サンプルコードを実行しつつ,MoekadenRoomの照明のボタンをクリックしてON/OFFを切り替えてみてください.出力されるEDTの値が変化するはずです.
import socket
import binascii
from threading import Thread
import time
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
while True:
data, addr = sock.recvfrom(4096)
data = str(binascii.hexlify(data), 'utf-8')
print(data)
def confirm_state():
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '62',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '00',
format_echonet_lite[8]: ''
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
sock.sendto(msg, (ip, ECHONETport))
time.sleep(5)
if __name__ == '__main__':
th1 = Thread(target=receive_state)
th1.start()
confirm_state()
Flaskとの連携
最後に,Flaskを使用してHTTPサーバを立て,家電を制御します.HTTPサーバで家電の状態を管理することは,複数の家電を制御する際によく使う手法です.これを応用して,Androidから家電を制御することに利用したり,IoTのデータ収集にも利用することができます.まずは,そのための一歩として,Flaskと家電を制御するシンプルなプログラムを見てみましょう.ここでは,複数のthreadを使用するため,queue
を用います.queue
とは,入れた通りの順番に、データを取り出すことができる構造のことを言います.queue
の基本的な使い方は以下のようになります.
# キューの作成
q = Queue();
# キューへのデータの挿入は「put」、取り出しは「get」で行う
q_msg = {"name": "foge"}
q.put(q_msg)
q_result = q.get()
print(q_result["name"])
では,少し複雑になりますが,サンプルコードを示します.プログラムを実行する手順としては,はじめにMoekadenRoomを起動し,次にプログラムを実行し,最後にhttp://192.168.0.10:5000
にアクセスします.5秒おきに家電の状態を取得して出力,10秒おきに家電にON/OFF命令をするプログラムです.
import socket
import binascii
from flask import Flask
from queue import Queue
from threading import Thread
import time
app = Flask(__name__)
q = Queue()
state_q = Queue()
@app.route('/', methods=['GET'])
def home():
th1 = Thread(target=receive_state)
th1.start()
th2 = Thread(target=confirm_state)
th2.start()
th3 = Thread(target=send_command)
th3.start()
return 'This is a Sample Webpage'
def send_command():
while True:
time.sleep(10)
if state_q.empty():
state = '30'
else:
q_state_msg = state_q.get()
q_state = q_state_msg['state']
if q_state == '30':
state = '31'
else:
state = '30'
# MoekadenRoomを実行しているPCのIPアドレス(各自変更する)
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '60',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '01',
format_echonet_lite[8]: state # state: 30 or 31 (ON or OFF)
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(msg, (ip, ECHONETport))
print('send_command')
def receive_state():
ECHONETport = 3610
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', ECHONETport))
while True:
data, addr = sock.recvfrom(4096)
data = str(binascii.hexlify(data), 'utf-8')
queue_msg = q.get()
if state_q.empty():
data_format = queue_msg['format']
state = get_state(data, data_format)
q_state_msg = {
'state': state
}
state_q.put(q_state_msg)
print(data)
def confirm_state():
ip = '192.168.0.10'
ECHONETport = 3610
format_echonet_lite = ['EHD', 'TID', 'SEOJ', 'DEOJ', 'ESV', 'OPC', 'EPC', 'PDC', 'EDT']
data_format = {
format_echonet_lite[0]: '1081',
format_echonet_lite[1]: '0000',
format_echonet_lite[2]: '05FF01',
format_echonet_lite[3]: '029001',
format_echonet_lite[4]: '62',
format_echonet_lite[5]: '01',
format_echonet_lite[6]: '80',
format_echonet_lite[7]: '00',
format_echonet_lite[8]: ''
}
frame = ''
for key in format_echonet_lite:
frame += data_format[key]
msg = binascii.unhexlify(frame)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
sock.sendto(msg, (ip, ECHONETport))
print('send_confirm_state')
queue_msg = {
'format': data_format
}
q.put(queue_msg)
time.sleep(5)
# 16進数文字列のフレームからEDTの値を取得するための関数
def get_state(data, data_format):
data_list = []
data_pos = 0
for value in data_format.values():
element_size = len(value)
if element_size == 0:
element_size = 2
data_list.append(data[data_pos: data_pos + element_size])
data_pos += element_size
state = data_list[8]
return state
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
終わりに
PythonでECHONET Liteを用いて家電(エミュレータ)を制御しました.今回は最もシンプルに制御可能である照明を対象としました.今回のサンプルコードを元に,家電の様々な制御を実現してみてください.Pythonを使った家電の制御に関する情報は少ないので,Qiitaの記事を少しずつ増やしていきましょう.(家電を制御する最も適切なインタフェースが見つかるまで…)