2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWS IoTのMQTT over WebSocketでHTMLからsubscribeしリアルタイム処理

Posted at

IoTデバイスでセンシングしたデータをAWS IoTにPublishし、HTML(Webアプリケーション)からsubscribeし、リアルタイム表示をします。

#AWS IoTの設定
AWS IoTでデバイスからのセンサデータを受けられるように設定します。
##モノの登録
AWS IoTで[モノの登録]ボタンをクリックします。
image.png
[単一のモノを作成する]ボタンをクリック。
image.png
"Thing Registryにデバイスを追加"画面で登録するデバイス(モノ)の名前を登録します。ここでは"Things"という名前にし、[次へ]に進みます。
image.png
AWS IoTへデバイスを接続するための証明書を作成します。今回は、"1-Click証明書作成"の[証明書を作成]をクリックします。
image.png
デバイスの証明書、"cert.pem"と、パブリックキー"public.key"、プライベートキー"private.key"が生成されるので、ローカルにダウンロードしておきます。
また、AWS IoTのルートCA"pem"もダウンロードしておきます。
image.png

[ポリシーの作成]ボタンをクリックすると、下図画面が表示されます。
まだ、ポリシーが存在しないため、[新規ポリシーの作成]ボタンをクリックします。
image.png

##ポリシーの作成
AWS IoTリソースへのアクセス許可をモノに付与するためポリシーを設定します。左メニューの[安全性]_[ポリシー]を選択し、[ポリシーの作成]をクリックします。
image.png
ここでは、"Things_policy"という名前にしました。ステートメントを追加には、アドバンスモードに遷移し、下記JSONコードをペーストします。
image.png

statement.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:*"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

改めて、先程作成したモノを選択し、"セキュリティ"を選択し、表示されているモノ(下図では(fbd8b...)を選択します。
image.png
右上の"アクション"から"ポリシーのアタッチ"を選択します。
image.png
先程作成したポリシーを選択し、[アタッチ]します。
image.png

##エンドポイントの確認
モノが登録されると、デバイスは、アカウントのデバイスデータエンドポイントを使用して AWS に接続できるようになります。
左メニューの[設定]を選択すると、エンドポイントが表示されますので、このエンドポイントを控えておきます。
image.png

#デバイスを準備
ここでは、ローカルPCを使用しPythonで、疑似デバイスを作成してみます。
下記Pythonコードを準備します。

device.py
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import datetime
import argparse
import json
import uuid
import random

AllowedActions = ['both', 'publish', 'subscribe']

# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")

# add parameters ----
host = "xxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"  # 先程控えたエンドポイント
rootCAPath = "xxxxxxxxxxx.pem"  # root ca
certificatePath = "xxxxxxxxxx-certificate.pem.crt"  # cerfiticate for things
privateKeyPath = "xxxxxxxxxx-private.pem.key"  # private key
port = 8883
useWebsocket = False
clientId = "Things"  # things name
topic = "things/topic"  # topic
mode = "both"  # tx and rx
# --------------------

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# add ----
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
time.sleep(2)

# Publish to the same topic in a loop forever
loopCount = 0
while True:
    if mode == 'both' or mode == 'publish':
        u4 = str(uuid.uuid4())
        message = {}
        message['count'] = loopCount
        messageJson = json.dumps(message)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        if mode == 'publish':
            print('%s Published topic %s: %s\n' % (loopCount, topic, messageJson))
        loopCount += 1
    time.sleep(1)
# ----------

デバイスの証明書、パブリックキー、プライベートキー、AWS IoTのルート証明書は、先程ダウンロードしたファイルを指定してください。

#PublishされたMQTTメッセージの確認
上記Pythonコードを下記コマンドで実行すると、{"count":xxx} (xxxは0,1,2...とインクリメントされる数字)というJSONメッセージがPublishされます。

python device.py

AWS IoTの左メニューから[テスト]を選択すると、"MQTTテストクライアント"画面が表示されます。
ここで、"トピックをサブスクライブする"で、Pythonコードにしていしたトピック(今回は"things/topic"を入力し、[サブスクライブ]ボタンをクリックします。
image.png
サブスクライブに成功すると、下図のようにデバイスからのメッセージを確認することができます。
image.png

#HTMLからサブスクライブ
デバイスからのメッセージをAWS IoT上で確認することができれば、次にHTMLから、該当トピックにサブスクライブし、javascriptを使ってリアルタイム表示します。

##AWS IoTにSubscribeできるIAMユーザを作成
IAM管理コンソールから、[ユーザー]_[ユーザーを追加]を選択します。
image.png
任意ユーザー名を入力し、AWSアクセスの種類は☑プログラムによるアクセスを選択し、次のステップに進みます。
image.png
[既存のポリシーを直接アタッチ]を選択し、ポリシーには、AWSIoTFullAccessを選択します。
image.png
必要に応じ、タグを追加後、ユーザーを作成します。
ユーザーが作成されるとアクセスキーIDとシークレットアクセスキーが生成されます。[.csvのダウンロード]でアクセスキーIDとシークレットアクセスキーが記録されているcsvファイルをダウンロードし、保管しておきまます。
image.png

##HTMLファイルの作成
これで、AWS IoTのリソースにアクセスする準備が完了しました。
次に下記HTMLファイルを作成します。

subscribe.html
<html lang="ja">
<body>
  <ul id="chat">
    <li v-for="m in messages">{{ m }}</li>
  </ul>
 
  <script src="https://cdnjs.cloudflare.com/ajax/libs/vue/1.0.16/vue.min.js" type="text/javascript"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.11.2/moment.min.js" type="text/javascript"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.2/components/core-min.js" type="text/javascript"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.2/components/hmac-min.js" type="text/javascript"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/crypto-js/3.1.2/components/sha256-min.js" type="text/javascript"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
  <script type="text/javascript">
    var data = {
      messages: []
    };
 
    new Vue({
      el: '#chat',
      data: data
    });
 
    function SigV4Utils(){}
 
    SigV4Utils.sign = function(key, msg) {
      var hash = CryptoJS.HmacSHA256(msg, key);
      return hash.toString(CryptoJS.enc.Hex);
    };
 
    SigV4Utils.sha256 = function(msg) {
      var hash = CryptoJS.SHA256(msg);
      return hash.toString(CryptoJS.enc.Hex);
    };
 
    SigV4Utils.getSignatureKey = function(key, dateStamp, regionName, serviceName) {
      var kDate = CryptoJS.HmacSHA256(dateStamp, 'AWS4' + key);
      var kRegion = CryptoJS.HmacSHA256(regionName, kDate);
      var kService = CryptoJS.HmacSHA256(serviceName, kRegion);
      var kSigning = CryptoJS.HmacSHA256('aws4_request', kService);
      return kSigning;
    };
 
    function createEndpoint(regionName, awsIotEndpoint, accessKey, secretKey) {
      var time = moment.utc();
      var dateStamp = time.format('YYYYMMDD');
      var amzdate = dateStamp + 'T' + time.format('HHmmss') + 'Z';
      var service = 'iotdevicegateway';
      var region = regionName;
      var secretKey = secretKey;
      var accessKey = accessKey;
      var algorithm = 'AWS4-HMAC-SHA256';
      var method = 'GET';
      var canonicalUri = '/mqtt';
      var host = awsIotEndpoint;
 
      var credentialScope = dateStamp + '/' + region + '/' + service + '/' + 'aws4_request';
      var canonicalQuerystring = 'X-Amz-Algorithm=AWS4-HMAC-SHA256';
      canonicalQuerystring += '&X-Amz-Credential=' + encodeURIComponent(accessKey + '/' + credentialScope);
      canonicalQuerystring += '&X-Amz-Date=' + amzdate;
      canonicalQuerystring += '&X-Amz-SignedHeaders=host';
 
      var canonicalHeaders = 'host:' + host + '\n';
      var payloadHash = SigV4Utils.sha256('');
      var canonicalRequest = method + '\n' + canonicalUri + '\n' + canonicalQuerystring + '\n' + canonicalHeaders + '\nhost\n' + payloadHash;
 
      var stringToSign = algorithm + '\n' +  amzdate + '\n' +  credentialScope + '\n' +  SigV4Utils.sha256(canonicalRequest);
      var signingKey = SigV4Utils.getSignatureKey(secretKey, dateStamp, region, service);
      var signature = SigV4Utils.sign(signingKey, stringToSign);
 
      canonicalQuerystring += '&X-Amz-Signature=' + signature;
      return 'wss://' + host + canonicalUri + '?' + canonicalQuerystring;
    }
 
    var endpoint = createEndpoint(
        'ap-northeast-1',                                                                        // リージョン
        'xxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com',  // IoT エンドポイント(小文字)
        '<YOUR_ACCESS_KEY_ID>',      // アクセスキー
        '<YOUR_SECRET_KEY>');        // シークレットキー
    var clientId = Math.random().toString(36).substring(7);
    var client = new Paho.MQTT.Client(endpoint, clientId);
    var connectOptions = {
      useSSL: true,
      timeout: 3,
      mqttVersion: 4,
      onSuccess: subscribe
    };
    client.connect(connectOptions);
    client.onMessageArrived = onMessage;
    client.onConnectionLost = function(e) { console.log(e) };
 
    function subscribe() {
      client.subscribe("kcmmw/topic");                                      // subscribe トピック
      console.log("subscribed");
    }
 
    function onMessage(message) {
      data.messages.push(message.payloadString);
      console.log("message received: " + message.payloadString);
    }
  </script>
</body>
</html>

このファイルをWebサーバ上に置いたうえでページアクセスし、先程のPythonコードを実行すれば、下図のようにデバイスからのメッセージをリアルタイムで表示することができます。

MQTTのサブスクライブはmosquittoを使用しています。

###参考
AWS Cloud9で簡単にWebサーバを立ち上げる方法は、こちらを参考にしてください。

image.png

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?