続き
https://qiita.com/0_1geek311/items/417fca6e6885282c14a4
こちらの記事の続きになります。
前回、ラズパイを使って温度データを取得するところまでは出来ため、次はその温度データをAWS IoT → DynamoDB → Amazon ESと流し、Kibanaによるグラフ表示まで実際に実装していきたいと思います。
システム構成図
実装① 温度データ → AWS IoT → DynamoDB
システム構成図の①の部分を実装します。
ダミーデータ → AWS IoT → DynamoDBの実装は以下の記事で作成したのでこちらを流用します。
https://qiita.com/0_1geek311/items/93ac822b8561c6dbce3c
※設定方法は上記の記事を参照。
一通り設定が完了すると、
DynamoDB(テーブル名:iot_test)に以下のデータがAWS IoT経由で登録される状態まで設定できます。
・「time」(データ送信時間)
・「deviceNo」(デバイスNo) ※固定値
・「ttl」(データ有効時間)
次に、ダミーデータを送信していた部分をラズパイからの温度データを送信する設定に変更していきます。
設定したラズパイにAWS IoTで取得した以下のKeyファイルを任意のディレクトリに保管。
・XXX.crt
・XXX-private.pem.key
・XXX-public.pem.key
・AmazonRootCA1.pem
更にpipコマンドでAWSIoTPythonSDKをインストール。
pip install AWSIoTPythonSDK
実際に実行するPGは以下の通り。
'''
Send Time Sample
'''
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import datetime
import logging
# Setup Param
host = "XXXXX-ats.iot.us-west-2.amazonaws.com" ★Rest APIエンドポイントを入力
# for Raspberry Pi
rootCAPath = "AmazonRootCA1.pem" ★ダウンロードしたKeyを入力
certificatePath = "XXX-certificate.pem.crt"
privateKeyPath = "XXX-private.pem.key"
useWebsocket = False
clientId = "testRule"
topic = "testRule/time"
deviceNo = "031868f200ff"
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureEndpoint(host, 8883)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
# Initial
time.sleep(2)
try:
while True:
nowtime = datetime.datetime.now()
nowsettime = int(time.mktime(nowtime.timetuple())) # UNIX Time
#send Command
message = {}
message['time'] = nowtime.strftime('%Y/%m/%d %H:%M:%S')
message['deviceNo'] = deviceNo
f = open("/sys/bus/w1/devices/28-031868f200ff/w1_slave", "r")
line = f .readlines()[1].replace("\n", "")
n = int(line.find("t="))
message['temperature'] = int(line[n+2:]) * 0.001
f.close()
messageJson = json.dumps(message)
print (messageJson)
myAWSIoTMQTTClient.publish(topic, messageJson, 1)
time.sleep(60)
except:
import traceback
traceback.print_exc()
print ("Terminated")
こちらのPGもほぼ流用になりますが、ダミーデータを送っていたときと違うのが、
「ttl」(データ有効時間)を削除し、新たに「temperature」(温度データ)を追加しました。
温度データは以下のように指定のファイルに保管されているため、
f = open("/sys/bus/w1/devices/28-031868f200ff/w1_slave", "r")
line = f .readlines()[1].replace("\n", "")
n = int(line.find("t="))
message['temperature'] = int(line[n+2:]) * 0.001
f.close()
上記の部分で温度データが保存されているファイルを読み取り、「t=」以降の温度データを取得し、1000倍されている数値を正しく修正したあと、AWS IoTへ送信しています。
PGの実行
python awsTest.py
dynamoDB ※ひとまず60秒ごとに送信が行われるためおおよそ1分単位でデータが蓄積
実装② DynamoDB → Lambda → Amazon ES
登録された温度データをAmazon ESに送るために、DynamoDBへデータ登録をトリガーにAWS Lumbdaを発火させます。
まずは、DynamoDBの「概要」でストリームの詳細にて「ストリームの管理」ボタンをクリックし、ストリームを有効化します。
表示タイプはデフォルトのままで、「有効化」をクリック。
※「最新のストリーム ARN」項目にARNが表示されていればOK。
続いて Amazon ESの設定を行います。
Amazon ESのドメイン作成画面にて、下記の通り設定し、「次へ」をクリック。
※Kibanaのバージョン6.5になります。
ネットワーク構成は「パブリックアクセス」とし、アクセスポリシーの設定は「ドメインへのオープンアクセスを許可」を設定。画面下部の「次へ」ボタンをクリック。
※今回はテストのため制限は掛けず、パブリックとします。
確認画面で内容を確認し、ドメインを作成。
※作成完了までしばらく時間がかかります。
作成が完了すると、「エンドポイント」「ドメインARN」「Kibana」が表示される。
※ここの値は、PGで使用するため、控えておく。
次に、AWS Lambdaを開き、言語python3.6でLambda関数を作成します。※名前は任意
PGは以下の通り、
import os
import requests
import json
from aws_requests_auth.aws_auth import AWSRequestsAuth
import string
import datetime
from urllib.parse import urlparse, parse_qs
def lambda_handler(event, context):
for record in event['Records']:
if record['eventName'] != 'INSERT':
return
strTime = (record['dynamodb']['NewImage']['time']['S'])
strDevice = (record['dynamodb']['NewImage']['deviceNo']['S'])
strTemperature = (record['dynamodb']['NewImage']['temperature']['S'])
now = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")
auth = AWSRequestsAuth(aws_access_key= os.environ.get('AWS_ACCESS_KEY_ID'),
aws_secret_access_key= os.environ.get('AWS_SECRET_ACCESS_KEY'),
aws_token = os.environ.get('AWS_SESSION_TOKEN'),
aws_host='search-es-test-XXXXX.es.amazonaws.com', ※メモしたエンドポイントの値を「https://」無しで入力」
aws_region='us-west-2',
aws_service='es')
url = ("https://search-es-test-xxxxx.us-west-2.es.amazonaws.com/kibana/sensor?pretty") ※メモしたエンドポイントを入力
data = json.dumps({ "time": strTime,
"deviceNo": strDevice,
"temperature": float(strTemperature),
"@timestamp": now })
res = requests.post(url,
data=data,
headers={'Content-Type':'application/json'},
auth=auth)
return {"result": "OK"}
※上記のPGファイルと、importライブラリをダウンロードした後、一つのファイルにzip化して
Lambdaへアップロード。
zipのアップロード完了後、Lambdaの設定画面で、
下記のように、トリガーにDynamoDBを設定し、テーブルを「iot-test」を選択。
Lambda「実行ロール」はDynamoDBの読み書きの権限を付与したものを設定、
加えて「基本設定」のタイムアウトを念の為3 → 15くらいに変更しておきます。
変更が完了後、画面上部の「保存」ボタンをクリック。
Lambdaの設定は以上なります。
設定が完了後、ラズパイから送信されている温度データが、DynamoDB → Lambdaを経由して、Amazon ESへ追加されます。
③確認 Amazon ES → Kibana、グラフ作成
送信されたデータを確認するため、Kinabaを表示し、確認していきます。
Amazon ESのダッシュボードを開き、ドメイン「es-test」をクリック。
「Kibana」に表示されているリンクURLをクリックする。
Kibanaの初期画面が表示されますので、「Management」をクリック。
ボタン「Index Patterns」→ 「Create index pattern」とクリックし、
Create index pattern画面を表示。index patternに「kibana」と入力し、
有効化された「Next step」ボタンをクリックする。
次に「Time Filter field naem」のリストから送信したデータの1つの「@timestamp」を選択肢、「Create index pattern」ボタンをクリックし、indexを作成する。
※作成完了後の画面で、「★kibana」と表示されていればOK。
indexが登録できたので、グラフ表示のための設定を行っていきます。
今回は「折れ線グラフ」をつかったグラフ表示を行います。
まず、画面左のメニュー「Discover」をクリックします。
「Selected fields」の一覧にある「temperature」にカーソルを合わせ、表示された「Add」ボタンをクリックする。クリック後、Selected fieldsにtemperatureが追加される。
データを確認後、画面左メニューの「Visualaize」をクリック。
次の画面で「From a New Search, Select Index」のNameで「kibana*」をクリック。
画面が切り替わり、以下のような画面になります。
ここの赤枠の部分でX軸、Y軸を設定し、折れ線グラフを表示させていきます。
「Y-Axis」と「X-Axis」を以下の通り設定し、反映ボタンをクリックします。
これで、グラフ表示ができました。
画面をリロードすれば最新の温度データが表示されます。
※↑温度が一部急に上昇しているのは試しに温度センサーを指で挟んだため。
※もっとデザインを凝りたい場合はkibana canvasなどの希望に合ったpluginを入れることで実装ができます。