はじめに
AWS IoT を使ってDynamoDBにデータを登録、API GatewayとLambdaでRESTful APIを作ります。
気が向いたらキャプチャ撮ります。
参考
- IoT Coreからの振り分けについて: Amazon DynamoDB ルールを作成する
手順
データをIoT Coreに送信してみよう
モノ(証明書)の作成
まず、コンソールにログインし、IoT Coreを開きます。
画面左部のメニューから「管理」をクリック。「モノの作成」をクリックします。
今回は「単一の AWS IoT モノの登録」を選択します。
「名前」にIoTデバイスの名前を付けます。今回はThing01としました。
「タイプ」、「グループ」、「検索可能なモノの属性」はデバイスを管理するためのオプション機能です。今回は使用しないので何も入力しません。「次へ」をクリックします。
AWS IoT Coreでは、デバイスに登録されている証明書を利用して認証を行います。
今回は「1-Click 証明書作成 (推奨)」を使用して証明書を作成します。
作成された証明書、パブリックキー、プライベートキーをダウンロードして保存しておきます。
「有効化」をクリックし、作成された証明書を使用可能にします。
「ポリシーをアタッチ」をクリックするとポリシーをアタッチする画面に遷移します。現在ポリシーを作成していないので、そのまま「モノの登録」をクリックします。
ポリシーの作成
画面左部のメニューから「安全性」-「ポリシー」をクリック。「ポリシーの作成」をクリックします。
ポリシーに名前を付けます。今回はpolicy01としました。
アドバンスドモードをクリックし、下記jsonを貼り付けます。
特定の「Resource」に「Action」を「Effect」するという情報を「Statement」内に書きます。
「Version」はjsonの書式のバージョンで、定型文のようなものです。
今回は任意のデバイスから任意の操作を許可するステートメントにしました(本番では適切な権限を設定する必要があります)。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:*"
],
"Resource": [
"*"
]
}
]
}
「作成」をクリックしてポリシーの作成を完了します。
先程作成した証明書にポリシーをアタッチします。これにより証明書を持ったデバイスはポリシーに従った操作が可能になります。
また、このことからわかるように、
画面左部のメニューから「安全性」-「証明書」をクリックします。
先程作成した証明書をクリックし、右上の「アクション」-「ポリシーのアタッチ」を開き、policy01を選択してアタッチします。
エンドポイントの確認
画面左部のメニューから「設定」をクリックします。
カスタムエンドポイントの欄に表示されているエンドポイントが、デバイスからAWSへの接続先になります。
エンドポイントは一つしかないので、データの振り分けはポリシーで行うことになります。
したがって、ポリシーの権限管理、証明書との紐付けは重要な管理事項になります(ちょっと面倒くさい)。
SDKの準備
ここからはデバイス側の準備になります。
が、今回はデバイスは準備せず、ただのpythonのプログラムからデータを送ってみましょう。
適当な作業用フォルダ(/work)を作ります。
SDKのgithubhttps://github.com/aws/aws-iot-device-sdk-javaからzipでダウンロードし、workで解凍します。
セットアップを実行します。
python setup.py install
work配下にcertフォルダを作成し、証明書、パブリックキー、プライベートキーを格納します。
work配下に下記サンプルプログラム(を少し改造したもの)を作成します。これはSDKに入っているサンプルプログラムを改造したものです。Hello AWS!というメッセージを無限に送り続けます。「追加 parameters」という部分のエンドポイント、ルート証明書、モノの証明書、プライベートキーを記載してください。
'''
Copyright 2019 ground0state All Rights Reserved.
'''
'''
/*
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
'''
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json
# 追加 ----
import uuid
import random
# ---------
AllowedActions = ['both', 'publish', 'subscribe']
# Custom MQTT message callback
def customCallback(client, userdata, message):
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("--------------\n\n")
# Read in command-line parameters
## コメントアウト ----
# parser = argparse.ArgumentParser()
# parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
# parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
# parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
# parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
# parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
# parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
# help="Use MQTT over WebSocket")
# parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
# help="Targeted client id")
# parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic")
# parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
# help="Operation modes: %s"%str(AllowedActions))
# parser.add_argument("-M", "--message", action="store", dest="message", default="Hello World!",
# help="Message to publish")
# args = parser.parse_args()
# host = args.host
# rootCAPath = args.rootCAPath
# certificatePath = args.certificatePath
# privateKeyPath = args.privateKeyPath
# port = args.port
# useWebsocket = args.useWebsocket
# clientId = args.clientId
# topic = args.topic
# ----------------------------
# 追加 parameters ----
host = "xxxxxxxxxxxxxxxxxxx.amazonaws.com" # エンドポイント
rootCAPath = "/xxxxx/work/cert/rootCA.pem" # ルート証明書
certificatePath = "/xxxxx/work/cert/xxxxxxxxxx-certificate.pem.crt" # モノの証明書
privateKeyPath = "/xxxxx/work/cert/xxxxxxxxxx-private.pem.key" # プライベートキー
port = 8883
useWebsocket = False
clientId = "thing01" # モノの名前
topic = "sdk/test/Python" # データを送信・受信するトピック
mode = "both" # 送信・受信両方の通信をする
message_text = "Hello AWS!" # このプログラムで送信するテキスト
# --------------------
## コメントアウト ----
# if args.mode not in AllowedActions:
# parser.error("Unknown --mode option %s. Must be one of %s" % (args.mode, str(AllowedActions)))
# exit(2)
# if args.useWebsocket and args.certificatePath and args.privateKeyPath:
# parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
# exit(2)
# if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
# parser.error("Missing credentials for authentication.")
# exit(2)
# # Port defaults
# if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443
# port = 443
# if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883
# port = 8883
#----------------------
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
myAWSIoTMQTTClient.configureEndpoint(host, port)
myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureEndpoint(host, port)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
# コメントアウト ----
# # Connect and subscribe to AWS IoT
# myAWSIoTMQTTClient.connect()
# if args.mode == 'both' or args.mode == 'subscribe':
# myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
# time.sleep(2)
# # Publish to the same topic in a loop forever
# loopCount = 0
# while True:
# if args.mode == 'both' or args.mode == 'publish':
# message = {}
# message['message'] = args.message
# message['sequence'] = loopCount
# messageJson = json.dumps(message)
# myAWSIoTMQTTClient.publish(topic, messageJson, 1)
# if args.mode == 'publish':
# print('Published topic %s: %s\n' % (topic, messageJson))
# loopCount += 1
# time.sleep(1)
#----------------------
# 追加 ----
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
if mode == 'both' or mode == 'subscribe':
myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)
# Publish to the same topic in a loop forever
loopCount = 0
while True:
if mode == 'both' or mode == 'publish':
u4 = str(uuid.uuid4())
message = {}
message['id'] = u4
message['payload'] = message_text
message['option_code'] = random.randrange(1, 10)
messageJson = json.dumps(message)
myAWSIoTMQTTClient.publish(topic, messageJson, 1)
if mode == 'publish':
print('%s Published topic %s: %s\n' % (loopCount, topic, messageJson))
loopCount += 1
time.sleep(1)
# ----------
データ送信
pub_sub.pyを実行します。
python pub_sub.py
AWSのコンソールに戻り、画面左部のメニューから「テスト」をクリックします。
「トピックのサブスクリプション」のテキストボックスに「sdk/test/Python」と入力し、「トピックへのサブスクライブ」をクリックします。
画面下部に送信されてきたメッセージが表示されます。
IoT Core に送られたデータを Dynamo DB に格納してみよう
ルールの作成
画面左部のメニューから「ACT」をクリックします。
「ルールの作成」をクリックします。
ルールに名前を付けます。今回はDynamo01としました。
「ルールクエリステートメント」欄に、データに対する加工操作を記述します。
「SQL バージョンの使用」は2016-03-23
にします。
「ルールクエリステートメント」には下記のステートメントを記述します。
SELECT clientid() AS client_id, timestamp() AS timestamp, * FROM 'sdk/test/Python'
「アクションの追加」をクリックします。「データベーステーブル(DynamoDBv2)の複数列にメッセージを分割する」をクリックし、「アクションの設定」をクリックします。
「アクションの設定」画面で「新しいリソースを作成する」をクリックすると、「DynamoDB テーブルの作成」画面に遷移します。テーブル名とプライマリキーを入力します。テーブル名をIoT、プライマリキーをid
で文字列にしました。「ソートキーの追加」チェックボックスをONにし、ソートキーはtimestampで数値型にしました。その他はデフォルトのままです。「作成」をクリックするとテーブルが作成されます。
画面下部の方にある「ロールの作成」をクリックします。
ロールの名前を付けます。今回はIoTRoleとしました。ロールの作成をクリックするとロールが作成されます。
「アクションの追加」をクリックします。「ルールの作成」画面に戻るので、「ルールの作成」をクリックします。
pub_sub.pyを実行するとデータがIoT Coreに送信され、ルールに従ってDynamo DBに格納されます。
DynamoDBサービスを開き、画面左部のメニューから「テーブル」をクリックします。
「IoT」テーブルを開き、項目タブを開きます。成功していれば、テーブルにリアルタイムにデータが投入されていきます。
LambdaでAPIを作成してデータを取り出してみよう
API GatewayとLambdaを使用して簡単なAPIを作成してみます。今回はGETメソッドでidを指定してデータを取得できるようにしましょう。エンドポイントを/documents/{id}
とします。
Lambda関数に付与するロールの作成
Lambdaを作成していきますが、その前にLambdaがDynamoDBにアクセスするためのロールを作成します。
IAMサービスを開きます。
画面左部のメニューから「ロール」を選択します。
「ロールの作成」をクリックします。
「このロールを使用するサービスを選択」で「Lambda」を選択します。「次のステップ:アクセス権限」をクリックします。
ポリシーの一覧が表示されるので今回は「AmazonDynamoDBFullAccess」チェックボックスをONにします(本番では適切な権限を付与してください)。「次のステップ:タグ」をクリックします。
何もしないで「次のステップ:確認」をクリックします。
「ロール名」にIoT_Lambda_Dynamo
と入力し、「ロールの作成」をクリックします。
Lambda関数の作成
Lambdaサービスを開きます。
画面左部のメニューから「関数」を選択します。
「関数の作成」をクリックします。
「一から作成」を選択し、関数名をIoT_Dynamo
、「ランタイム」をpython3.7
とします。
「実行ロールの選択または作成」では「既存のロールを使用する」を選択し、「既存のロール」を「IoT_Lambda_Dynamo」とします。「関数の作成」をクリックします。
「実行ロール」欄の「既存のロール」がIoT_Lambda
になっていることを確認します。
「関数コード」欄にコードエディタが表示されています。下記コードを貼り付けます。
'''
Copyright 2019 ground0state All Rights Reserved.
参考 <https://qiita.com/is_ryo/items/74f3fc70b7602888a2ac>
'''
import json
import boto3
import decimal
from datetime import datetime, timezone, timedelta
from boto3.dynamodb.conditions import Key
def lambda_handler(event, context):
try:
dynamoDB = boto3.resource("dynamodb")
table = dynamoDB.Table("IoT") # DynamoDBのテーブル名
# DynamoDBへのquery処理実行
queryData = table.query(
KeyConditionExpression = Key("id").eq(event['pathParameters']["id"]), # 取得するKey情報
ScanIndexForward = False, # 昇順か降順か(デフォルトはtrue=昇順)
Limit = 1 # 取得するデータ件数
)
items = queryData['Items']
if len(items) == 0: # 取得件数が0の場合は404を返す
return {
'statusCode': 404,
'body': json.dumps(make_response(404, "Resouce not found."))
}
item = items[0]
UTC = timezone(timedelta(hours=+0), 'UTC')
item['timestamp'] = decimal_default(item['timestamp'])
item['timestamp'] = datetime.fromtimestamp(item['timestamp']/1000, UTC)
item['timestamp'] = item['timestamp'].strftime("%Y/%m/%d %H:%M:%S %Z")
return {
'statusCode': 200,
'body': json.dumps(items[0], default=decimal_default)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps(make_response(500, "Internal server error."))
}
def make_response(code, message):
response = {}
response["code"] = code
response["message"] = message
return response
def decimal_default(obj):
if isinstance(obj, decimal.Decimal):
return float(obj)
raise TypeError
画面上部の「保存」をクリックします。
API Gatewayの設定
API Gatewayサービスを開きます。
「プロトコルを選択する」では「RESTを選択し、「新しい API の作成」では「新しいAPI」を選択します。「API名」をIoT_Dynamo
に、「エンドポイントタイプ」を「リージョン」にしました。
画面左部のメニューから「IoT_Dynamo」-「リソース」を選択します。
「アクション」から「リソースの作成」を選択します。
「リソース名」をdocuments
、「リソースパス」をdocuments
にします。
「API Gateway CORS を有効にする」チェックボックスをONにします。
新しく出来た/documentsを選択し、「アクション」から「リソースの作成」を選択します。
「リソース名」をdocuments_id
、「リソースパス」を{id}
にします。
「API Gateway CORS を有効にする」チェックボックスをONにします。
新しく出来た/documents/{id}を選択し、「アクション」から「メソッドの作成」を選択します。
リストボックスが表示されるので「GET」を選びます。
「/documents/{id} - GET - セットアップ」画面で、「統合タイプ」を「Lambda関数」に、「Lambda プロキシ統合の使用」チェックボックスをONに、「Lambda 関数」をIoT_Dynamo
にします。その他はデフォルトとします。「保存」をクリックします。
「/documents/{id} - GET - メソッドの実行」画面で、「統合リクエスト」をクリックします。画面下部の「URLパスパラメータ」をクリックします。「名前」にid
、「マッピング元」にmethod.request.path.id
を入力し、右端のチェックマークをクリックします。
「アクション」から「APIのデプロイ」を選択します。「デプロイされるステージ」は「新しいステージ」を選択し、「ステージ名」をdev
とします。「デプロイ」をクリックします。
遷移した画面の「URL の呼び出し」にAPIのエンドポイントが表示されています。
DynamoDBに格納されている適当なデータのIDをエンドポイントに付加して、Chromeなどのブラウザでアクセスしてみましょう。
https://xxxxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/documents/02d73e4d-167e-4ccb-bc39-f3e1be1b5e65
次のようなレスポンスが帰ってくれば成功です。
{"payload": "Hello AWS!", "option_code": 7.0, "id": "02d73e4d-167e-4ccb-bc39-f3e1be1b5e65", "client_id": "thing01", "timestamp": "2019/09/13 02:25:03 UTC"}
おわりに
POSTやDELETEも同様にして作成できます。これでIoT基盤の基礎はできそうです。
本番のIoT基盤を作成するにあたっては、次の課題があります。
- 大量データを確実に捌く -> Kinesisの利用
- DynamoDBはクエリが弱い -> 検索項目の絞り込み・セカンダリインデックスの利用
- データの長期保存 -> S3とAthenaの利用・S3キーの構造検討
- Lambdaの粒度 -> リソースとメソッドの組み合わせごとにLambda関数を作成する?リソース単位でまとめる?
- 大量デバイスや証明書の管理 -> DynamoDBで管理DBつくる?
- RESTful APIの認証 -> Cognitoの利用
- リソースの構築省力化 -> CloudFormationの利用
- 過去データの一括ダウンロード -> 非同期API・S3へのファイル作成・署名付きURL
- 画像や動画データの処理 -> Kinesis?
- 重い処理のエッジ側での対応 -> Greengrass
- 収集したデータの分析 -> Sage MakerやIoT Anarytics