Help us understand the problem. What is going on with this article?

RaspberryPiに接続したセンサの情報をCloud Watchでモニタリングする

More than 1 year has passed since last update.

構想

RaspberryPiから送信したセンサの情報をAWSIoTを使って、DynamoDBに格納する。
格納した情報をLambdaを使ってCloudWatchに流し、モニタリングする。

AWS IoTにモノを登録する

まずはAWSIoTに接続するモノを登録する。

接続をクリック

1.png

デバイスの設定の今すぐ始めるをクリック

2.png

手順を確認して今すぐ始めるをクリック

3.png

接続するモノの環境を選択

今回はRaspberryPiを使用するのでOSはLinux、言語はPythonを選択する。

4.png

モノの名前を設定する

自分の好きな名前で良い。

5.png

接続キットをダウンロードする

今回登録したモノ専用の証明書などが同梱されているので、取扱に気をつける。

6.png

画面のコマンドを実行する

以前の記事を参考にRaspberryPiのPython環境を整えてから、画面に表示されたコマンドを実行してみる。

7.png

$ ./start.sh

2017-09-10 14:30:53,709 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Try to put a publish request 2 in the TCP stack.
2017-09-10 14:30:53,710 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Publish request 2 succeeded.
Received a new message: 
b'New Message 0'
from topic: 
sdk/test/Python
--------------


2017-09-10 14:30:54,713 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Try to put a publish request 3 in the TCP stack.
2017-09-10 14:30:54,714 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Publish request 3 succeeded.
Received a new message: 
b'New Message 1'
from topic: 
sdk/test/Python
--------------


2017-09-10 14:30:55,717 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Try to put a publish request 4 in the TCP stack.
2017-09-10 14:30:55,718 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Publish request 4 succeeded.
Received a new message: 
b'New Message 2'
from topic: 
sdk/test/Python
--------------

上記のようなメッセージが流れ出したら成功。

送信内容を変更する

上記で実行したサンプルプログラムでは、永遠とNew Message Xというメッセージを送り続けているだけなので、これを温度/湿度センサ(DHT11)の出力に変える。

まずは先程ダウンロードしたconnect_device_packageの直下にhttps://github.com/szazo/DHT11_Python.gitをクローンする。

そして、./start.shで呼び出されているbasicPubSub.pyの中身を改変する。

connect_device_package/aws-iot-device-sdk-python/samples/basicPubSub/basicPubSub.py
'''
/*
 * Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
 '''

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from DHT11_Python import dht11
import RPi.GPIO as GPIO
import sys
import logging
import time
import argparse
import datetime
import json


# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")

# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
                    help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub", help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic")

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic

if args.useWebsocket and args.certificatePath and args.privateKeyPath:
    parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
    exit(2)

if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
    parser.error("Missing credentials for authentication.")
    exit(2)

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, 443)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, 8883)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)

# initialize GPIO
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.cleanup()

# read data using pin 2
instance = dht11.DHT11(pin=2)

while True:
    result = instance.read()
    if result.is_valid():
        data = {'timestamp': str(datetime.datetime.now()),
                'clientId': clientId,
                'temperature': result.temperature,
                'humidity': result.humidity
                }
        myAWSIoTMQTTClient.publish(topic, json.dumps(data), 1)
        time.sleep(1)

DHT11の結果をディクショナリ形式で格納し、myAWSIoTMQTTClient.publish(topic, json.dumps(data), 1)json.dumpsしてパブリッシュする。

また、start.shも次の設定で扱いやすいように、
basicPubSub.pyの呼び出しオプションを追加する。

start.sh
# stop script on error
set -e

# Check to see if root CA file exists, download if not
if [ ! -f ./root-CA.crt ]; then
  printf "\nDownloading AWS IoT Root CA certificate from Symantec...\n"
  curl https://www.symantec.com/content/en/us/enterprise/verisign/roots/VeriSign-Class%203-Public-Primary-Certification-Authority-G5.pem > root-CA.crt
fi

# install AWS Device SDK for Python if not already installed
if [ ! -d ./aws-iot-device-sdk-python ]; then
  printf "\nInstalling AWS SDK...\n"
  git clone https://github.com/aws/aws-iot-device-sdk-python.git
  pushd aws-iot-device-sdk-python
  python setup.py install
  popd
fi

# run pub/sub sample app using certificates downloaded in package
printf "\nRunning pub/sub sample application...\n"
python aws-iot-device-sdk-python/samples/basicPubSub/basicPubSub.py -e a33hl2ob9z80a1.iot.us-west-2.amazonaws.com -r root-CA.crt -c RaspberryPi01.cert.pem -k RaspberryPi01.private.key -t dht11

追加したのは-t dht11という部分で、
これはトピックの名前を設定する部分である。

この名前を設定しておくことで、受信したメッセージがどのトピックのものか判別がつき、
AWSIoT側で受信後の処理を振り分けることができる。

AWS IAMで必要な権限を持ったロールを作成しておく

本来は機能毎にロールを分けて必要以上に権限を持たせないのが良いが、
面倒くさいので今回は一つのロールに全て持たせる。
(どの権限が必要になるかわからなかったので、多めにアタッチした。
あとで勉強しておく必要あり。)

  • CloudWatchFullAccess
  • AmazonDynamoDBFullAccess
  • AmazonDynamoDBFullAccesswithDataPipeline
  • AWSLambdaDynamoDBExecutionRole
  • AWSLambdaInvocation-DynamoDB

AWSIoTでルールを設定する

ルールをクリック

8.png

右上の作成ボタンをクリック

9.png

名前と説明を設定する

10.png

どのメッセージにルールを適用するかを設定する

属性は*、トピックフィルターは先程、basicPubSub.pyの呼び出しに追加した-tオプションの値を入力する。

こうすることで-tオプションで指定した名前と一致したもののみにこのルールを適用することができる。

11.png

メッセージに対してどのような処理を行うか決める。

今回はまず、AWS IoTで受信したメッセージをDynamoDBに送り込みたいので、そのように設定する。

12.png

13.png

新しいリソースを作成するをクリック。

14.png

DHT11のデータを格納するテーブルを作成する

15.png

16.png

こんな感じに設定して、テーブルを作成する。

テーブルを選択する

もとの画面を戻りテーブルを選択する。

17.png

ハッシュキーの値とレンジキーの値が空欄になっているので、
そこには上記のように${timestamp}などを指定する。

なお、メッセージとして送信されるjsonの各メンバーには${member}という形でアクセスできる。

ロールを指定する

18.png

ここには前もって用意しておいたロールを指定する。

19.png

これでアクションが追加できたので、ルールを作成するをクリックしてルール作成は完了。

確認

実行してみて、DynamoDBにデータが入るか確認する。

$ ./start.sh
 .
 .
 .

2017-09-09 23:32:23,411 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Try to put a publish request 1394 in the TCP stack.
2017-09-09 23:32:23,412 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Publish request 1394 succeeded.
Received a new message: 
b'{"timestamp": "2017-09-09 23:32:23.409885", "clientId": "basicPubSub", "temperature": 27, "humidity": 70}'
from topic: 
dht11
--------------


2017-09-09 23:32:25,715 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Try to put a publish request 1395 in the TCP stack.
2017-09-09 23:32:25,716 - AWSIoTPythonSDK.core.protocol.mqttCore - DEBUG - Publish request 1395 succeeded.
Received a new message: 
b'{"timestamp": "2017-09-09 23:32:25.714083", "clientId": "basicPubSub", "temperature": 27, "humidity": 70}'
from topic: 
dht11
--------------

20.png

うまくいってるようだ。
NとかSとかは、NumberStringの略だそう。
最初NorthSouthかと思ってなんのこっちゃってなったけど。

DynamoDBからCloudWatchにデータを流す

トリガーの作成

長いけどあと一息。
DynamoDBのテーブルの画面から、トリガー => トリガーの作成 => 新規関数と選択。

21.png

22.png

トリガーの設定

ステップ1でdynamodb-process-streamが選択された状態で、ステップ2から始まるが、ステップ1に一旦戻る。

23.png

今回はdynamodb-process-streamのPython3版を選択した。

24.png

28.png

テーブルを指定し、開始位置は水平トリムにする。

25.png

トリガーの有効化は後でするので、チェックを入れずに次へ。

関数を定義する

26.png

適当に基本情報を埋める。

27.png

上図のように関数を定義できるので、以下のコードに変更する。

from __future__ import print_function

import json
import boto3
from decimal import Decimal

print('Loading function')


def lambda_handler(event, context):
    # print("Received event: " + json.dumps(event, indent=2))
    client = boto3.client('cloudwatch') 
    for record in event['Records']:
        # print(record['eventID'])
        # print(record['eventName'])
        # print("DynamoDB Record: " + json.dumps(record['dynamodb'], indent=2))
        print(record['dynamodb']['NewImage'])
        print(record['dynamodb']['NewImage']['timestamp']['S'])
        response = client.put_metric_data(
            Namespace='dht11',
            MetricData=[
                {
                    'MetricName': 'temperature',
                    'Dimensions': [
                        {
                            'Name': 'clientId',
                            'Value': record['dynamodb']['NewImage']['payload']['M']['clientId']['S'],
                        },
                    ],
                    'Value': Decimal(record['dynamodb']['NewImage']['payload']['M']['temperature']['N']),
                    'Unit': 'None'
                },
            ]
        ) 
        response = client.put_metric_data(
            Namespace='dht11',
            MetricData=[
                {
                    'MetricName': 'humidity',
                    'Dimensions': [
                        {
                            'Name': 'clientId',
                            'Value': record['dynamodb']['NewImage']['payload']['M']['clientId']['S'],
                        },
                    ],
                    'Value': Decimal(record['dynamodb']['NewImage']['payload']['M']['humidity']['N']),
                    'Unit': 'Percent'
                },
            ]
        )

編集したら、確認をして関数を保存する。

トリガーの有効化

DynamoDBのトリガーの画面に戻ると、作成した関数が出現している。
作成した関数を選択肢してトリガーの編集ボタンをクリック。

29.png

トリガーを選択してトリガーを有効化する。

30.png

CloudWatchでモニタリングする

あとはCloudWatch側で表示の設定をすれば、温度や湿度の推移が見えるようになる。
ダッシュボード => ダッシュボードの作成でダッシュボードを作成した後、
ウィジェットの追加からtemperaturehumidityを検索する。

Lambdaで作成されたメトリクスが出てくるので、それを選択すると・・・

31.png

無事、確認できる。

まとめ

  • AWSIoT => DynamoDB => Lambda => CloudWatchの連携で、DHT11センサの値をモニタリングすることができた。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした