18
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

コスパ最強IoT家電!TPLink製品をRaspberryPiから操作

Last updated at Posted at 2020-06-19

#TPLinkとは?
ルータを主力とする中国・深圳のネットワーク機器メーカーです。
近年はスマート電球、スマートプラグ等のIoT家電に力を入れており、コスパの良さからAmazonで独自の地位を築いています。

plug.jpg
             スマートプラグのHS105

bulb.jpg
             スマート電球のKL110

今回は、APIを使用して、
・機器のON-OFF操作
・ON-OFF、電球の明るさ等の情報取得
を、PythonおよびNode.jsで実行してみました

IoT家電として思いつく用途の多くを上記でカバーできるので
応用の可能性を感じる結果となりました!

#必要なもの

・PC
・RaspberryPi
・TPLink製スマートプラグあるいは電球
今回は下記3製品を試しました
HS105:スマートプラグ
KL110:ホワイト電球
KL130:カラー電球

##①データ取得の確認
まずは、TPLinkからデータが取得できるかターミナル上でテストします。

※参考にさせて頂いた記事
https://lmjs7.net/blog/tag/tp-link/
https://qiita.com/tmisuoka0423/items/582ff0c303abe8570ee5

###IPを調べる

tplink-smarthome-api(参考)をインストール

sudo npm install -g tplink-smarthome-api

下記コマンドで、接続しているTPLinkデバイス一覧を取得

tplink-smarthome-api search
HS105(JP) plug IOT.SMARTPLUGSWITCH 192.168.0.101 9999 B0BE76‥ スマートプラグ
KL110(JP) bulb IOT.SMARTBULB 192.168.0.102 9999 98DAC4‥ ホワイト電球
KL130(JP) bulb IOT.SMARTBULB 192.168.0.103 9999 0C8063‥ カラー電球

3つのデバイス全てが検出できていることが分かります

###デバイス動作情報の取得確認
下記コマンドで、デバイスの設定やOnOffが取得できる

tplink-smarthome-api getSysInfo [デバイスのIPアドレス]:9999

・KL130(カラー電球)の例

  :
  ctrl_protocols: { name: 'Linkie', version: '1.0' },
  ↓ここからがデバイスの設定
  light_state: {
    on_off: 1,
    mode: 'normal',
    hue: 0,
    saturation: 0,
    color_temp: 2700,
    brightness: 100
  },
  ↑ここまでがデバイスの設定
  is_dimmable: 1,
  is_color: 1,
  :

on_off:0なら電源OFF、1なら電源ON
hue:色?(白色モードのとき0)
color_temp:色温度(白色モード以外のとき0)
brightness:明るさ(%単位)
と思われます

・KL110(ホワイト電球)の例

  :
  ctrl_protocols: { name: 'Linkie', version: '1.0' },
  ↓ここからがデバイスの設定
  light_state: {
    on_off: 1,
    mode: 'normal',
    hue: 0,
    saturation: 0,
    color_temp: 2700,
    brightness: 100
  },
  ↑ここまでがデバイスの設定
  is_dimmable: 1,
  is_color: 0,
  :

on_off:0なら電源OFF、1なら電源ON
hue:色相(白色モードのとき0)
saturation:彩度
color_temp:色温度(白色モード以外のとき0)
brightness:明るさ(%単位)
と思われます。
KL130とほぼ同じですが、カラーではないのでis_color: 0となっていると思われます。

・KL105(スマートプラグ)の例

  alias: '',
↓ここからがデバイスの設定
  relay_state: 1,
  on_time: 288,
  active_mode: 'none',
  feature: 'TIM',
  updating: 0,
  icon_hash: '',
  rssi: -52,
  led_off: 0,
  longitude_i: 1356352,
  latitude_i: 348422,
↑ここまでがデバイスの設定
  hwId: '047D‥',

relay_state:0なら電源OFF、1なら電源ON
on_time:連続電源ON時間
rssi: WiFiの信号強度
と思われます。
経度(logitude)と緯度(latitude)も表示されていますが、実際の場所と5キロくらいずれていて謎が深まります。

上記で、コマンドで欲しい情報が取得できることが確認できました!
次章以降で、プログラム(Node.js&Python)から取得・操作する方法を記載します。

##②Node.jsで状態取得
※「Pythonを使うからNode.jsの説明はいらん!」という方は、この章を飛ばして③に移動してください

こちらを参考に、Node.jsを
###npmにパスを通す(Windowsの場合)
Windowsだとnpmのグローバルインストール先にパスが通っておらず、Node.jsでモジュールが読み込めないので、下記を参考にパスを通してください
https://qiita.com/shiftsphere/items/5610f692899796b03f99

###npmにパスを通す(RaspberryPiの場合)
下記コマンドで、グローバルでのnpmモジュールインストール先を調べます
(なぜかWindowsのときのコマンド"npm bin -g"で見つかるフォルダとは違うようです)

npm ls -g 

下記コマンドで.profileを編集します。
※SSH環境では.profileの代わりに、.bash_profileを編集してください

nano /home/[ユーザ名]/.profile

.profileの最後に下記の1行を追加してrebootしてください

export NODE_PATH=[上で調べたパス]/node_modules

下記コマンドで指定したパスが表示されれば成功です

printenv NODE_PATH

###node.jsスクリプトの作成
下記スクリプトを作成します

tplink_test.js
const { Client } = require('tplink-smarthome-api');
const client = new Client();
client.getDevice({ host: '192.168.0.102' }).then(device => {
  device.getSysInfo().then(console.log);
});

下記コマンドでスクリプトを実行すると、①と同様に各種情報が取得できます

node tplink_test.js

※上記をcsvロギングするスクリプト(③のPythonスクリプトと同機能)も作成しましたが、私のJavaScriptスキルが低くうまく動作しないときがある(非同期部分の処理順が逆転する)ので、コードはここには貼らないこととします
下記GitHubにアップロードしたので、自己責任で改造して使用していただければと思います。
(願わくば無知な私に処理順が逆転する理由もコメント…頂けると嬉しいです笑)
https://github.com/c60evaporator/TPLink_Info_Nodejs

##③Pythonで状態取得
私のJavaScriptスキル不足でNode.jsでのロギングが上手くいかなかったので、
気を取り直してPythonで操作・ロギングするスクリプトを作りました。

PythonはNode.jsほど丁寧なドキュメントが見当たらず苦戦しましたが、こちらこちらのコードを解読して、スクリプトを作成しました。

###TPLink操作クラスの作成
上記コードを参考に、下記の3つのクラスを作成しました
TPLink_Common():プラグ、電球共通機能のクラス
TPLink_Plug():プラグ専用機能のクラス(TPLink_Common()を継承)
TPLink_Bulb():電球専用機能のクラス(TPLink_Common()を継承)
※2020/11/19修正:GetTPLinkData()クラスは冗長なので削除し、TPLink_Common().info_dict()メソッドに統合

tplink.py
import socket
from struct import pack
import json

#TPLink電球&プラグ共通クラス
class TPLink_Common():
    def __init__(self, ip, port=9999):
        """Default constructor
        """
        self.__ip = ip
        self.__port = port
    
    def info(self):
        cmd = '{"system":{"get_sysinfo":{}}}'
        receive = self.send_command(cmd)
        return receive

    def info_dict(self):
        rjson = self.info()
        rdict = json.loads(rjson)
        return rdict
    
    def send_command(self, cmd, timeout=10):
        try:
            sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock_tcp.settimeout(timeout)
            sock_tcp.connect((self.__ip, self.__port))
            sock_tcp.settimeout(None)
            sock_tcp.send(self.encrypt(cmd))
            data = sock_tcp.recv(2048)
            sock_tcp.close()

            decrypted = self.decrypt(data[4:])
            print("Sent:     ", cmd)
            print("Received: ", decrypted)
            return decrypted

        except socket.error:
            quit("Could not connect to host " + self.__ip + ":" + str(self.__port))
            return None

    def encrypt(self, string):
        key = 171
        result = pack('>I', len(string))
        for i in string:
            a = key ^ ord(i)
            key = a
            result += bytes([a])
        return result

    def decrypt(self, string):
        key = 171
        result = ""
        for i in string:
            a = key ^ i
            key = i
            result += chr(a)
        return result

#TPLinkプラグ操作用クラス
class TPLink_Plug(TPLink_Common):
    def on(self):
        cmd = '{"system":{"set_relay_state":{"state":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"system":{"set_relay_state":{"state":0}}}'
        receive = self.send_command(cmd)

    def ledon(self):
        cmd = '{"system":{"set_led_off":{"off":0}}}'
        receive = self.send_command(cmd)

    def ledoff(self):
        cmd = '{"system":{"set_led_off":{"off":1}}}'
        receive = self.send_command(cmd)
    
    def set_countdown_on(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":1,"name":"turn on"}}}'
        receive = self.send_command(cmd)

    def set_countdown_off(self, delay):
        cmd = '{"count_down":{"add_rule":{"enable":1,"delay":' + str(delay) +',"act":0,"name":"turn off"}}}'
        receive = self.send_command(cmd)
    
    def delete_countdown_table(self):
        cmd = '{"count_down":{"delete_all_rules":null}}'
        receive = self.send_command(cmd)

    def energy(self):
        cmd = '{"emeter":{"get_realtime":{}}}'
        receive = self.send_command(cmd)
        return receive

#TPLink電球操作用クラス
class TPLink_Bulb(TPLink_Common):
    def on(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":1}}}'
        receive = self.send_command(cmd)

    def off(self):
        cmd = '{"smartlife.iot.smartbulb.lightingservice":{"transition_light_state":{"on_off":0}}}'
        receive = self.send_command(cmd)

    def transition_light_state(self, hue: int = None, saturation: int = None, brightness: int = None,
                               color_temp: int = None, on_off: bool = None, transition_period: int = None,
                               mode: str = None, ignore_default: bool = None):
        # copy all given argument name-value pairs as a dict
        d = {k: v for k, v in locals().items() if k is not 'self' and v is not None}
        r = {
            'smartlife.iot.smartbulb.lightingservice': {
                'transition_light_state': d
            }
        }
        cmd = json.dumps(r)
        receive = self.send_command(cmd)
        print(receive)

    def brightness(self, brightness):
        self.transition_light_state(brightness=brightness)

    def purple(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=277, saturation=86, color_temp=0, brightness=brightness, transition_period=transition_period)

    def blue(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=240, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def cyan(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=180, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def green(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=120, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def yellow(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=60, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)

    def orange(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=39, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def red(self, brightness = None, transition_period = None):
        self.transition_light_state(hue=0, saturation=100, color_temp=0, brightness=brightness, transition_period=transition_period)
    
    def lamp_color(self, brightness = None):
        self.transition_light_state(color_temp=2700, brightness=brightness)

###TPLink操作クラスの実行方法
上記クラスは、Pythonコード上で下記のように実行できます

・電球の電源をONにしたいとき

TPLink_Bulb(電球のIPアドレス).on()

・プラグの電源をOFFにしたいとき

TPLink_Plug(プラグのIPアドレス).off()

・10秒後にプラグをONにしたいとき

TPLink_Plug(プラグのIPアドレス).set_countdown_on(10)

・電球の明るさを10%にしたいとき

TPLink_Bulb(電球のIPアドレス).brightness(10)

・電球を赤色にしたいとき(カラー電球のみ)

TPLink_Bulb(電球のIPアドレス).red()

・電球のOn-Off等の情報を取得

info = TPLink_Bulb(電球のIPアドレス).info_dict()

※上記メソッドは、取得したjson情報をdict形式に変換して出力されます。
出力される電球情報は①を参照ください
(クラスをTPLink_Plugにすればプラグ情報も取得可)

##④ロギング用Pythonスクリプトの作成
前章最後の方法を利用して、電球やプラグの情報をロギングするスクリプトを作成しました。
スクリプトの構造はこちらと同じなので、リンク先をご一読いただければと思います。

###設定ファイル
こちらの記事同様、管理をしやすくするため下記2種類の設定ファイルを作成しました
・DeviceList.csv:センサごとに必要情報を記載

DeviceList.csv
ApplianceName,ApplianceType,IP,Retry
TPLink_KL130_ColorBulb_1,TPLink_ColorBulb,192.168.0.103,2
TPLink_KL110_WhiteBulb_1,TPLink_WhiteBulb,192.168.0.102,2
TPLink_HS105_Plug_1,TPLink_Plug,192.168.0.101,2

カラムの意味は下記となります
ApplianceName:デバイス名を管理、同種類のデバイスが複数あるときの識別用
ApplianceType:デバイスの種類。
    TPLink_ColorBulb:カラー電球(KL130等)
    TPLink_WhiteBulb:白色電球(KL110等)
    TPLink_Plug:スマートプラグ(HS105等)
IP:デバイスのIPアドレス
Retry:最大再実行回数詳細(取得失敗時の再実行回数、詳しくはこちら

・config.ini:CSVおよびログ出力ディレクトリを指定

config.ini
[Path]
CSVOutput = /share/Data/Appliance
LogOutput = /share/Log/Appliance
どちらもsambaで作成した共有フォルダ内に出力すると、RaspberryPi外からアクセスできて便利です。

###実際のスクリプト

appliance_data_logger.py
from tplink import TPLink_Plug, TPLink_Bulb
import logging
from datetime import datetime, timedelta
import os
import csv
import configparser
import pandas as pd

#グローバル変数
global masterdate

######TPLinkのデータ取得######
def getdata_tplink(appliance):
    #データ値が得られないとき、最大appliance.Retry回スキャンを繰り返す
    for i in range(appliance.Retry):
        try:
            #プラグのとき
            if appliance.ApplianceType == 'TPLink_Plug':
                plg = TPLink_Plug(appliance.IP)
                applianceValue = plg.info_dict()
            #電球のとき
            elif appliance.ApplianceType == 'TPLink_ColorBulb' or appliance.ApplianceType == 'TPLink_WhiteBulb':
                blb = TPLink_Bulb(appliance.IP)
                applianceValue = blb.info_dict()
            else:
                applianceValue = None
        #エラー出たらログ出力
        except:
            logging.warning(f'retry to get data [loop{str(i)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
            applianceValue = None
            continue
        else:
            break
    
    #値取得できていたら、POSTするデータをdictに格納
    if applianceValue is not None:
        #プラグのとき
        if appliance.ApplianceType == 'TPLink_Plug':
            data = {        
                'ApplianceName': appliance.ApplianceName,        
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['relay_state']),
                'OnTime': str(applianceValue['system']['get_sysinfo']['on_time'])
            }
        #電球のとき
        else:
            data = {        
                'ApplianceName': appliance.ApplianceName,        
                'Date_Master': str(masterdate),
                'Date': str(datetime.today()),
                'IsOn': str(applianceValue['system']['get_sysinfo']['light_state']['on_off']),
                'Color': str(applianceValue['system']['get_sysinfo']['light_state']['hue']),
                'ColorTemp': str(applianceValue['system']['get_sysinfo']['light_state']['color_temp']),
                'Brightness': str(applianceValue['system']['get_sysinfo']['light_state']['brightness'])
            }
        return data
        
    #取得できていなかったら、ログ出力
    else:
        logging.error(f'cannot get data [loop{str(appliance.Retry)}, date{str(masterdate)}, appliance{appliance.ApplianceName}]')
        return None

######データのCSV出力######
def output_csv(data, csvpath):
    appliancename = data['ApplianceName']
    monthstr = masterdate.strftime('%Y%m')
    #出力先フォルダ名
    outdir = f'{csvpath}/{appliancename}/{masterdate.year}'
    #出力先フォルダが存在しないとき、新規作成
    os.makedirs(outdir, exist_ok=True)
    #出力ファイルのパス
    outpath = f'{outdir}/{appliancename}_{monthstr}.csv'
    
    #出力ファイル存在しないとき、新たに作成
    if not os.path.exists(outpath):        
        with open(outpath, 'w', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writeheader()
            writer.writerow(data)
    #出力ファイル存在するとき、1行追加
    else:
        with open(outpath, 'a', newline="") as f:
            writer = csv.DictWriter(f, data.keys())
            writer.writerow(data)

######メイン######
if __name__ == '__main__':    
    #開始時刻を取得
    startdate = datetime.today()
    #開始時刻を分単位で丸める
    masterdate = startdate.replace(second=0, microsecond=0)   
    if startdate.second >= 30:
        masterdate += timedelta(minutes=1)

    #設定ファイルとデバイスリスト読込
    cfg = configparser.ConfigParser()
    cfg.read('./config.ini', encoding='utf-8')
    df_appliancelist = pd.read_csv('./ApplianceList.csv')
    #全センサ数とデータ取得成功数
    appliance_num = len(df_appliancelist)
    success_num = 0

    #ログの初期化
    logname = f"/appliancelog_{str(masterdate.strftime('%y%m%d'))}.log"
    logging.basicConfig(filename=cfg['Path']['LogOutput'] + logname, level=logging.INFO)

    #取得した全データ保持用dict
    all_values_dict = None

    ######デバイスごとにデータ取得######
    for appliance in df_appliancelist.itertuples():
        #ApplianceTypeがTPLinkeであることを確認
        if appliance.ApplianceType in ['TPLink_Plug','TPLink_ColorBulb','TPLink_WhiteBulb']:
            data = getdata_tplink(appliance)
        #上記以外
        else:
            data = None        

        #データが存在するとき、全データ保持用Dictに追加し、CSV出力
        if data is not None:
            #all_values_dictがNoneのとき、新たに辞書を作成
            if all_values_dict is None:
                all_values_dict = {data['ApplianceName']: data}
            #all_values_dictがNoneでないとき、既存の辞書に追加
            else:
                all_values_dict[data['ApplianceName']] = data

            #CSV出力
            output_csv(data, cfg['Path']['CSVOutput'])
            #成功数プラス
            success_num+=1

    #処理終了をログ出力
    logging.info(f'[masterdate{str(masterdate)} startdate{str(startdate)} enddate{str(datetime.today())} success{str(success_num)}/{str(appliance_num)}]')

上記を実行すれば、設定ファイル"CSVOutput"で指定したフォルダに、取得データがデバイス名と日時の名称でCSV出力されます
tplinkcsv.png

以上で、情報取得が完了です

##おわりに
RaspberrypPiで24時間稼働、かつPythonはIFTTTよりも自由度が高いので、色々なアイデアを具現化可能です
・人感センサと組み合わせて、人が入ったら電気が点くようにする
・30分以上人がいなければ、電気を消す
・人によって電球の明るさを自動で切り替える
などなどです。

いくつか作りたいものがあるので、製作が完了したらまた記事にしようと思います

18
31
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
31

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?