概要
- SensorTagを使って時系列の環境データを測定したい
- RaspberryPi3が1個遊んでいるので使いたい
- AWSのPaaSをいくつか試したい
- DynamoDB、Lambda、API Gateway
- Slackで監視・操作したい(UI/UXは書きたくない)
→ 結果的に動いているものができたのでまとめます。
全体像
SensorTagからRaspberryPi3にデータ送信
TIのSensorTag CC2650stkを使いました。
加速度、温度、湿度、照度、ジャイロ、地磁気、気圧など多くのセンサーがついています。
SensorTagの電源
SensorTagは基本的に電池で駆動します。
ただし、センサーをたくさん動かすとその分電力消費が激しいです。
今回は、すべてのセンサー情報を時系列に取得してみたいということで、
SensorTagにデバッグ用基盤を取り付けて、RaspberryPi3とUSBで接続し、
USBケーブルで電源供給しました。これで長期間、電池を気にせずに計測できます。
SensorTagの電源のOnOff
こちらのサイトを参考にさせていただきました。
http://kinokotimes.com/2017/03/07/usb-control-method-by-raspberry-pi/
BeagleBoneBlackBox_USB電源制御
RaspberryPi側のアプリでセンサーデータの収集を行なうのですが、
たまにアプリが落ちることがあり、再起動しています。
その際に、念のためSensorTagの電源も再起動させています。
適切かどうかは置いておいて、これで起動時の状態は揃えられます。
#! /bin/sh
hub-ctrl -h 0 -P 2 -p 0
sleep 1
hub-ctrl -h 0 -P 2 -p 1
bluepyでの収集
こちらのサイトを参考にさせていただきました。
http://taku-make.blogspot.jp/2015/02/blesensortag.html
http://dev.classmethod.jp/hardware/raspberrypi/sensortag-raspberry-pi-2-script/
センサータグから情報を収集するにあたって、BLE関連のいろいろなライブラリがあったのですが、
bluepyが一番簡単に動きましたので、これで。
AWS IoTへの送信
AWSのSDKのサンプルコードをそのまま流用しました。
Apache2.0ライセンスです。BLEで取得する部分、JSONを作る部分など、変更を加えています。
基本的にX.509の認証情報を使って通信することが前提のコードになっていると思います。
'''
/*
* Copyright 2010-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
'''
###-----------------------------------------------------------------------------
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import sys
import logging
import time
import argparse
from btle import UUID, Peripheral, DefaultDelegate, AssignedNumbers
import struct
import math
from sensortag import SensorTag, KeypressDelegate
import json
from datetime import datetime
###-----------------------------------------------------------------------------
# Custom MQTT message callback
def customCallback(client, userdata, message):
print("--------------")
print("Received : " + message.payload)
print("from topic: " + message.topic)
print("--------------\n\n")
###-----------------------------------------------------------------------------
# Read in command-line parameters
parser = argparse.ArgumentParser()
### AWS!
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", dest="rootCAPath", default="root-CA.crt", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", default="certificate.pem.crt",help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", default="private.pem.key",help="Private key file path")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="Raspi_1", help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="etl/room", help="Targeted topic")
### SensorTag!
parser.add_argument('-n', action='store', dest='count', default=0,
type=int, help="Number of times to loop data")
parser.add_argument('-T',action='store', dest="sleeptime", type=float, default=5.0, help='time between polling')
parser.add_argument('-H', action='store', dest="taghost", help='MAC of BT device')
parser.add_argument('--all', action='store_true', default=True)
args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic
sleeptime = args.sleeptime
deviceID = args.clientId
###=============================================================================
if args.useWebsocket and args.certificatePath and args.privateKeyPath:
parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
exit(2)
if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
parser.error("Missing credentials for authentication.")
exit(2)
###=============================================================================
# Enabling selected sensors
print('Connecting to ' + args.taghost)
tag = SensorTag(args.taghost)
if args.all:
tag.IRtemperature.enable()
tag.humidity.enable()
tag.barometer.enable()
tag.accelerometer.enable()
tag.magnetometer.enable()
tag.gyroscope.enable()
tag.battery.enable()
tag.keypress.enable()
tag.setDelegate(KeypressDelegate())
tag.lightmeter.enable()
time.sleep(1.0)
###=============================================================================
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
###=============================================================================
# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
myAWSIoTMQTTClient.configureEndpoint(host, 443)
myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureEndpoint(host, 8883)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(0.5) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec
# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
#myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)
###=============================================================================
# Publish to the same topic in a loop forever
loopCount = 0
Payload = {}
while True:
Payload['ID'] = str(deviceID)
ambient_temp, target_temparature = tag.IRtemperature.read()
Payload["AmbientTemp"] = ambient_temp
Payload["TargetTemp"] = target_temparature
ambient_temp, rel_humidity = tag.humidity.read()
Payload["Humidity"] = rel_humidity
ambient_temp, pressure_millibars = tag.barometer.read()
Payload["Barometer"] = pressure_millibars
Acc_x, Acc_y, Acc_z = tag.accelerometer.read()
Payload["AccX"] = Acc_x
Payload["AccY"] = Acc_y
Payload["AccZ"] = Acc_z
magnet_x, magnet_y, magnet_z = tag.magnetometer.read()
Payload["MagnetX"] = magnet_x
Payload["MagnetY"] = magnet_y
Payload["MagnetZ"] = magnet_z
gyro_x, gyro_y, gyro_z = tag.gyroscope.read()
Payload["GyroX"] = gyro_x
Payload["GyroY"] = gyro_y
Payload["GyroZ"] = gyro_z
Payload["Light"] = tag.lightmeter.read()
Payload["Batterys"] = tag.battery.read()
Payload["Count"] = loopCount
Payload["Datetime"] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
# print("try to send!")
myAWSIoTMQTTClient.publish(topic, json.dumps(Payload), 1)
# print("end!")
loopCount += 1
time.sleep(sleeptime)
Pythonプログラムの死活監視
上記のプログラムがたまに落ちます。
真面目にデバックをすることも意味はあるかもしれませんが、
ラズパイですし、Pythonですから、落ちるだろうと判断しました。
下記のようなスクリプトをcronで1分間に一回動作させます。
sendMQTT.pyは毎回終了して、cronだけで起動しても良いかもしれませんが、
まいかいPythonを起動するというのも、重たいな-というのと、
Pythonの中で繰り返しをカウントしている部分もあります。
だいたい10000回くらいは落ちずに動くこともわかったりするので、cronは死活監視ようにしています。
ちなみにcrontab -eで
-
-
-
-
- ./sendSensorData.sh
を設定しています。毎分動作です。
- ./sendSensorData.sh
-
-
-
#!/bin/sh
ps | grep python
if [ "$?" -eq 0 ]
then
logger "python is exist. exit..."
exit 0
else
logger "start reset sequence..."
sudo usbrefresh.sh
sleep 3
cd /home/pi/deviceSDK
python ./sendMQTT.py -T 59 -H "AA:BB:CC:DD:EE:FF" -e awsiotnoarn.iot.ap-northeast-1.amazonaws.com >/dev/null 2>&1 &
cd
exit 0
fi
毎周期動作させる精度
上記のスクリプトで -T 59としている部分があります。
これはPythonスクリプト内では無限ループで59秒sleepさせていることになります。
BLEでデータ取得する時間や、AWSとの通信時間など、微妙に変動があります。
リアルタイムOSを使って1分間ごとに割り込みを入れて、、、とかやっても良いかもしれませんが、
ラズパイ、raspbian jessie、Pythonという構成ですので、そこまでは無理と判断し、割り切っています。
実際に時系列のデータはとれますが、きっかり1分毎というわけには行きません。
数秒のジッタがあるというデータになります。
AWS PaaS
AWS側はサーバレスで構築します。勉強のために。
AWS IoT
こちらのサイトを参考にさせていただきました。
http://qiita.com/nsedo/items/c6f33c7cadea7023403f
あとはAWS SDKのままです。
MQTTはJSONを作って投げるだけですので、AWS IoTを使って悩むところはありませんでした。
JSONデータを受けて、DynamoDBにそのままPutしています。
DynamoDB
時系列データ保存用DB
KeyをID名と時刻情報にしました。
IoTで時系列データを組むときはこの方法が良いとどっかで聞いたことがあります。
その他は、取得したデータを並べるような内容です。
時系列データを分析するにあたって、DBには構造化しないデータを並べるだけにしています。
上記にJSONを作ってそのまま投げているところがありますので、わかると思いますが。
ステータス保存用DB
KeyをID名のみとしています。
Valueには
- Data:最後に受け取ったJSONそのまま
- isNight:照明がついていると認識しているかどうかのステート
です。
今回はLight情報の変化のみ扱っていますので、そのステートだけ保持しています。
Lambda側をステートレスにするために、ステートはDynamoDBにという狙いです。
念のため、JSONで書くとこういうデータになっています。
時系列DBの方は、下記のDataの項目のみ並んでいる事になります。
{
"Data": {
"AccX": {
"N": "0.915283203125"
},
"AccY": {
"N": "-0.129150390625"
},
"AccZ": {
"N": "0.48974609375"
},
"AmbientTemp": {
"N": "31.78125"
},
"Barometer": {
"N": "1006.03"
},
"Batterys": {
"N": "100"
},
"Count": {
"N": "261"
},
"Datetime": {
"S": "2017/09/06 13:31:35"
},
"GyroX": {
"N": "-4.0740966796875"
},
"GyroY": {
"N": "1.85394287109375"
},
"GyroZ": {
"N": "1.983642578125"
},
"Humidity": {
"N": "40.95458984375"
},
"ID": {
"S": "Raspi_1"
},
"Light": {
"N": "57.88"
},
"MagnetX": {
"N": "38.83418803418803"
},
"MagnetY": {
"N": "-19.34212454212454"
},
"MagnetZ": {
"N": "-17.842735042735043"
},
"TargetTemp": {
"N": "23.78125"
}
},
"ID": "Raspi_1",
"isNight": "0"
}
Lambda
ちゃんと書いてないところが多く、お恥ずかしいですが。。。
悩んだまま結論が出てないところとして、Exceptionの部分、内容がSlackに来るようになっていますが、
こういうときにどうすべきかなーというところで考えがまとまっていません。
LambdaでExceptionが発生すると繰り返し実行されるのか、繰り返しSlackにエラーをPostされるのも嫌なので、どうしようか、と思いつつも今後の課題としました。Lambdaの仕様も把握できてません。
部屋の状態を保持するRoomテーブルの構築も、ほぼ適当ですが
Key:ラズパイのID、ここでは"Raspi_1"
data:SensorTagの全データ、きたやつそのまんま
Light:現在のステートとして、照明が"点灯"or"消灯"と判別しているか
という設計です。
slackpost
dynamoDB streamからのデータをLambdaで受けるところです。
ETLRoomテーブルからLight情報を取得して、現在と比較し、
照明がついたor消えたを判別して、変化があればslackにpostします。
※ETLは部屋のあだ名です。
# coding : utf-8
import json
import datetime
import requests
import boto3
LIGHT_TAG='Light'
#===============================================================================
# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"
# Post to Slack
def PostSlack(message, icon=':ghost:', username='ETLBot', channel="#room"):
Dict_Payload = {
"text": message,
"username": username,
"icon_emoji": icon,
"channel": channel,
}
return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))
#-------------------------------------------------------------------------------
def Check_LightChanges(new, old, IsNight):
Change2Night = None
Change2Morining = None
print("new:" , new, ", old:", old, ", IsNight:", IsNight)
if (IsNight=='1') and (new > (old + 50)):
Change2Morining = True
IsNight = '0'
elif (IsNight=='0') and (new < 10):
Change2Night = True
IsNight = '1'
# Down -> UP
if Change2Morining:
message = ":smiley: Light is Turned On. Good Morning! :smiley:"
icon = ":smiley:"
# UP -> Down
elif Change2Night:
message = ":ghost: Light is Turned Down. Good Bye! :ghost:"
icon = ":ghost:"
else:
return IsNight
PostSlack(message, icon=icon)
return IsNight
#-------------------------------------------------------------------------------
table = None
def update_table(data):
ID = data['ID']['S']
# Access to ETLRoom Table
global table
if not table:
table = boto3.resource('dynamodb').Table('ETLRoom')
response = table.get_item(Key={'ID': ID})
if response:
item = response['Item']
#PostSlack(json.dumps(item))
light = round(float(item['Data'][LIGHT_TAG]['N']))
IsNight = item['IsNight']
else:
light = 0
IsNight = Check_LightChanges(round(float(data[LIGHT_TAG]['N'])), light, IsNight)
# Update Room Table
response = table.put_item(
Item={
"ID": ID,
"Data" : data,
"IsNight": IsNight
}
)
return 0
#-------------------------------------------------------------------------------
def lambda_handler(event, context):
try:
for record in event['Records']:
dynamodb = record['dynamodb']
keys = dynamodb['Keys']
data = dynamodb['NewImage']
# Keys are "ID" and "Datetime".
id = keys['ID']['S']
datetime = keys['Datetime']['S']
print("ID:", id, "/ Date:", datetime)
update_table(data)
except Exception as e:
import traceback
message = traceback.format_exc()
print(message)
PostSlack('Meets Exception!\n' + message)
raise e
return 0
#===============================================================================
getroomenv
# coding : utf-8
import json
import requests
import boto3
# Slack Incomming Webhooks URL
SLACL_POST_URL = "https://hooks.slack.com/services/XXXXX/YYYYY/ZZZZZ"
#===============================================================================
def MakeStr(data, key, round_n):
return str(round(float(data[key]['N']), round_n))
table = None
def GetRoomEnv(id, isAll):
global table
if not table:
table = boto3.resource('dynamodb').Table('ETLRoom')
response = table.get_item(
Key={
'ID': id
}
)
data = response['Item']['Data']
light = MakeStr(data, 'Light', 1)
temp = MakeStr(data, 'TargetTemp', 1)
humid = MakeStr(data, 'Humidity', 1)
balo = MakeStr(data, 'Barometer', 1)
time = data['Datetime']['S']
message = "" \
+ "現在の温度は" + temp + "度で、湿度は"+ humid + "度です。\n" \
+ "明るさは" + light + "ルクスで、気圧は"+ balo + "hPaです。\n" \
+ "(" + time + "に計測:bar_chart:)"
# ALLの引数があれば引数があれば、全データをダンプする
if isAll:
message = ""
for d in data:
s = str(d)
v = data[s]
if "N" in v:
message += s + ":" + v["N"] + "\n"
else:
message += s + ":" + v["S"] + "\n"
return message
#-------------------------------------------------------------------------------
# POST to Slack
def PostSlack(message):
Dict_Payload = {
"text": message,
"username": 'NowBot',
"icon_emoji": ":clock3:",
"channel": '#room',
}
return requests.post(SLACL_POST_URL, data=json.dumps(Dict_Payload))
#-------------------------------------------------------------------------------
def lambda_handler(event, context):
isAll = False
try:
tri1 = 'text='
tri2 = 'trigger_word='
body = event['body']
tag = body[body.find(tri1)+len(tri1):body.find(tri2)-1]
taglist = tag.split("+")
for word in taglist:
if "ALL" in word:
isAll = True
except:
pass
try:
message = GetRoomEnv('Raspi_1', isAll)
except Exception as e:
import traceback
message = traceback.format_exc()
print(message)
PostSlack('Meets Exception!\n' + message)
raise e
PostSlack(message)
return 0
#===============================================================================
API Gateway
Slack
Custom IntegrationsにIncoming WebHooksとOutgoing WebHooksを入れました。
これでUIは使えます。超便利。
Incoming WebHooks
https://hooks.slack.com/services/XXXXXXX/YYYYYYY/ZZZZZZ
というSlackにPostするためのURLを生成し設定しておきます。
あとはLambdaからPostするのみです。
点灯と消灯は判別できます。下記が例です。
これをみると以下のことが見えてきます。
- だれかが土曜日の夕方忍び込んでこっそり仕事をしている
- 昨日、朝早く来た人は多分掃除のひとで、7:10-7:53まで滞在、意外と長い
- 最初のチームメンバーはAM9:12に来て、最後の人はPM7:49に帰った(健全!)
Outgoing WebHooks
Trigger Word
トリガーワードは「now」としています。
"now"と送るとNowBotが部屋の状態を伝えてくれます。
"now ALL"と送ると、データベースのデータを全部ダンプしてくれます。デバック用。
"now"の後に続く文言を自然言語処理(Amazon Pollyとか)して関連するパラメータ(温度とか)があれば、それを返すようなことも考えましたが、それは次の機会に。。。
URL
https://XXXXXX.execute-api.ap-northeast-1.amazonaws.com/prod/getroomenv
と設定しています。
これはAWS API Gatewayで作ったもので、裏にはLambdaが居ます。
getroomenvスクリプトが動いています。
感想
一通り動くまで3日くらいかかり、
その後2日間くらいブラッシュアップとか、様子を見たりとか、していました。
DynamoDBとLambdaについてはもう少し功夫が必要ですね。いい勉強になりました。
UIにSlackを使ったのは大正解でした。本当に便利。
時系列のデータを集められたので、
これを使って周期的な変動成分を取り、
いつもと違う動きがあれば、Slackに通知するなどできれば、
次の展開として良いと考えています。
土曜日に出社する人がいるのが通常かどうかは置いておいて。。。