Help us understand the problem. What is going on with this article?

温度センサーから取得したデータをAWS IoT→DynamoDB経由でAmazon ESにデータを送信する<2/2>

続き

https://qiita.com/0_1geek311/items/417fca6e6885282c14a4
こちらの記事の続きになります。

前回、ラズパイを使って温度データを取得するところまでは出来ため、次はその温度データをAWS IoT → DynamoDB → Amazon ESと流し、Kibanaによるグラフ表示まで実際に実装していきたいと思います。

システム構成図

demo.jpeg
※構成図は前回と同じ

実装① 温度データ → AWS IoT → DynamoDB

システム構成図の①の部分を実装します。
ダミーデータ → AWS IoT → DynamoDBの実装は以下の記事で作成したのでこちらを流用します。
https://qiita.com/0_1geek311/items/93ac822b8561c6dbce3c
※設定方法は上記の記事を参照。

一通り設定が完了すると、
DynamoDB(テーブル名:iot_test)に以下のデータがAWS IoT経由で登録される状態まで設定できます。
・「time」(データ送信時間)
・「deviceNo」(デバイスNo) ※固定値
・「ttl」(データ有効時間)

次に、ダミーデータを送信していた部分をラズパイからの温度データを送信する設定に変更していきます。
設定したラズパイにAWS IoTで取得した以下のKeyファイルを任意のディレクトリに保管。
・XXX.crt
・XXX-private.pem.key
・XXX-public.pem.key
・AmazonRootCA1.pem

更にpipコマンドでAWSIoTPythonSDKをインストール。

pip install AWSIoTPythonSDK

img_r1.jpg

実際に実行するPGは以下の通り。

awsTest.py
'''
Send Time Sample
'''

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import time
import json
import datetime
import logging

# Setup Param
host = "XXXXX-ats.iot.us-west-2.amazonaws.com" ★Rest APIエンドポイントを入力

# for Raspberry Pi
rootCAPath = "AmazonRootCA1.pem"         ★ダウンロードしたKeyを入力
certificatePath = "XXX-certificate.pem.crt"
privateKeyPath = "XXX-private.pem.key"

useWebsocket = False
clientId = "testRule"
topic = "testRule/time"
deviceNo = "031868f200ff"

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureEndpoint(host, 8883)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)      # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)            # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)    # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)         # 5 sec

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
# Initial
time.sleep(2)

try:
    while True:
        nowtime = datetime.datetime.now()
        nowsettime = int(time.mktime(nowtime.timetuple()))  # UNIX Time
        #send Command
        message = {}
        message['time'] = nowtime.strftime('%Y/%m/%d %H:%M:%S')
        message['deviceNo'] = deviceNo
        f = open("/sys/bus/w1/devices/28-031868f200ff/w1_slave", "r")
        line = f .readlines()[1].replace("\n", "")
        n = int(line.find("t="))
        message['temperature']  = int(line[n+2:]) * 0.001 
        f.close()
        messageJson = json.dumps(message)
        print (messageJson)
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        time.sleep(60)
except:
    import traceback
    traceback.print_exc()

print ("Terminated")

こちらのPGもほぼ流用になりますが、ダミーデータを送っていたときと違うのが、
「ttl」(データ有効時間)を削除し、新たに「temperature」(温度データ)を追加しました。
温度データは以下のように指定のファイルに保管されているため、
img_w1.jpg

f = open("/sys/bus/w1/devices/28-031868f200ff/w1_slave", "r")
line = f .readlines()[1].replace("\n", "")
n = int(line.find("t="))
message['temperature']  = int(line[n+2:]) * 0.001 
f.close()

上記の部分で温度データが保存されているファイルを読み取り、「t=」以降の温度データを取得し、1000倍されている数値を正しく修正したあと、AWS IoTへ送信しています。

PGの実行

python awsTest.py

img_r2.jpg

dynamoDB ※ひとまず60秒ごとに送信が行われるためおおよそ1分単位でデータが蓄積
img_d1.jpg

実装② DynamoDB → Lambda → Amazon ES

登録された温度データをAmazon ESに送るために、DynamoDBへデータ登録をトリガーにAWS Lumbdaを発火させます。

まずは、DynamoDBの「概要」でストリームの詳細にて「ストリームの管理」ボタンをクリックし、ストリームを有効化します。
img_es5.JPG
表示タイプはデフォルトのままで、「有効化」をクリック。
img_es6.JPG
※「最新のストリーム ARN」項目にARNが表示されていればOK。

続いて Amazon ESの設定を行います。
Amazon ESのドメイン作成画面にて、下記の通り設定し、「次へ」をクリック。
※Kibanaのバージョン6.5になります。
img_es1.JPG

Amazon ESのドメイン名、インスタンスタイプを設定。
img_es2.JPG

ネットワーク構成は「パブリックアクセス」とし、アクセスポリシーの設定は「ドメインへのオープンアクセスを許可」を設定。画面下部の「次へ」ボタンをクリック。
※今回はテストのため制限は掛けず、パブリックとします。
img_es3.JPG

確認画面で内容を確認し、ドメインを作成。
※作成完了までしばらく時間がかかります。

作成が完了すると、「エンドポイント」「ドメインARN」「Kibana」が表示される。
※ここの値は、PGで使用するため、控えておく。
img_es7.JPG

次に、AWS Lambdaを開き、言語python3.6でLambda関数を作成します。※名前は任意
PGは以下の通り、

lambda_function.py
import os
import requests
import json
from aws_requests_auth.aws_auth import AWSRequestsAuth
import string
import datetime
from urllib.parse import urlparse, parse_qs

def lambda_handler(event, context):
    for record in event['Records']:
        if record['eventName'] != 'INSERT':
            return
        strTime = (record['dynamodb']['NewImage']['time']['S'])
        strDevice = (record['dynamodb']['NewImage']['deviceNo']['S'])
        strTemperature = (record['dynamodb']['NewImage']['temperature']['S'])
        now = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")

    auth = AWSRequestsAuth(aws_access_key= os.environ.get('AWS_ACCESS_KEY_ID'),
                           aws_secret_access_key= os.environ.get('AWS_SECRET_ACCESS_KEY'),
                           aws_token = os.environ.get('AWS_SESSION_TOKEN'),
                           aws_host='search-es-test-XXXXX.es.amazonaws.com', ※メモしたエンドポイントの値を「https://」無しで入力」
                           aws_region='us-west-2',
                           aws_service='es')                   
    url = ("https://search-es-test-xxxxx.us-west-2.es.amazonaws.com/kibana/sensor?pretty") ※メモしたエンドポイントを入力
    data = json.dumps({ "time": strTime, 
                        "deviceNo": strDevice, 
                        "temperature": float(strTemperature), 
                        "@timestamp": now })
    res = requests.post(url, 
                        data=data, 
                        headers={'Content-Type':'application/json'}, 
                        auth=auth)

    return {"result": "OK"}

※上記のPGファイルと、importライブラリをダウンロードした後、一つのファイルにzip化して
Lambdaへアップロード。
img_es9.JPG

zipのアップロード完了後、Lambdaの設定画面で、
下記のように、トリガーにDynamoDBを設定し、テーブルを「iot-test」を選択。
img_es8.JPG

Lambda「実行ロール」はDynamoDBの読み書きの権限を付与したものを設定、
加えて「基本設定」のタイムアウトを念の為3 → 15くらいに変更しておきます。
変更が完了後、画面上部の「保存」ボタンをクリック。
img_es10.JPG

Lambdaの設定は以上なります。

設定が完了後、ラズパイから送信されている温度データが、DynamoDB → Lambdaを経由して、Amazon ESへ追加されます。

③確認 Amazon ES → Kibana、グラフ作成

送信されたデータを確認するため、Kinabaを表示し、確認していきます。

Amazon ESのダッシュボードを開き、ドメイン「es-test」をクリック。
「Kibana」に表示されているリンクURLをクリックする。
img_es11.JPG

Kibanaの初期画面が表示されますので、「Management」をクリック。
img_es12.JPG

ボタン「Index Patterns」→ 「Create index pattern」とクリックし、
Create index pattern画面を表示。index patternに「kibana」と入力し、
有効化された「Next step」ボタンをクリックする。
img_es13.JPG

次に「Time Filter field naem」のリストから送信したデータの1つの「@timestamp」を選択肢、「Create index pattern」ボタンをクリックし、indexを作成する。
img_es14.JPG
※作成完了後の画面で、「★kibana」と表示されていればOK。

indexが登録できたので、グラフ表示のための設定を行っていきます。
今回は「折れ線グラフ」をつかったグラフ表示を行います。
まず、画面左のメニュー「Discover」をクリックします。
img0.JPG

「Selected fields」の一覧にある「temperature」にカーソルを合わせ、表示された「Add」ボタンをクリックする。クリック後、Selected fieldsにtemperatureが追加される。
img0-1.JPG

データを確認後、画面左メニューの「Visualaize」をクリック。
img_es15.JPG

次の画面で「From a New Search, Select Index」のNameで「kibana*」をクリック。
img_es16.JPG

画面が切り替わり、以下のような画面になります。
ここの赤枠の部分でX軸、Y軸を設定し、折れ線グラフを表示させていきます。
img_es17.JPG

「Y-Axis」と「X-Axis」を以下の通り設定し、反映ボタンをクリックします。
img_es18.JPG

これで、グラフ表示ができました。
画面をリロードすれば最新の温度データが表示されます。
img2.JPG

※↑温度が一部急に上昇しているのは試しに温度センサーを指で挟んだため。
※もっとデザインを凝りたい場合はkibana canvasなどの希望に合ったpluginを入れることで実装ができます。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away