LoginSignup
12
8

More than 3 years have passed since last update.

AWS IoT Device Shadowを試すためのpython mock

Last updated at Posted at 2017-02-19

はじめに

AWS IoTを利用する1つの理由として、Thing shadowによるThing/Deviceコントロールが出来ることがあるかと思います。
一方で概要は理解できたものの、"Thing"がないから動作確認ができないとお思いの方も多いかと、、
こちらについては実はDeviceSDKを使えばLinux上で確認が可能です。
ここでは、EC2を仮想デバイスとして、Thing shadowの動作確認をしてみようと思います。

[追記]
Shadow意味について質問されたこともあるのでちょっと追記しておきます。
本サンプルなどのように1台のデバイスの管理であれば、人手で整理するのも難しくないと思いますが、IoTの場合、多数のセンサーやGatewayの管理が必要になります。また、必ずしも近くにあるとも限りません。その多数/遠方にあるデバイスの設定をクラウド経由で管理/変更出来ることがshadowの価値だと思います。

準備

AWS IoT

Thingの作成

registryを選択し、thingsを選択。Thingペインが表示されたら、画面上部のcreateを押す。
スクリーンショット 2017-02-07 23.08.11.png

Thingの作成ですが、任意の名前を設定し、create thingを押す。
ここではthinnameを、shadow-testとしてます。

Thing作成後の画面で、Certificateを選択。表示された画面のcreate certificateを押す。
スクリーンショット 2017-02-07 23.11.03.png

作成された証明書3つをダウンロードしておいてください。(cert.pem/public.key/private.key)
また画面下部に表示された Activeを押し、証明書を有効化します。

Endpointの確認

AWS IoTコンソール、画面左下にあるSettingsを押し、表示された画面にエンドポイント確認、textなどにコピーしておいてください。
endpointはregionごとに一つとなります。
{random}.iot.{region}.amazon.com
の形で表示されます。

EC2

EC2の起動

EC2でamazon linuxを起動してください。t2.microで十分です。
EC2の起動方法がわからない方、こちらをご参考に。
HDDサイズもデフォルトで十分です。SSHでログインするためのGlobal IPの設定をお忘れなく。

EC2へIoT DeviceSDKのinstall

mockはpythonSDKで作っているのでPythonSDKをインストールします。
EC2のコンソールから以下のコマンドを発行

pip install AWSIoTPythonSDK

dummyプログラム(mock)の設置

必要なものとして

  • プログラム本体
  • setup.json
  • 証明書 3つ (rootCA/private/cert)

を配置します
本投稿では、/tmpの下で作業することを想定して記述します。

mkdir /tmp/dummy-shadow
cd /tmp/dummy-shadow
mkdir certs
cd certs

証明書の設定

AWS IoT Coreには現在2つのエンドポイントが提供されており、エンドポイント毎に適切な証明書をご選択下さい。詳細はこちら
推奨は、Amazon Trust Services エンドポイント となります。

curl https://www.amazontrust.com/repository/AmazonRootCA1.pem -o /tmp/root.pem

またこのcertsディレクトリに先程AWS IoTで作成したprivate/certの証明書を配置します。
scp/viでのコピペでも構いません。ファイル名は
cert => cert.pem
private => private.pem
としてください。以下の結果になるようにしてください。

ls -1 /tmp/certs
cert.pem
private.pem
root.pem

dummy(mock)の設置

/tmp/dummy-shadowで以下のファイルを配置します。
YOUR_AWSIOT_DOMAINは各自の環境に併せて、設定して下さい。
ドメインの確認は、AWS IoTのコンソール Settings => Custom endpointを参照して下さい。

setup.json
{
   "AWSIoT": {
        "ENDPOINT":"YOUR_AWSIOT_DOMAIN",
        "CERT_PATH":"./certs/",
        "KEYS":["cert.pem", "public.pem", "root.pem"]
   },
   "DEVICE":{
        "NAME":"shadow-test",
   }
}

以下にdummyプログラム

dummy-shadow.py
import os
import sys
import json
import time
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient


CONFIG_FILE = './setup.json'

#アプリ起動時の状態を固定で定義(shadowと同期する変数)
SIGNAL    = "green"
WAIT_TIME = 10

#コールバック関数からshadowを読み出すのでglobalで定義
shadow         = None
shadow_hundler = None

##########################################################
# setup.jsonから設定を読み出す
##########################################################
def readConfig():
    print 'start readConfig func'

    try:
        # read config file
        f = open(CONFIG_FILE, "r")
        jsonData = json.load(f)
        f.close()

        list = {}
        list["ENDPOINT"]    = jsonData["AWSIoT"]["ENDPOINT"]
        list["CERT_PATH"]   = jsonData["AWSIoT"]["CERT_PATH"]
        list["DEVICE_NAME"] = jsonData["DEVICE"]["NAME"]

        return list

    except Exception as e:
        print 'Config load error'
        print e.message
        sys.exit()

##########################################################
# Shadow update命令の発行
##########################################################
def updateThing(report):
    try: 
        report_json = '{"state":{"reported":'+ report + '}}'
        print "send currnet status to cloud-shadow"
        print report_json
        shadow_hundler.shadowUpdate(report_json, None, 5)

        return

    except Exception as e:
        print e.message
        sys.exit()

##########################################################
# shadowRegisterDeltaCallbackからコールバック型で呼び出される
#
##########################################################
def getDelta(payload, responseStatus, token):
    try:
        print '======get Delta======'
        dict_delta = json.loads(payload)
        print dict_delta
        state = dict_delta["state"]

        if state.has_key("wait_time"):
            delta = int(state["wait_time"])
            global WAIT_TIME
            WAIT_TIME = delta
            print "wait time change :%s" % WAIT_TIME
        if state.has_key('signal'):
            global SIGNAL
            SIGNAL = state['signal']
            print "SIGNAL change :%s" % SIGNAL

        report = json.dumps(dict_delta["state"])
        updateThing(report)

        return

    except Exception as e:
        print "Error on Delta function"
        print e.message
        raise

################################################
# Shadow接続の手続き関数
################################################
def initShadow(Config):
    ##--need device cert / private / rootCA--
    # rootCA: get from symantec
    ROOT_KEY    = Config['CERT_PATH'] + 'root.pem'
    CERT_KEY    = Config['CERT_PATH'] + 'cert.pem'
    PRIVATE_KEY = Config['CERT_PATH'] + 'private.pem'

    try:
        # init shadow connect procedure
        global shadow
        shadow = AWSIoTMQTTShadowClient(Config["DEVICE_NAME"])
        shadow.configureEndpoint(Config["ENDPOINT"], 8883)    # Setting URL-ENDPOINT & Port
        shadow.configureCredentials(ROOT_KEY, PRIVATE_KEY, CERT_KEY ) # Cert file setting
        shadow.configureConnectDisconnectTimeout(10)# CONNACK wait time (sec)
        shadow.configureMQTTOperationTimeout(5)     # QoS1 publish (sec)
        print 'start connct shadow'
        shadow.connect()
        print 'shadow connect'

        return

    except Exception as e:
        print 'Error on Init Shadow'
        raise

####
if __name__ == '__main__':
    Config = readConfig()

    try:
        initShadow(Config)
        print 'satrt subscribe shadow'
        shadow_hundler = shadow.createShadowHandlerWithName(Config['DEVICE_NAME'], True)
        shadow_hundler.shadowRegisterDeltaCallback(getDelta)
        default_state = '{"wait_time":' + str(WAIT_TIME) + ', "signal":"green"}'
        updateThing(default_state)

        while True:
            time.sleep(WAIT_TIME)
            print 'Currnet WAIT_TIME=%s' % str(WAIT_TIME)
            pass

    except KeyboardInterrupt:
         print 'Keyboard Interrupt'
         sys.exit()

    except Exception as e:
        print e.message
        sys.exit()

実行方法

python PROGRAM.py &

でバックグラウンド起動します。

shadow update

コンソールもしくは、CLIでshadow updateをします。
Resistor => Thing => [Thing名] => Shadowを選択します。

以下のようにdesiredセクションを追加し、wait_time:30 , signal:"green"で登録してみます。
するとdeltaとして、 wait_time:30 つまり差分のみがdeltaとしてthingにレポートされます。

スクリーンショット 2017-02-19 22.20.44.png

プログラム側のコンソールを見ると、deltaを受けソレが表示さているはずです。
で、コードの方をみてもらうとわかりますが、変数のwait_timeを変更し、[state]を送りかえしています。
deltaのstateには、結果整合としてあるべき状態が書いてあるので、正しく状態遷移が出来たらこれをreportするだけでいちいちJSONを作成する必要はありません。
上記の通り、正しく状態遷移出来ない状態で report するとズレますので要注意。

このレポートが送られるとdeltaがなくなり、desired/reportedが同じ値になっているはずです。

大まかなシーケンス

大まかなシーケンスを以下に示します。

スクリーンショット 2017-02-19 22.38.16.png

プログラム起動時に report statusとして ハードコーディングされている
SIGNAL = "green"
WAIT_TIME = 10
を通知します。何度か、shadowの状態を変えた場合はプログラム起動直後に上記初期パラメータを通知し、即deltaを受ける事になります。

免責

本投稿は、個人の意見で、所属する企業や団体は関係ありません。
また掲載しているsampleプログラムの動作に関しても保障いたしませんので、参考程度にしてください。

12
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
8