Python
AWS
RaspberryPi
lambda
Slack

SensorTagからRaspberryPi3経由でAWSのPaaSを使ってSlackに環境データを通知する遊び

More than 1 year has passed since last update.

概要

  • SensorTagを使って時系列の環境データを測定したい
  • RaspberryPi3が1個遊んでいるので使いたい
  • AWSのPaaSをいくつか試したい
    • DynamoDB、Lambda、API Gateway
  • Slackで監視・操作したい(UI/UXは書きたくない)

→ 結果的に動いているものができたのでまとめます。

全体像

SensorTag2Slack.png
図はwww.draw.ioで作成しました。超便利。

SensorTagからRaspberryPi3にデータ送信

TIのSensorTag CC2650stkを使いました。
加速度、温度、湿度、照度、ジャイロ、地磁気、気圧など多くのセンサーがついています。

SensorTagの電源

SensorTagは基本的に電池で駆動します。
ただし、センサーをたくさん動かすとその分電力消費が激しいです。
今回は、すべてのセンサー情報を時系列に取得してみたいということで、
SensorTagにデバッグ用基盤を取り付けて、RaspberryPi3とUSBで接続し、
USBケーブルで電源供給しました。これで長期間、電池を気にせずに計測できます。

SensorTagの電源のOnOff

こちらのサイトを参考にさせていただきました。
http://kinokotimes.com/2017/03/07/usb-control-method-by-raspberry-pi/
BeagleBoneBlackBox_USB電源制御

RaspberryPi側のアプリでセンサーデータの収集を行なうのですが、
たまにアプリが落ちることがあり、再起動しています。
その際に、念のためSensorTagの電源も再起動させています。
適切かどうかは置いておいて、これで起動時の状態は揃えられます。

usbrefresh.sh
#! /bin/sh
hub-ctrl -h 0 -P 2 -p 0
sleep 1
hub-ctrl -h 0 -P 2 -p 1

bluepyでの収集

こちらのサイトを参考にさせていただきました。
http://taku-make.blogspot.jp/2015/02/blesensortag.html
http://dev.classmethod.jp/hardware/raspberrypi/sensortag-raspberry-pi-2-script/
センサータグから情報を収集するにあたって、BLE関連のいろいろなライブラリがあったのですが、
bluepyが一番簡単に動きましたので、これで。

AWS IoTへの送信

AWSのSDKのサンプルコードをそのまま流用しました。
Apache2.0ライセンスです。BLEで取得する部分、JSONを作る部分など、変更を加えています。
基本的にX.509の認証情報を使って通信することが前提のコードになっていると思います。

sendMQTT.py
'''
/*
 * Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
 '''

###-----------------------------------------------------------------------------

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import sys
import logging
import time
import argparse
from btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers
import struct
import math
from sensortag import SensorTag, KeypressDelegate
import json
from datetime import datetime

###-----------------------------------------------------------------------------

# Custom MQTT message callback
def customCallback(client, userdata, message):
        print("--------------")
    print("Received  : " + message.payload)
    print("from topic: " + message.topic)
    print("--------------\n\n")

###-----------------------------------------------------------------------------

# Read in command-line parameters
parser = argparse.ArgumentParser()

### AWS!
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", dest="rootCAPath", default="root-CA.crt", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", default="certificate.pem.crt",help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", default="private.pem.key",help="Private key file path")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
                    help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="Raspi_1", help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="etl/room", help="Targeted topic")

### SensorTag!
parser.add_argument('-n', action='store', dest='count', default=0,
        type=int, help="Number of times to loop data")
parser.add_argument('-T',action='store', dest="sleeptime", type=float, default=5.0, help='time between polling')
parser.add_argument('-H', action='store', dest="taghost", help='MAC of BT device')
parser.add_argument('--all', action='store_true', default=True)

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic
sleeptime = args.sleeptime
deviceID = args.clientId

###=============================================================================

if args.useWebsocket and args.certificatePath and args.privateKeyPath:
    parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
    exit(2)

if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
    parser.error("Missing credentials for authentication.")
    exit(2)

###=============================================================================

# Enabling selected sensors
print('Connecting to ' + args.taghost)
tag = SensorTag(args.taghost)
if args.all:
    tag.IRtemperature.enable()
    tag.humidity.enable()
    tag.barometer.enable()
    tag.accelerometer.enable()
    tag.magnetometer.enable()
    tag.gyroscope.enable()
    tag.battery.enable()
    tag.keypress.enable()
    tag.setDelegate(KeypressDelegate())
    tag.lightmeter.enable()
    time.sleep(1.0)

###=============================================================================

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

###=============================================================================

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, 443)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, 8883)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(0.5)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
#myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)

###=============================================================================

# Publish to the same topic in a loop forever
loopCount = 0
Payload = {}
while True:
    Payload['ID'] = str(deviceID)

    ambient_temp, target_temparature = tag.IRtemperature.read()
    Payload["AmbientTemp"] = ambient_temp
    Payload["TargetTemp"] = target_temparature

    ambient_temp, rel_humidity = tag.humidity.read()
    Payload["Humidity"] = rel_humidity

    ambient_temp, pressure_millibars = tag.barometer.read()
    Payload["Barometer"] = pressure_millibars

    Acc_x, Acc_y, Acc_z = tag.accelerometer.read()
    Payload["AccX"] = Acc_x
    Payload["AccY"] = Acc_y
    Payload["AccZ"] = Acc_z

    magnet_x, magnet_y, magnet_z = tag.magnetometer.read()
    Payload["MagnetX"] = magnet_x
    Payload["MagnetY"] = magnet_y
    Payload["MagnetZ"] = magnet_z

    gyro_x, gyro_y, gyro_z = tag.gyroscope.read()
    Payload["GyroX"] = gyro_x
    Payload["GyroY"] = gyro_y
    Payload["GyroZ"] = gyro_z

    Payload["Light"] = tag.lightmeter.read()
    Payload["Batterys"] = tag.battery.read()

    Payload["Count"] = loopCount
    Payload["Datetime"] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")

#    print("try to send!")
    myAWSIoTMQTTClient.publish(topic, json.dumps(Payload), 1)
#    print("end!")
    loopCount += 1
    time.sleep(sleeptime)

Pythonプログラムの死活監視

上記のプログラムがたまに落ちます。
真面目にデバックをすることも意味はあるかもしれませんが、
ラズパイですし、Pythonですから、落ちるだろうと判断しました。
下記のようなスクリプトをcronで1分間に一回動作させます。
sendMQTT.pyは毎回終了して、cronだけで起動しても良いかもしれませんが、
まいかいPythonを起動するというのも、重たいな-というのと、
Pythonの中で繰り返しをカウントしている部分もあります。
だいたい10000回くらいは落ちずに動くこともわかったりするので、cronは死活監視ようにしています。

ちなみにcrontab -eで
* * * * * ./sendSensorData.sh
を設定しています。毎分動作です。

sendSensorData.sh
#!/bin/sh

ps | grep python
if [ "$?" -eq 0 ]
then
  logger "python is exist.  exit..."
  exit 0
else

  logger "start reset sequence..."
  sudo usbrefresh.sh
  sleep 3

  cd /home/pi/deviceSDK
  python ./sendMQTT.py -T 59 -H "AA:BB:CC:DD:EE:FF" -e awsiotnoarn.iot.ap-northeast-1.amazonaws.com  >/dev/null 2>&1 & 

  cd
  exit 0
fi

毎周期動作させる精度

上記のスクリプトで -T 59としている部分があります。
これはPythonスクリプト内では無限ループで59秒sleepさせていることになります。
BLEでデータ取得する時間や、AWSとの通信時間など、微妙に変動があります。
リアルタイムOSを使って1分間ごとに割り込みを入れて、、、とかやっても良いかもしれませんが、
ラズパイ、raspbian jessie、Pythonという構成ですので、そこまでは無理と判断し、割り切っています。
実際に時系列のデータはとれますが、きっかり1分毎というわけには行きません。
数秒のジッタがあるというデータになります。

AWS PaaS

AWS側はサーバレスで構築します。勉強のために。

AWS IoT

こちらのサイトを参考にさせていただきました。
http://qiita.com/nsedo/items/c6f33c7cadea7023403f
あとはAWS SDKのままです。
MQTTはJSONを作って投げるだけですので、AWS IoTを使って悩むところはありませんでした。
JSONデータを受けて、DynamoDBにそのままPutしています。

DynamoDB

時系列データ保存用DB

KeyをID名と時刻情報にしました。
IoTで時系列データを組むときはこの方法が良いとどっかで聞いたことがあります。
その他は、取得したデータを並べるような内容です。
時系列データを分析するにあたって、DBには構造化しないデータを並べるだけにしています。
上記にJSONを作ってそのまま投げているところがありますので、わかると思いますが。

ステータス保存用DB

KeyをID名のみとしています。
Valueには
- Data:最後に受け取ったJSONそのまま
- isNight:照明がついていると認識しているかどうかのステート
です。
今回はLight情報の変化のみ扱っていますので、そのステートだけ保持しています。
Lambda側をステートレスにするために、ステートはDynamoDBにという狙いです。

念のため、JSONで書くとこういうデータになっています。
時系列DBの方は、下記のDataの項目のみ並んでいる事になります。

room.json
{
  "Data": {
    "AccX": {
      "N": "0.915283203125"
    },
    "AccY": {
      "N": "-0.129150390625"
    },
    "AccZ": {
      "N": "0.48974609375"
    },
    "AmbientTemp": {
      "N": "31.78125"
    },
    "Barometer": {
      "N": "1006.03"
    },
    "Batterys": {
      "N": "100"
    },
    "Count": {
      "N": "261"
    },
    "Datetime": {
      "S": "2017/09/06 13:31:35"
    },
    "GyroX": {
      "N": "-4.0740966796875"
    },
    "GyroY": {
      "N": "1.85394287109375"
    },
    "GyroZ": {
      "N": "1.983642578125"
    },
    "Humidity": {
      "N": "40.95458984375"
    },
    "ID": {
      "S": "Raspi_1"
    },
    "Light": {
      "N": "57.88"
    },
    "MagnetX": {
      "N": "38.83418803418803"
    },
    "MagnetY": {
      "N": "-19.34212454212454"
    },
    "MagnetZ": {
      "N": "-17.842735042735043"
    },
    "TargetTemp": {
      "N": "23.78125"
    }
  },
  "ID": "Raspi_1",
  "isNight": "0"
}

Lambda

ちゃんと書いてないところが多く、お恥ずかしいですが。。。
悩んだまま結論が出てないところとして、Exceptionの部分、内容がSlackに来るようになっていますが、
こういうときにどうすべきかなーというところで考えがまとまっていません。
LambdaでExceptionが発生すると繰り返し実行されるのか、繰り返しSlackにエラーをPostされるのも嫌なので、どうしようか、と思いつつも今後の課題としました。Lambdaの仕様も把握できてません。

部屋の状態を保持するRoomテーブルの構築も、ほぼ適当ですが
Key:ラズパイのID、ここでは"Raspi_1"
data:SensorTagの全データ、きたやつそのまんま
Light:現在のステートとして、照明が"点灯"or"消灯"と判別しているか
という設計です。

slackpost

dynamoDB streamからのデータをLambdaで受けるところです。
ETLRoomテーブルからLight情報を取得して、現在と比較し、
照明がついたor消えたを判別して、変化があればslackにpostします。
※ETLは部屋のあだ名です。

slackpost.py
# coding : utf-8
import json
import datetime
import requests
import boto3

LIGHT_TAG='Light'

#===============================================================================
# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"

# Post to Slack
def PostSlack(message, icon=':ghost:', username='ETLBot', channel="#room"):

    Dict_Payload = {
    "text": message,
    "username": username,
    "icon_emoji": icon,
    "channel": channel,
    }
    return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))

#-------------------------------------------------------------------------------

def Check_LightChanges(new, old, IsNight):

    Change2Night = None
    Change2Morining = None
    print("new:" , new, ", old:", old, ", IsNight:", IsNight)

    if (IsNight=='1') and (new > (old + 50)):
        Change2Morining = True
        IsNight = '0'
    elif (IsNight=='0') and (new < 10):
        Change2Night = True
        IsNight = '1'

    # Down -> UP
    if Change2Morining:
        message = ":smiley: Light is Turned On. Good Morning! :smiley:"
        icon = ":smiley:"
    # UP -> Down
    elif Change2Night:
        message = ":ghost: Light is Turned Down. Good Bye! :ghost:"
        icon = ":ghost:"
    else:
        return IsNight

    PostSlack(message, icon=icon)
    return IsNight
#-------------------------------------------------------------------------------

table = None
def update_table(data):
    ID = data['ID']['S']

    # Access to ETLRoom Table
    global table
    if not table:
        table = boto3.resource('dynamodb').Table('ETLRoom')
    response = table.get_item(Key={'ID': ID})
    if response:
        item = response['Item']
        #PostSlack(json.dumps(item))
        light = round(float(item['Data'][LIGHT_TAG]['N']))
        IsNight = item['IsNight']
    else:
        light = 0

    IsNight = Check_LightChanges(round(float(data[LIGHT_TAG]['N'])), light, IsNight)

    # Update Room Table
    response = table.put_item(
    Item={
          "ID": ID,
          "Data" : data,
          "IsNight": IsNight
        }
    )

    return 0

#-------------------------------------------------------------------------------

def lambda_handler(event, context):

    try:
        for record in event['Records']:
            dynamodb = record['dynamodb']
            keys = dynamodb['Keys']
            data = dynamodb['NewImage']

        # Keys are "ID" and "Datetime".
        id = keys['ID']['S']
        datetime = keys['Datetime']['S']
        print("ID:", id, "/ Date:", datetime)

        update_table(data)
    except Exception as e:
        import traceback
        message = traceback.format_exc()
        print(message)
        PostSlack('Meets Exception!\n' + message)
        raise e

    return 0

#===============================================================================

getroomenv

getroomenv.py
# coding : utf-8
import json
import requests
import boto3

# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"

#===============================================================================

def MakeStr(data, key, round_n):
    return str(round(float(data[key]['N']), round_n))

table = None
def GetRoomEnv(id, isAll):

    global table
    if not table:
        table = boto3.resource('dynamodb').Table('ETLRoom')

    response = table.get_item(
        Key={
            'ID': id
        }
    )

    data = response['Item']['Data']
    light = MakeStr(data, 'Light', 1)
    temp = MakeStr(data, 'TargetTemp', 1)
    humid = MakeStr(data, 'Humidity', 1)
    balo = MakeStr(data, 'Barometer', 1)
    time = data['Datetime']['S']

    message = "" \
        + "現在の温度は" + temp + "度で、湿度は"+ humid + "度です。\n" \
        + "明るさは" + light + "ルクスで、気圧は"+ balo + "hPaです。\n" \
        + "(" + time + "に計測:bar_chart:)"

    # ALLの引数があれば引数があれば、全データをダンプする
    if isAll:
        message = ""
        for d in data:
            s = str(d)
            v = data[s]
            if "N" in v:
                message += s + ":" + v["N"] + "\n"
            else:
                message += s + ":" + v["S"] + "\n"

    return message

#-------------------------------------------------------------------------------

# POST to Slack
def PostSlack(message):

    Dict_Payload = {
    "text": message,
    "username": 'NowBot',
    "icon_emoji": ":clock3:",
    "channel": '#room',
    }
    return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))

#-------------------------------------------------------------------------------
def lambda_handler(event, context):

    isAll = False
    try:
        tri1 = 'text='
        tri2 = 'trigger_word='
        body = event['body']
        tag = body[body.find(tri1)+len(tri1):body.find(tri2)-1]
        taglist = tag.split("+")
        for word in taglist:
            if "ALL" in word:
                isAll = True
    except:
        pass

    try:
        message = GetRoomEnv('Raspi_1', isAll)
    except Exception as e:
        import traceback
        message = traceback.format_exc()
        print(message)
        PostSlack('Meets Exception!\n' + message)
        raise e

    PostSlack(message)
    return 0

#===============================================================================

API Gateway

Slack

Custom IntegrationsにIncoming WebHooksとOutgoing WebHooksを入れました。
これでUIは使えます。超便利。

Incoming WebHooks

https://hooks.slack.com/services/XXXXXXX/YYYYYYY/ZZZZZZ
というSlackにPostするためのURLを生成し設定しておきます。
あとはLambdaからPostするのみです。
点灯と消灯は判別できます。下記が例です。

スクリーンショット 2017-09-05 21.07.36.png

これをみると以下のことが見えてきます。

  • だれかが土曜日の夕方忍び込んでこっそり仕事をしている
  • 昨日、朝早く来た人は多分掃除のひとで、7:10-7:53まで滞在、意外と長い
  • 最初のチームメンバーはAM9:12に来て、最後の人はPM7:49に帰った(健全!)

Outgoing WebHooks

Trigger Word

トリガーワードは「now」としています。
"now"と送るとNowBotが部屋の状態を伝えてくれます。

スクリーンショット 2017-09-05 20.07.48.png

"now ALL"と送ると、データベースのデータを全部ダンプしてくれます。デバック用。

スクリーンショット 2017-09-05 20.11.08.png

"now"の後に続く文言を自然言語処理(Amazon Pollyとか)して関連するパラメータ(温度とか)があれば、それを返すようなことも考えましたが、それは次の機会に。。。

URL

https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/prod/getroomenv
と設定しています。
これはAWS API Gatewayで作ったもので、裏にはLambdaが居ます。
getroomenvスクリプトが動いています。

感想

一通り動くまで3日くらいかかり、
その後2日間くらいブラッシュアップとか、様子を見たりとか、していました。
DynamoDBとLambdaについてはもう少し功夫が必要ですね。いい勉強になりました。
UIにSlackを使ったのは大正解でした。本当に便利。

時系列のデータを集められたので、
これを使って周期的な変動成分を取り、
いつもと違う動きがあれば、Slackに通知するなどできれば、
次の展開として良いと考えています。
土曜日に出社する人がいるのが通常かどうかは置いておいて。。。