#TPLinkとは?
ルータを主力とする中国・深圳のネットワーク機器メーカーです。
近年はスマート電球、スマートプラグ等のIoT家電に力を入れており、コスパの良さからAmazonで独自の地位を築いています。
今回は、APIを使用して、
・機器のON-OFF操作
・ON-OFF、電球の明るさ等の情報取得
を、PythonおよびNode.jsで実行してみました
IoT家電として思いつく用途の多くを上記でカバーできるので
応用の可能性を感じる結果となりました!
#必要なもの
・PC
・RaspberryPi
・TPLink製スマートプラグあるいは電球
今回は下記3製品を試しました
HS105:スマートプラグ
KL110:ホワイト電球
KL130:カラー電球
##①データ取得の確認
まずは、TPLinkからデータが取得できるかターミナル上でテストします。
※参考にさせて頂いた記事
https://lmjs7.net/blog/tag/tp-link/
https://qiita.com/tmisuoka0423/items/582ff0c303abe8570ee5
###IPを調べる
tplink-smarthome-api(参考)をインストール
sudo npm install -g tplink-smarthome-api
下記コマンドで、接続しているTPLinkデバイス一覧を取得
tplink-smarthome-api search
HS105(JP) plug IOT.SMARTPLUGSWITCH 192.168.0.101 9999 B0BE76‥ スマートプラグ
KL110(JP) bulb IOT.SMARTBULB 192.168.0.102 9999 98DAC4‥ ホワイト電球
KL130(JP) bulb IOT.SMARTBULB 192.168.0.103 9999 0C8063‥ カラー電球
3つのデバイス全てが検出できていることが分かります
###デバイス動作情報の取得確認
下記コマンドで、デバイスの設定やOnOffが取得できる
tplink-smarthome-api getSysInfo [デバイスのIPアドレス]:9999
・KL130(カラー電球)の例
:
ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ここからがデバイスの設定
light_state: {
on_off: 1,
mode: 'normal',
hue: 0,
saturation: 0,
color_temp: 2700,
brightness: 100
},
↑ここまでがデバイスの設定
is_dimmable: 1,
is_color: 1,
:
on_off:0なら電源OFF、1なら電源ON
hue:色?(白色モードのとき0)
color_temp:色温度(白色モード以外のとき0)
brightness:明るさ(%単位)
と思われます
・KL110(ホワイト電球)の例
:
ctrl_protocols: { name: 'Linkie', version: '1.0' },
↓ここからがデバイスの設定
light_state: {
on_off: 1,
mode: 'normal',
hue: 0,
saturation: 0,
color_temp: 2700,
brightness: 100
},
↑ここまでがデバイスの設定
is_dimmable: 1,
is_color: 0,
:
on_off:0なら電源OFF、1なら電源ON
hue:色相(白色モードのとき0)
saturation:彩度
color_temp:色温度(白色モード以外のとき0)
brightness:明るさ(%単位)
と思われます。
KL130とほぼ同じですが、カラーではないのでis_color: 0となっていると思われます。
・KL105(スマートプラグ)の例
alias: '',
↓ここからがデバイスの設定
relay_state: 1,
on_time: 288,
active_mode: 'none',
feature: 'TIM',
updating: 0,
icon_hash: '',
rssi: -52,
led_off: 0,
longitude_i: 1356352,
latitude_i: 348422,
↑ここまでがデバイスの設定
hwId: '047D‥',
relay_state:0なら電源OFF、1なら電源ON
on_time:連続電源ON時間
rssi: WiFiの信号強度
と思われます。
経度(logitude)と緯度(latitude)も表示されていますが、実際の場所と5キロくらいずれていて謎が深まります。
上記で、コマンドで欲しい情報が取得できることが確認できました!
次章以降で、プログラム(Node.js&Python)から取得・操作する方法を記載します。
##②Node.jsで状態取得
※「Pythonを使うからNode.jsの説明はいらん!」という方は、この章を飛ばして③に移動してください
こちらを参考に、Node.jsを
###npmにパスを通す(Windowsの場合)
Windowsだとnpmのグローバルインストール先にパスが通っておらず、Node.jsでモジュールが読み込めないので、下記を参考にパスを通してください
https://qiita.com/shiftsphere/items/5610f692899796b03f99
###npmにパスを通す(RaspberryPiの場合)
下記コマンドで、グローバルでのnpmモジュールインストール先を調べます
(なぜかWindowsのときのコマンド"npm bin -g"で見つかるフォルダとは違うようです)
npm ls -g
下記コマンドで.profileを編集します。
※SSH環境では.profileの代わりに、.bash_profileを編集してください
nano /home/[ユーザ名]/.profile
.profileの最後に下記の1行を追加してrebootしてください
export NODE_PATH=[上で調べたパス]/node_modules
下記コマンドで指定したパスが表示されれば成功です
printenv NODE_PATH
###node.jsスクリプトの作成
下記スクリプトを作成します
const { Client } = require('tplink-smarthome-api');
const client = new Client();
client.getDevice({ host: '192.168.0.102' }).then(device => {
device.getSysInfo().then(console.log);
});
下記コマンドでスクリプトを実行すると、①と同様に各種情報が取得できます
node tplink_test.js
※上記をcsvロギングするスクリプト(③のPythonスクリプトと同機能)も作成しましたが、私のJavaScriptスキルが低くうまく動作しないときがある(非同期部分の処理順が逆転する)ので、コードはここには貼らないこととします
下記GitHubにアップロードしたので、自己責任で改造して使用していただければと思います。
(願わくば無知な私に処理順が逆転する理由もコメント…頂けると嬉しいです笑)
https://github.com/c60evaporator/TPLink_Info_Nodejs
##③Pythonで状態取得
私のJavaScriptスキル不足でNode.jsでのロギングが上手くいかなかったので、
気を取り直してPythonで操作・ロギングするスクリプトを作りました。
PythonはNode.jsほど丁寧なドキュメントが見当たらず苦戦しましたが、こちらやこちらのコードを解読して、スクリプトを作成しました。
###TPLink操作クラスの作成
上記コードを参考に、下記の3つのクラスを作成しました
TPLink_Common():プラグ、電球共通機能のクラス
TPLink_Plug():プラグ専用機能のクラス(TPLink_Common()を継承)
TPLink_Bulb():電球専用機能のクラス(TPLink_Common()を継承)
※2020/11/19修正:GetTPLinkData()クラスは冗長なので削除し、TPLink_Common().info_dict()メソッドに統合
import socket
from struct import pack
import json
#TPLink電球&プラグ共通クラス
class TPLink_Common():
def __init__(self, ip, port=9999):
"""Default constructor
"""
self.__ip = ip
self.__port = port
def info(self):
cmd = '{"system":{"get_sysinfo":{}}}'
receive = self.send_command(cmd)
return receive
def info_dict(self):
rjson = self.info()
rdict = json.loads(rjson)
return rdict
def send_command(self, cmd, timeout=10):
try:
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_tcp.settimeout(timeout)
sock_tcp.connect((self.__ip, self.__port))
sock_tcp.settimeout(None)
sock_tcp.send(self.encrypt(cmd))
data = sock_tcp.recv(2048)
sock_tcp.close()
decrypted = self.decrypt(data[4:])
print("Sent: ", cmd)
print("Received: ", decrypted)
return decrypted
except socket.error:
quit("Could not connect to host " + self.__ip + ":" + str(self.__port))
return None
def encrypt(self, string):
key = 171
result = pack('>I', len(string))
for i in string:
a = key ^ ord(i)
key = a
result += bytes([a])
return result
def decrypt(self, string):
key = 171
result = ""
for i in string:
a = key ^ i
key = i
result += chr(a)
return result
#TPLinkプラグ操作用クラス
class TPLink_Plug(TPLink_Common):
def on(self):
cmd = '{"system":{"set_relay_state":{"state":1}}}'
receive = self.send_command(cmd)
def off(self):
cmd = '{"system":{"set_relay_state":{"state":0}}}'
receive = self.send_command(cmd)
def ledon(self):
cmd = '{"system":{"set_led_off":{"off":0}}}'
receive = self.send_command(cmd)
def ledoff(self):
cmd = '{"system":{"set_led_off":{"off":1}}}'
receive = self.send_command(cmd)
def set_countdown_on(self, delay):
cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":1,"name":"turn on"}}}'
receive = self.send_command(cmd)
def set_countdown_off(self, delay):
cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":0,"name":"turn off"}}}'
receive = self.send_command(cmd)
def delete_countdown_table(self):
cmd = '{"count_down":{"delete_all_rules":null}}'
receive = self.send_command(cmd)
def energy(self):
cmd = '{"emeter":{"get_realtime":{}}}'
receive = self.send_command(cmd)
return receive
#TPLink電球操作用クラス
class TPLink_Bulb(TPLink_Common):
def on(self):
cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":1}}}'
receive = self.send_command(cmd)
def off(self):
cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":0}}}'
receive = self.send_command(cmd)
def transition_light_state(self, hue: int = None, saturation: int = None, brightness: int = None,
color_temp: int = None, on_off: bool = None, transition_period: int = None,
mode: str = None, ignore_default: bool = None):
# copy all given argument name-value pairs as a dict
d = {k: v for k, v in locals().items() if k is not 'self' and v is not None}
r = {
'smartlife.iot.smartbulb.lightingservice': {
'transition_light_state': d
}
}
cmd = json.dumps(r)
receive = self.send_command(cmd)
print(receive)
def brightness(self, brightness):
self.transition_light_state(brightness=brightness)
def purple(self, brightness = None, transition_period = None):
self.transition_light_state(hue=277, saturation=86, color_temp=0, brightness=brightness, transition_period=transition_period)
def blue(self, brightness = None, transition_period = None):
self.transition_light_state(hue=240, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
def cyan(self, brightness = None, transition_period = None):
self.transition_light_state(hue=180, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
def green(self, brightness = None, transition_period = None):
self.transition_light_state(hue=120, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
def yellow(self, brightness = None, transition_period = None):
self.transition_light_state(hue=60, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
def orange(self, brightness = None, transition_period = None):
self.transition_light_state(hue=39, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
def red(self, brightness = None, transition_period = None):
self.transition_light_state(hue=0, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
def lamp_color(self, brightness = None):
self.transition_light_state(color_temp=2700, brightness=brightness)
###TPLink操作クラスの実行方法
上記クラスは、Pythonコード上で下記のように実行できます
・電球の電源をONにしたいとき
TPLink_Bulb(電球のIPアドレス).on()
・プラグの電源をOFFにしたいとき
TPLink_Plug(プラグのIPアドレス).off()
・10秒後にプラグをONにしたいとき
TPLink_Plug(プラグのIPアドレス).set_countdown_on(10)
・電球の明るさを10%にしたいとき
TPLink_Bulb(電球のIPアドレス).brightness(10)
・電球を赤色にしたいとき(カラー電球のみ)
TPLink_Bulb(電球のIPアドレス).red()
・電球のOn-Off等の情報を取得
info = TPLink_Bulb(電球のIPアドレス).info_dict()
※上記メソッドは、取得したjson情報をdict形式に変換して出力されます。
出力される電球情報は①を参照ください
(クラスをTPLink_Plugにすればプラグ情報も取得可)
##④ロギング用Pythonスクリプトの作成
前章最後の方法を利用して、電球やプラグの情報をロギングするスクリプトを作成しました。
スクリプトの構造はこちらと同じなので、リンク先をご一読いただければと思います。
###設定ファイル
こちらの記事同様、管理をしやすくするため下記2種類の設定ファイルを作成しました
・DeviceList.csv:センサごとに必要情報を記載
DeviceList.csv
ApplianceName,ApplianceType,IP,Retry
TPLink_KL130_ColorBulb_1,TPLink_ColorBulb,192.168.0.103,2
TPLink_KL110_WhiteBulb_1,TPLink_WhiteBulb,192.168.0.102,2
TPLink_HS105_Plug_1,TPLink_Plug,192.168.0.101,2
カラムの意味は下記となります
ApplianceName:デバイス名を管理、同種類のデバイスが複数あるときの識別用
ApplianceType:デバイスの種類。
TPLink_ColorBulb:カラー電球(KL130等)
TPLink_WhiteBulb:白色電球(KL110等)
TPLink_Plug:スマートプラグ(HS105等)
IP:デバイスのIPアドレス
Retry:最大再実行回数詳細(取得失敗時の再実行回数、詳しくはこちら)
・config.ini:CSVおよびログ出力ディレクトリを指定
config.ini
[Path]
CSVOutput = /share/Data/Appliance
LogOutput = /share/Log/Appliance
どちらもsambaで作成した共有フォルダ内に出力すると、RaspberryPi外からアクセスできて便利です。
###実際のスクリプト
from tplink import TPLink_Plug, TPLink_Bulb
import logging
from datetime import datetime, timedelta
import os
import csv
import configparser
import pandas as pd
#グローバル変数
global masterdate
######TPLinkのデータ取得######
def getdata_tplink(appliance):
#データ値が得られないとき、最大appliance.Retry回スキャンを繰り返す
for i in range(appliance.Retry):
try:
#プラグのとき
if appliance.ApplianceType == 'TPLink_Plug':
plg = TPLink_Plug(appliance.IP)
applianceValue = plg.info_dict()
#電球のとき
elif appliance.ApplianceType == 'TPLink_ColorBulb' or appliance.ApplianceType == 'TPLink_WhiteBulb':
blb = TPLink_Bulb(appliance.IP)
applianceValue = blb.info_dict()
else:
applianceValue = None
#エラー出たらログ出力
except:
logging.warning(f'retry to get data [loop{str(i)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
applianceValue = None
continue
else:
break
#値取得できていたら、POSTするデータをdictに格納
if applianceValue is not None:
#プラグのとき
if appliance.ApplianceType == 'TPLink_Plug':
data = {
'ApplianceName': appliance.ApplianceName,
'Date_Master': str(masterdate),
'Date': str(datetime.today()),
'IsOn': str(applianceValue['system']['get_sysinfo']['relay_state']),
'OnTime': str(applianceValue['system']['get_sysinfo']['on_time'])
}
#電球のとき
else:
data = {
'ApplianceName': appliance.ApplianceName,
'Date_Master': str(masterdate),
'Date': str(datetime.today()),
'IsOn': str(applianceValue['system']['get_sysinfo']['light_state']['on_off']),
'Color': str(applianceValue['system']['get_sysinfo']['light_state']['hue']),
'ColorTemp': str(applianceValue['system']['get_sysinfo']['light_state']['color_temp']),
'Brightness': str(applianceValue['system']['get_sysinfo']['light_state']['brightness'])
}
return data
#取得できていなかったら、ログ出力
else:
logging.error(f'cannot get data [loop{str(appliance.Retry)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
return None
######データのCSV出力######
def output_csv(data, csvpath):
appliancename = data['ApplianceName']
monthstr = masterdate.strftime('%Y%m')
#出力先フォルダ名
outdir = f'{csvpath}/{appliancename}/{masterdate.year}'
#出力先フォルダが存在しないとき、新規作成
os.makedirs(outdir, exist_ok=True)
#出力ファイルのパス
outpath = f'{outdir}/{appliancename}_{monthstr}.csv'
#出力ファイル存在しないとき、新たに作成
if not os.path.exists(outpath):
with open(outpath, 'w', newline="") as f:
writer = csv.DictWriter(f, data.keys())
writer.writeheader()
writer.writerow(data)
#出力ファイル存在するとき、1行追加
else:
with open(outpath, 'a', newline="") as f:
writer = csv.DictWriter(f, data.keys())
writer.writerow(data)
######メイン######
if __name__ == '__main__':
#開始時刻を取得
startdate = datetime.today()
#開始時刻を分単位で丸める
masterdate = startdate.replace(second=0, microsecond=0)
if startdate.second >= 30:
masterdate += timedelta(minutes=1)
#設定ファイルとデバイスリスト読込
cfg = configparser.ConfigParser()
cfg.read('./config.ini', encoding='utf-8')
df_appliancelist = pd.read_csv('./ApplianceList.csv')
#全センサ数とデータ取得成功数
appliance_num = len(df_appliancelist)
success_num = 0
#ログの初期化
logname = f"/appliancelog_{str(masterdate.strftime('%y%m%d'))}.log"
logging.basicConfig(filename=cfg['Path']['LogOutput'] + logname, level=logging.INFO)
#取得した全データ保持用dict
all_values_dict = None
######デバイスごとにデータ取得######
for appliance in df_appliancelist.itertuples():
#ApplianceTypeがTPLinkeであることを確認
if appliance.ApplianceType in ['TPLink_Plug','TPLink_ColorBulb','TPLink_WhiteBulb']:
data = getdata_tplink(appliance)
#上記以外
else:
data = None
#データが存在するとき、全データ保持用Dictに追加し、CSV出力
if data is not None:
#all_values_dictがNoneのとき、新たに辞書を作成
if all_values_dict is None:
all_values_dict = {data['ApplianceName']: data}
#all_values_dictがNoneでないとき、既存の辞書に追加
else:
all_values_dict[data['ApplianceName']] = data
#CSV出力
output_csv(data, cfg['Path']['CSVOutput'])
#成功数プラス
success_num+=1
#処理終了をログ出力
logging.info(f'[masterdate{str(masterdate)} startdate{str(startdate)} enddate{str(datetime.today())} success{str(success_num)}/{str(appliance_num)}]')
上記を実行すれば、設定ファイル"CSVOutput"で指定したフォルダに、取得データがデバイス名と日時の名称でCSV出力されます
以上で、情報取得が完了です
##おわりに
RaspberrypPiで24時間稼働、かつPythonはIFTTTよりも自由度が高いので、色々なアイデアを具現化可能です
・人感センサと組み合わせて、人が入ったら電気が点くようにする
・30分以上人がいなければ、電気を消す
・人によって電球の明るさを自動で切り替える
などなどです。
いくつか作りたいものがあるので、製作が完了したらまた記事にしようと思います