公式ドキュメント
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/iot-rule-actions.html
##IoT action とは
IoT rure で受け取ったTopicのデータを他のサービスへのアクションにつなげるのがIoT action。
IoT rule内でこのIoT actionを設定することでAWS IoT Coreと他サービス間でデータ連携を行うことができるようになります。
ここでは比較的簡単にサービス間の連携を行うことができる方法として、
- AWS IoT Core->AWSの他サービスへのデータ連携
- AWS IoT Core->RESTを使った他サービスへのデータ連携
の2点について説明していきます。
IoT rule自体の説明は以下の記事を参考にしてください
https://qiita.com/tomzono/private/12f6253c864f8f8c4070
AWS IoT Core->AWSの他サービスへのデータ連携
IoTRureを使ってAWS IoT CoreからAWSの他サービスへデータを連携するには、ルール内のアクションを追加して連携したいサービスを選択します。
今回はデバイスから受け取ったデータをDynamoDBに保存して最終受信データを確認できるようにしていきたいと思います。
まず[ACT]>[ルール]から設定を行うIoT ruleを選択し、
アクションの追加から[DynamoDBのテーブル(DynamoDBv2)~~]を選択しアクションを選択します。
連携するサービスを選択すると各サービスの連携先を設定する画面が表示されるので設定を行うとアクションとして追加されます。
DynamoDB(DynamoDBv2)の場合はIoT ruleで受け取った値を記録するためのテーブルとDynamoDBへのアクセス権を持っているロールを選択します。
アクションの設定を行った後はIoT ruleのアクションに設定したアクションが表示されるようになります。
連携の設定を行った後は、AWS IoT CoreのTopicに送られてきたデータがすべてルールーアクションを通じて連携先に送信されるようになります。
値に閾値などを設定してアクションに条件を付けたい場合はルールクエリの設定を行ってください。
指定したDynamoDBのテーブルを覗いてみると以下のようなデータが保存されていました。
このDynamoDBのアクションでは送られてきたJSONデータの第一階層の内容をカラムとしてDBに書き込んでくれるみたいですね。
今回はDBのパーティションキーをclientIDとして登録してるので、同じclientIDからのデータは上書き保存されていきますが、パーティションキーをtimestampにしておけば恐らく送られてくるデータが全て記録される履歴のようなものも作れそうですね。
(DynamoDBとかは入ってるレコードの量とかで追加課金が掛かったりするのでそこらへんは自己責任で...)
AWS IoT Core->RESTを使った他サービスへのデータ連携
他サービスにRESTでデータを送るにはLambdaを使って送信することができます。
REST用のLambda関数を作成して、ルールアクションに呼び出したいLamda関数を設定することでルールがトリガーされた際にlambdaが実行されます。
今回はデバイスから送られてきたデータをruleactionでLambdaをキックしてLambda内でデータ成型を行いVantiqというクラウドサービスにRestAPIを使ってデータを送信します。
※Vantiqとは
データの受信から複合イベント処理を行い人や物へのコラボレーションを行えるクラウドサービス
https://vantiq.co.jp/
###VantiqにデバイスからのmessageをRESTするためのLambda関数
import json
import requests
#from botocore.vendored import requests
#//////////////////////////////////////////////////////////////////////////////
vantiq_uri = "ENTER_VANTIQ_URI"
token = "ENTER_VANTIQ_TOKEN"
VANTIQ_HEADERS = {
"Authorization" : "Bearer " + token,
"content-type" : "application/json"
}
#//////////////////////////////////////////////////////////////////////////////
def lambda_handler(event, context):
print(event)
#Determining vacant seats
if event['SensorData'] < 7 :
Status = True
else:
Status = False
print(Status)
#Creating a send file for Vantiq
data={
"RoomNo": event["DBdata"]["roomNo"],
"ElementByID":event["DBdata"]["mapElementID"],
"UseageStatus": Status,
"Sensorvalue":event['SensorData']
}
#Send data
result=requests.post(vantiq_uri,headers=VANTIQ_HEADERS,data=json.dumps(data))
print("vantiq_responceResult")
print(result)
lambda側の設定が終わったら、デバイス側のソースコードも変更してみましょう。
現在動いてるプログラムが、AWSIoTのチュートリアルのソースコードが動いているので、デバイス側がセンサーデータを送るようなソースコードを記載します。(実際のセンサをつないでないのでコード内ではセンサー値として0~9の値をランダムで送信しています。)
###デバイス用Pythonプログラム
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time
import json
import random
#import configdata from AWSConfigFile.py
import AWSConfigFile as config
# Define ENDPOINT, CLIENT_ID, PATH_TO_CERT, PATH_TO_KEY, PATH_TO_ROOT, MESSAGE, TOPIC, and RANGE
ENDPOINT = config.ENDPOINT
CLIENT_ID = config.CLIENT_ID
PATH_TO_CERT = config.PATH_TO_CERT
PATH_TO_KEY = config.PATH_TO_KEY
PATH_TO_ROOT = config.PATH_TO_ROOT
TOPIC = config.TOPIC
# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
cert_filepath=PATH_TO_CERT,
pri_key_filepath=PATH_TO_KEY,
client_bootstrap=client_bootstrap,
ca_filepath=PATH_TO_ROOT,
client_id=CLIENT_ID,
clean_session=False,
keep_alive_secs=6
)
print("Connecting to {} with client ID '{}'...".format(
ENDPOINT, CLIENT_ID))
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("AWSAWS IoT Core_MQTTConnected!")
# Publish message to server desired number of times.
try :
while True:
time.sleep(30)
SensorData = random.randint(0,9)
message ={
"message":"SendSensorData",
"SensorData":SensorData,
"clientID":CLIENT_ID
}
mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
print("Published: '" + json.dumps(message) + "' to the topic: " + TOPIC)
except KeyboardInterrupt:
print('Exit')
try~exceptの部分が今までのコードから変更された部分です。
やってることはループの中で30秒ごとに着座センサの代わりに0~9のランダムな値を生成してAWS IoT Coreに送信しているだけです。
###IoT ruleからLambdaに送られてるデータ内容
"data": {
//デバイスからの元データ
"message": "SendSensorData",
"SensorData": 3,
"clientID": "10PC202010012",
//ルールクエリで追加したtimestamp
"timestamp": 1617697569446,
//dynamoDBから抽出した端末の管理データ
"DBdata": {
"geoJSON": {
"coordinates": [
139.764658,
35.688752
],
"type": "POINT"
},
"address": "東京都港区芝公園4丁目2−8",
"clientID": "10PC202010012",
"roomNo": "301",
"tell": "xxx-xxxx-xxxx",
"installationDate": "2021/04/02",
"admin": "Tom",
"deviceName": "NotePCClient",
"mapElementID": "MapElement301"
}
}
前回のIoT ruleの部分で開設した内容が今回のデータにも追加されてますね。
###Lambdaで成型処理してVantiqで受信しているデータ
{
"RoomNo": "301",
"UseageStatus": true,
"discription": "Room301",
"ElementByID": "MapElement301",
"Sensorvalue": "3"
}
Lambda上では送られてきたセンサー値からstatusの判断を行って、Vantiqが必要なデータだけにデータ成型しているのが分かると思います。
Vantiq内でやってる処理はRESTAPIで飛んでくるJSONデータに対して、どこの部屋が[RoomNo],利用中かどうか[UseageStatus]を受け取って、画面表示用のHTMLのElementID[ElementByID]の表示/非表示の切り替えを行う
といった処理だけ行うように設定を行ってます。
Restで受け取ったデータを表示できるようなアプリがあれば
それを使って代用してもらえればいい感じになるかと思います。
一応Lambda内の処理でtrue/falseが切り替わることによってこんな感じで座席の利用状況が分かるようになってます。
ユースケースの構成で今回の内容は以下の赤枠内のIoT action~他サービスへの連携部分の構築ができました。
これでデバイスからAWS IoT Coreを通じてWebAppまでのデータのやりとりが出来るようになりました。
ユースケースの主な構築作業はここまでで終了ですが、ここからはAWS IoT Coreの機能で使えると便利なIoT shadowとFleetprovisioningについて説明していきたいと思います。
AWSAWS IoT Core検証メモ④:IoT shadow について
##追記
4/6日時点でAWSLambdaに標準で入っていたライブラリbotocore.vendoredが利用できなくなってた模様。
このライブラリを使った際に以下のエラーが表示されていた。
/var/runtime/botocore/vendored/requests/api.py:72: DeprecationWarning: You are using the post() function from 'botocore.vendored.requests'. This dependency was removed from Botocore and will be removed from Lambda after 2021/03/31. https://aws.amazon.com/blogs/developer/removing-the-vendored-version-of-requests-from-botocore/. Install the requests package, 'import requests' directly, and use the requests.post() function instead.
HttpRequestを使いたい場合には、以下の記事を参考にpythonのrequestsモジュールをレイヤーに追加することで回避することができたので参考にしてみてください
https://qiita.com/afukuma/items/b7191025700a7829967c