LoginSignup
9
12

More than 3 years have passed since last update.

AWS IoT で IoT基盤を構築 ~お試し編~

Posted at

はじめに

AWS IoT を使ってDynamoDBにデータを登録、API GatewayとLambdaでRESTful APIを作ります。

気が向いたらキャプチャ撮ります。

参考

手順

データをIoT Coreに送信してみよう

モノ(証明書)の作成

まず、コンソールにログインし、IoT Coreを開きます。

画面左部のメニューから「管理」をクリック。「モノの作成」をクリックします。
今回は「単一の AWS IoT モノの登録」を選択します。

「名前」にIoTデバイスの名前を付けます。今回はThing01としました。
「タイプ」、「グループ」、「検索可能なモノの属性」はデバイスを管理するためのオプション機能です。今回は使用しないので何も入力しません。「次へ」をクリックします。

AWS IoT Coreでは、デバイスに登録されている証明書を利用して認証を行います。
今回は「1-Click 証明書作成 (推奨)」を使用して証明書を作成します。
作成された証明書、パブリックキー、プライベートキーをダウンロードして保存しておきます。
「有効化」をクリックし、作成された証明書を使用可能にします。

「ポリシーをアタッチ」をクリックするとポリシーをアタッチする画面に遷移します。現在ポリシーを作成していないので、そのまま「モノの登録」をクリックします。

ポリシーの作成

画面左部のメニューから「安全性」-「ポリシー」をクリック。「ポリシーの作成」をクリックします。
ポリシーに名前を付けます。今回はpolicy01としました。
アドバンスドモードをクリックし、下記jsonを貼り付けます。
特定の「Resource」に「Action」を「Effect」するという情報を「Statement」内に書きます。
「Version」はjsonの書式のバージョンで、定型文のようなものです。
今回は任意のデバイスから任意の操作を許可するステートメントにしました(本番では適切な権限を設定する必要があります)。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:*"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

「作成」をクリックしてポリシーの作成を完了します。

先程作成した証明書にポリシーをアタッチします。これにより証明書を持ったデバイスはポリシーに従った操作が可能になります。
また、このことからわかるように、

画面左部のメニューから「安全性」-「証明書」をクリックします。
先程作成した証明書をクリックし、右上の「アクション」-「ポリシーのアタッチ」を開き、policy01を選択してアタッチします。

エンドポイントの確認

画面左部のメニューから「設定」をクリックします。
カスタムエンドポイントの欄に表示されているエンドポイントが、デバイスからAWSへの接続先になります。
エンドポイントは一つしかないので、データの振り分けはポリシーで行うことになります。
したがって、ポリシーの権限管理、証明書との紐付けは重要な管理事項になります(ちょっと面倒くさい)。

SDKの準備

ここからはデバイス側の準備になります。
が、今回はデバイスは準備せず、ただのpythonのプログラムからデータを送ってみましょう。

適当な作業用フォルダ(/work)を作ります。
SDKのgithubhttps://github.com/aws/aws-iot-device-sdk-javaからzipでダウンロードし、workで解凍します。
セットアップを実行します。

python setup.py install

work配下にcertフォルダを作成し、証明書、パブリックキー、プライベートキーを格納します。
work配下に下記サンプルプログラム(を少し改造したもの)を作成します。これはSDKに入っているサンプルプログラムを改造したものです。Hello AWS!というメッセージを無限に送り続けます。「追加 parameters」という部分のエンドポイント、ルート証明書、モノの証明書、プライベートキーを記載してください。


'''
Copyright 2019 ground0state All Rights Reserved.
'''
'''
/*
 * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
 '''

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json

# 追加 ----
import uuid
import random
# ---------

AllowedActions = ['both', 'publish', 'subscribe']

# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")


# Read in command-line parameters
## コメントアウト ----
# parser = argparse.ArgumentParser()
# parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
# parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
# parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
# parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
# parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
# parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
#                     help="Use MQTT over WebSocket")
# parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
#                     help="Targeted client id")
# parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic")
# parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
#                     help="Operation modes: %s"%str(AllowedActions))
# parser.add_argument("-M", "--message", action="store", dest="message", default="Hello World!",
#                     help="Message to publish")

# args = parser.parse_args()
# host = args.host
# rootCAPath = args.rootCAPath
# certificatePath = args.certificatePath
# privateKeyPath = args.privateKeyPath
# port = args.port
# useWebsocket = args.useWebsocket
# clientId = args.clientId
# topic = args.topic
# ----------------------------

# 追加 parameters ----
host = "xxxxxxxxxxxxxxxxxxx.amazonaws.com"  # エンドポイント
rootCAPath = "/xxxxx/work/cert/rootCA.pem"  # ルート証明書
certificatePath = "/xxxxx/work/cert/xxxxxxxxxx-certificate.pem.crt"  # モノの証明書
privateKeyPath = "/xxxxx/work/cert/xxxxxxxxxx-private.pem.key"  # プライベートキー
port = 8883
useWebsocket = False
clientId = "thing01"  # モノの名前
topic = "sdk/test/Python"  # データを送信・受信するトピック

mode = "both"  # 送信・受信両方の通信をする
message_text = "Hello AWS!"  # このプログラムで送信するテキスト
# --------------------

## コメントアウト ----
# if args.mode not in AllowedActions:
#     parser.error("Unknown --mode option %s. Must be one of %s" % (args.mode, str(AllowedActions)))
#     exit(2)

# if args.useWebsocket and args.certificatePath and args.privateKeyPath:
#     parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
#     exit(2)

# if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
#     parser.error("Missing credentials for authentication.")
#     exit(2)

# # Port defaults
# if args.useWebsocket and not args.port:  # When no port override for WebSocket, default to 443
#     port = 443
# if not args.useWebsocket and not args.port:  # When no port override for non-WebSocket, default to 8883
#     port = 8883
#----------------------

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec


# コメントアウト ----
# # Connect and subscribe to AWS IoT
# myAWSIoTMQTTClient.connect()
# if args.mode == 'both' or args.mode == 'subscribe':
#     myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
# time.sleep(2)

# # Publish to the same topic in a loop forever
# loopCount = 0
# while True:
#     if args.mode == 'both' or args.mode == 'publish':
#         message = {}
#         message['message'] = args.message
#         message['sequence'] = loopCount
#         messageJson = json.dumps(message)
#         myAWSIoTMQTTClient.publish(topic, messageJson, 1)
#         if args.mode == 'publish':
#             print('Published topic %s: %s\n' % (topic, messageJson))
#         loopCount += 1
#     time.sleep(1)
#----------------------

# 追加 ----
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
if mode == 'both' or mode == 'subscribe':
    myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)

# Publish to the same topic in a loop forever
loopCount = 0
while True:
    if mode == 'both' or mode == 'publish':
        u4 = str(uuid.uuid4())
        message = {}
        message['id'] = u4
        message['payload'] = message_text
        message['option_code'] = random.randrange(1, 10)
        messageJson = json.dumps(message)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        if mode == 'publish':
            print('%s Published topic %s: %s\n' % (loopCount, topic, messageJson))
        loopCount += 1
    time.sleep(1)
# ----------

データ送信

pub_sub.pyを実行します。

python pub_sub.py

AWSのコンソールに戻り、画面左部のメニューから「テスト」をクリックします。
「トピックのサブスクリプション」のテキストボックスに「sdk/test/Python」と入力し、「トピックへのサブスクライブ」をクリックします。
画面下部に送信されてきたメッセージが表示されます。

IoT Core に送られたデータを Dynamo DB に格納してみよう

ルールの作成

画面左部のメニューから「ACT」をクリックします。
「ルールの作成」をクリックします。
ルールに名前を付けます。今回はDynamo01としました。

「ルールクエリステートメント」欄に、データに対する加工操作を記述します。
「SQL バージョンの使用」は2016-03-23にします。
「ルールクエリステートメント」には下記のステートメントを記述します。

SELECT clientid() AS client_id, timestamp() AS timestamp, * FROM 'sdk/test/Python'

「アクションの追加」をクリックします。「データベーステーブル(DynamoDBv2)の複数列にメッセージを分割する」をクリックし、「アクションの設定」をクリックします。

「アクションの設定」画面で「新しいリソースを作成する」をクリックすると、「DynamoDB テーブルの作成」画面に遷移します。テーブル名とプライマリキーを入力します。テーブル名をIoT、プライマリキーをidで文字列にしました。「ソートキーの追加」チェックボックスをONにし、ソートキーはtimestampで数値型にしました。その他はデフォルトのままです。「作成」をクリックするとテーブルが作成されます。

画面下部の方にある「ロールの作成」をクリックします。
ロールの名前を付けます。今回はIoTRoleとしました。ロールの作成をクリックするとロールが作成されます。

「アクションの追加」をクリックします。「ルールの作成」画面に戻るので、「ルールの作成」をクリックします。
pub_sub.pyを実行するとデータがIoT Coreに送信され、ルールに従ってDynamo DBに格納されます。

DynamoDBサービスを開き、画面左部のメニューから「テーブル」をクリックします。
「IoT」テーブルを開き、項目タブを開きます。成功していれば、テーブルにリアルタイムにデータが投入されていきます。

LambdaでAPIを作成してデータを取り出してみよう

API GatewayとLambdaを使用して簡単なAPIを作成してみます。今回はGETメソッドでidを指定してデータを取得できるようにしましょう。エンドポイントを/documents/{id}とします。

Lambda関数に付与するロールの作成

Lambdaを作成していきますが、その前にLambdaがDynamoDBにアクセスするためのロールを作成します。
IAMサービスを開きます。
画面左部のメニューから「ロール」を選択します。
「ロールの作成」をクリックします。
「このロールを使用するサービスを選択」で「Lambda」を選択します。「次のステップ:アクセス権限」をクリックします。
ポリシーの一覧が表示されるので今回は「AmazonDynamoDBFullAccess」チェックボックスをONにします(本番では適切な権限を付与してください)。「次のステップ:タグ」をクリックします。
何もしないで「次のステップ:確認」をクリックします。
「ロール名」にIoT_Lambda_Dynamoと入力し、「ロールの作成」をクリックします。

Lambda関数の作成

Lambdaサービスを開きます。
画面左部のメニューから「関数」を選択します。
「関数の作成」をクリックします。
「一から作成」を選択し、関数名をIoT_Dynamo、「ランタイム」をpython3.7とします。
「実行ロールの選択または作成」では「既存のロールを使用する」を選択し、「既存のロール」を「IoT_Lambda_Dynamo」とします。「関数の作成」をクリックします。

「実行ロール」欄の「既存のロール」がIoT_Lambdaになっていることを確認します。
「関数コード」欄にコードエディタが表示されています。下記コードを貼り付けます。

'''
Copyright 2019 ground0state All Rights Reserved.
参考 <https://qiita.com/is_ryo/items/74f3fc70b7602888a2ac>
'''

import json
import boto3
import decimal
from datetime import datetime, timezone, timedelta
from boto3.dynamodb.conditions import Key

def lambda_handler(event, context):
    try:
        dynamoDB = boto3.resource("dynamodb")
        table = dynamoDB.Table("IoT") # DynamoDBのテーブル名

        # DynamoDBへのquery処理実行
        queryData = table.query(
            KeyConditionExpression = Key("id").eq(event['pathParameters']["id"]),  # 取得するKey情報
            ScanIndexForward = False,  # 昇順か降順か(デフォルトはtrue=昇順)
            Limit = 1  # 取得するデータ件数
        )

        items = queryData['Items']
        if len(items) == 0:  # 取得件数が0の場合は404を返す
            return {
                'statusCode': 404,
                'body': json.dumps(make_response(404, "Resouce not found."))
            }
        item = items[0]

        UTC = timezone(timedelta(hours=+0), 'UTC')
        item['timestamp'] = decimal_default(item['timestamp'])
        item['timestamp'] = datetime.fromtimestamp(item['timestamp']/1000, UTC)
        item['timestamp'] = item['timestamp'].strftime("%Y/%m/%d %H:%M:%S %Z")

        return {
            'statusCode': 200,
            'body': json.dumps(items[0], default=decimal_default)
        }

    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps(make_response(500, "Internal server error."))
        }


def make_response(code, message):
    response = {}
    response["code"] = code
    response["message"] = message
    return response


def decimal_default(obj):
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    raise TypeError

画面上部の「保存」をクリックします。

API Gatewayの設定

API Gatewayサービスを開きます。
「プロトコルを選択する」では「RESTを選択し、「新しい API の作成」では「新しいAPI」を選択します。「API名」をIoT_Dynamoに、「エンドポイントタイプ」を「リージョン」にしました。

画面左部のメニューから「IoT_Dynamo」-「リソース」を選択します。
「アクション」から「リソースの作成」を選択します。
「リソース名」をdocuments、「リソースパス」をdocumentsにします。
「API Gateway CORS を有効にする」チェックボックスをONにします。

新しく出来た/documentsを選択し、「アクション」から「リソースの作成」を選択します。
「リソース名」をdocuments_id、「リソースパス」を{id}にします。
「API Gateway CORS を有効にする」チェックボックスをONにします。

新しく出来た/documents/{id}を選択し、「アクション」から「メソッドの作成」を選択します。
リストボックスが表示されるので「GET」を選びます。
「/documents/{id} - GET - セットアップ」画面で、「統合タイプ」を「Lambda関数」に、「Lambda プロキシ統合の使用」チェックボックスをONに、「Lambda 関数」をIoT_Dynamoにします。その他はデフォルトとします。「保存」をクリックします。

「/documents/{id} - GET - メソッドの実行」画面で、「統合リクエスト」をクリックします。画面下部の「URLパスパラメータ」をクリックします。「名前」にid、「マッピング元」にmethod.request.path.idを入力し、右端のチェックマークをクリックします。

「アクション」から「APIのデプロイ」を選択します。「デプロイされるステージ」は「新しいステージ」を選択し、「ステージ名」をdevとします。「デプロイ」をクリックします。

遷移した画面の「URL の呼び出し」にAPIのエンドポイントが表示されています。
DynamoDBに格納されている適当なデータのIDをエンドポイントに付加して、Chromeなどのブラウザでアクセスしてみましょう。

https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/documents/02d73e4d-167e-4ccb-bc39-f3e1be1b5e65

次のようなレスポンスが帰ってくれば成功です。

{"payload": "Hello AWS!", "option_code": 7.0, "id": "02d73e4d-167e-4ccb-bc39-f3e1be1b5e65", "client_id": "thing01", "timestamp": "2019/09/13 02:25:03 UTC"}

おわりに

POSTやDELETEも同様にして作成できます。これでIoT基盤の基礎はできそうです。

本番のIoT基盤を作成するにあたっては、次の課題があります。

  • 大量データを確実に捌く -> Kinesisの利用
  • DynamoDBはクエリが弱い -> 検索項目の絞り込み・セカンダリインデックスの利用
  • データの長期保存 -> S3とAthenaの利用・S3キーの構造検討
  • Lambdaの粒度 -> リソースとメソッドの組み合わせごとにLambda関数を作成する?リソース単位でまとめる?
  • 大量デバイスや証明書の管理 -> DynamoDBで管理DBつくる?
  • RESTful APIの認証 -> Cognitoの利用
  • リソースの構築省力化 -> CloudFormationの利用
  • 過去データの一括ダウンロード -> 非同期API・S3へのファイル作成・署名付きURL
  • 画像や動画データの処理 -> Kinesis?
  • 重い処理のエッジ側での対応 -> Greengrass
  • 収集したデータの分析 -> Sage MakerやIoT Anarytics

参考:http://www.aruinc.jp/file/20190416aws4.pdf

9
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
12