ラズパイとAWS IoTを使った見守りシステム自作で学んだこと(4)cliでAWS IoTに新しいモノを登録し証明書を発行からの続きです。
今回はセンサー検知したデータをAWS IoTにpubする仕組みを説明します。下図の着色部分に該当します。
実行環境
リポジトリーのREADMEに沿って「ラズパイとAWS IoTを使った見守りシステム自作で学んだこと」全10回の投稿の中で
(2)見守りProjectの設計(名前の設定)
(3)ラズパイ物理構成とパッケージ導入
(4)cliでAWS IoTに新しいモノを登録し証明書を発行
(7) ラズパイの自動起動の設定方法(本稿の投稿時点で未作成)
の内容を構築した環境下で実行できます。
使用するファイル
treeに示した5つのPython codesとcertに格納した証明書と鍵を使います。図はモノの名前にCatBed1と命名した場合の例です。
証明書と鍵は前回の(4)稿目の環境が正しく実施できて/home/user/certに導入されていると想定して追加説明はしません。
/home/user
・
・
┣ parameters.py
┣ sensing.py
┣ counter.py
┣ awsMQTTconnect.py
┣ main.py
┣ /cert
┃ ┣ CatBed1-certificate.pem.crt
┃ ┣ CatBed1-private.pem.key
┃ ┣ AmazonRootCA1.pem
┃ ・
動作
全体の動作は図のようなシーケンスになります。
以下、個別のモジュールを説明します。
parameters.py
このファイルは/home/userに配置した一連のPythonファイルを実行するのに必要な変数を提供します。
AWS IoTにpubするのに必要な変数は5~18行目に定義されます。
まず変数定義の前に必要なライブラリーを読み込みます。
import os
import datetime
import paho.mqtt.client as mqtt
2行目のdatetimeは次の(6)稿目で使います。今回は使いません。
ここから下、9行目と17行目以外は(7)稿目に説明する環境変数からの読み取りです。各変数の役割はコメントの通りです。
HOST = os.environ['HOST_ENDPOINT'] # AWS IoT Endpoint
CACERT = os.environ['CACERT'] # root ca
CLIENTCERT = os.environ['CLIENTCERT'] # certificate
CLIENTKEY = os.environ['CLIENTKEY'] # private key
TOPIC_DETECT_COUNT = os.environ['TOPIC_DETECT'] # topic
ID = TOPIC_DETECT_COUNT.split("/")[0]
SENSOR_NO = int(os.environ['SENSOR_NO']) #normal close:0, normal open:1
次にmqtt.client()オブジェクトをインスタンス化しておきます。client_idはAWS IoT coreに登録したモノの名前と同じです。
client = mqtt.Client(client_id=ID, protocol=mqtt.MQTTv311)
赤外線センサーの信号線を接続するDETECT_PINのGPIO port番号は変更可能です。
PORT = 8883 # mqtts port
DETECT_PIN = 21 #signal port
repositoryでは以下の順序で記述されています。
import os
import datetime
import paho.mqtt.client as mqtt
#####################################################################
#environment for dashboard system
#####################################################################
HOST = os.environ['HOST_ENDPOINT'] # AWS IoT Endpoint
PORT = 8883 # mqtts port
CACERT = os.environ['CACERT'] # root ca
CLIENTCERT = os.environ['CLIENTCERT'] # certificate
CLIENTKEY = os.environ['CLIENTKEY'] # private key
TOPIC_DETECT_COUNT = os.environ['TOPIC_DETECT'] # topic
ID = TOPIC_DETECT_COUNT.split("/")[0]
client = mqtt.Client(client_id=ID, protocol=mqtt.MQTTv311)
DETECT_PIN = 21 #signal port
SENSOR_NO = int(os.environ['SENSOR_NO']) #normal close:0, normal open:1
sensing.py
標準のGPIOライブラリーを使いHIGH/LOWを検出して1/0で返します。
メソッドが1つなのにクラス定義されている、どうして……
すいません、ファイルを流用した際に不要部分を消しましたが、横着してクラスを残してありますmm
import parameters as para
とあるように最初に設定したparameters.pyから値を読み込みクラス変数としてインスタンス化します。
import RPi.GPIO as GPIO
import time
import parameters as para
class Sensor:
def __init__(self):
self.detect_pin = para.DETECT_PIN #Motion sensor signal port : GPIO 21
self.sensor_no = para.SENSOR_NO #normal close:0, normal open:1
GPIO.setmode(GPIO.BCM) #GPIO初期化
GPIO.setup(self.detect_pin, GPIO.IN) #port21の動作を読み取り定義
#Output sensor HI / LO at 1/0
def detect_counter(self):
sig = 0
if self.sensor_no == 1:
if GPIO.input(self.detect_pin) == GPIO.HIGH:
sig = 0
else:
sig = 1
else:
if GPIO.input(self.detect_pin) == GPIO.HIGH:
sig = 1
return sig
parameters.pyでセンサータイプのNC/NOを識別できるようにSENSOR_NO変数にフラグを立てましたが、NOの場合はdetect_counter()の中でGPIOから入力されたHIGH/LOWをif文を使い反転させるのに使います。
counter.py
- sig:sensing.pyからリターンされるセンサーHIGH/LOW(1/0)
- motion_count:sigの値を積算して保存する変数
これら二つの引数をもらい最新のsig積算値をリターンします。
parameters.pyから値を読み込み使用します。
ここもメソッドが一つになってもクラスを残してありますが大目に見てくださいmm
import datetime
import parameters as para
class Count:
def __init__(self):
return
#If the sensor has HI output, increment the counter
def motion_count(self, sig, motion_count):
if sig == 1:
motion_count = motion_count + 1
return motion_count
awsMQTTconnect.py
ラズパイとAWS IoTでmqtts通信をします。
mqtts接続とpublishでクラスを分けました。
まずライブラリーを読み込みます。
import json
import ssl
import time
from datetime import datetime
import subprocess
import parameters as para
mqtts接続用メソッド(class:Com)
初期化時にparameters.pyからclientインスタンス、mqtt通信に必要な証明書、鍵、エンドポイント、port No.を読み込みます。
class Com:
def __init__(self):
self.client = para.client
self.cacert = para.CACERT
self.clientCert = para.CLIENTCERT
self.clientKey = para.CLIENTKEY
self.host = para.HOST
self.port = para.PORT
次にコールバック関数を定義します。
よく見かけるsample codeとの違いは、接続が確立できない場合は一定時間後にラズパイを再起動させるところです。
正常通信が前提で接続できないことが例外とすると、たまたま接続ミスしてるけど再起動させればつながるんじゃね的な発想で、自動運用のための小ネタとして組み込んでいます。
def on_connect(self, client, userdata, flags, respons_code):
#If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
if respons_code != 0:
print("respons_code:", respons_code, " flags:", flags)
time.sleep(90)
subprocess.call(["sudo","reboot"])
print('Connected')
もう一つget_ssid()も、たまたまwifi接続できなかったら再起動してつなげてしまえ、という目的のメソッドです。
def get_ssid(self):
cmd = 'iwconfig wlan0|grep ESSID'
r = subprocess.run(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)\
.stdout.decode().rstrip()
idx = r.find('ESSID:')
#If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
if r[idx + 7:-1] == "ff/an":
print("ESSID:", r[idx + 7:-1])
time.sleep(90)
subprocess.call(["sudo","reboot"])
次は標準的なDocumentから引用できるcodeほぼそのままです。TLS通信でmqtts接続を確立してサーバー間でクロスチェックを繰り返すメソッドです。
def aws_connect(self):
try:
# certifications
self.client.tls_set(
self.cacert,
certfile=self.clientCert,
keyfile=self.clientKey,
tls_version=ssl.PROTOCOL_TLSv1_2)
self.client.tls_insecure_set(True)
# callback
self.client.on_connect = self.on_connect
#client.on_disconnect = on_disconnect
# port, keepalive
self.client.connect(self.host, self.port, keepalive=60)
self.client.loop_start()
except KeyboardInterrupt:
time.sleep(90)
subprocess.call(["sudo","reboot"])
publish用メソッド(class:Pub)
初期化時にparameters.pyからclientインスタンスとmqtt topicを読み込みます。
class Pub:
def __init__(self):
self.client = para.client
self.topic_count = para.TOPIC_DETECT_COUNT
AWS IoTへpubするメソッドです。
10分に1度だけpubする為に使う判定フラグとpubする値(sigの積算値)を引き数として受け取ります。
def publish_motion_count(self, sub_t_count, detect_count):
sub_tが10分毎にpubする為に使う判定フラグになります。
datetime.minuteを10で割って一桁目が0になれば毎10分(0分、10分、20分、30分、40分、50分)だと判定できます。
毎10分でなければsub_t_countフラグは0のままにします。
t = datetime.now()
sub_t = str(t.minute/10)
if sub_t[-1] != "0":
sub_t_count = 0
そしてsub_t_count=0で最初にsub_tの一桁目が0になった時にpublishします。
if (sub_t[-1] == "0") and sub_t_count == 0:
# IoTcoreへpublish
count_data['Timestamp'] = int(time.time())
count_data['detect_count'] = detect_count
self.client.publish(self.topic_count, json.dumps(count_data, default=self.json_serial), qos=1)
このままでは毎10分の間はずっとpubし続けるので、一度pubしたらsub_t_countフラグに1を立てて2回以降はpublishさせません。
sub_t_countフラグ以外にpublishした時のみTrueを、それ以外はFalseを返します。
sub_t_count = 1
return True, sub_t_count
if (t.minute == 0 or sub_t[-1] == 0) and sub_t_count == 1:
pass
return False, sub_t_count
もう一点、publishするjsonは{'Timestamp': int(time.time()), 'detect_count':detect_count}となりtimestampが含まれるためこれを文字列に変換する必要があります。
そこでisoformat()を使います。
def json_serial(self, para):
return para.isoformat()
awsMQTTconnect.pyを全てつなげます
import json
import ssl
import time
from datetime import datetime
import subprocess
import parameters as para
class Com:
def __init__(self):
self.client = para.client
self.cacert = para.CACERT
self.clientCert = para.CLIENTCERT
self.clientKey = para.CLIENTKEY
self.host = para.HOST
self.port = para.PORT
#Callback function when mqtt connection is successful
def on_connect(self, client, userdata, flags, respons_code):
#If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
if respons_code != 0:
print("respons_code:", respons_code, " flags:", flags)
time.sleep(90)
subprocess.call(["sudo","reboot"])
print('Connected')
#Function to determine the establishment of wifi connection
def get_ssid(self):
cmd = 'iwconfig wlan0|grep ESSID'
r = subprocess.run(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)\
.stdout.decode().rstrip()
idx = r.find('ESSID:')
#If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
if r[idx + 7:-1] == "ff/an":
print("ESSID:", r[idx + 7:-1])
time.sleep(90)
subprocess.call(["sudo","reboot"])
#Function that launches an MQTT client and creates an object instance
def aws_connect(self):
try:
# certifications
self.client.tls_set(
self.cacert,
certfile=self.clientCert,
keyfile=self.clientKey,
tls_version=ssl.PROTOCOL_TLSv1_2)
self.client.tls_insecure_set(True)
# callback
self.client.on_connect = self.on_connect
#client.on_disconnect = on_disconnect
# port, keepalive
self.client.connect(self.host, self.port, keepalive=60)
self.client.loop_start()
except KeyboardInterrupt:
time.sleep(90)
subprocess.call(["sudo","reboot"])
class Pub:
def __init__(self):
self.client = para.client
self.topic_count = para.TOPIC_DETECT_COUNT
#Function that dispenses motion sensor data to the cloud at 0 seconds per minute
def publish_motion_count(self, sub_t_count, detect_count):
count_data = {} #KeyValue to publish
t = datetime.now()
sub_t = str(t.minute/10)
if sub_t[-1] != "0":
sub_t_count = 0
if (sub_t[-1] == "0") and sub_t_count == 0:
# IoTcoreへpublish
count_data['Timestamp'] = int(time.time())
count_data['detect_count'] = detect_count
self.client.publish(self.topic_count, json.dumps(count_data, default=self.json_serial), qos=1)
sub_t_count = 1
return True, sub_t_count
if (t.minute == 0 or sub_t[-1] == 0) and sub_t_count == 1:
pass
return False, sub_t_count
def json_serial(self, para):
return para.isoformat()
main.py
最後にコンソールから実行するmain.pyです。
実行することでmqtts確立、連続ループの中でセンサー検知、積算、10分毎にpublishを繰り返します。
これまでに説明した自作パッケージを含むライブラリーを読み込みます。classはインスタンス化します。
import time
import sys
from awsMQTTconnect import Com, Pub
from sensing import Sensor
from counter import Count
sensor = Sensor()
com = Com()
pub = Pub()
count = Count()
awsMQTTconnect.pyがpubした時だけTrueを返してくるので、pubの直後にセンサー検知回数を積算しているdetect_countをloop()の中でリセットします。
ループは1秒毎で回します。
def loop():
sub_t_count = 0
detect_count = 0
while True:
sig_detect = sensor.detect_counter()
detect_count = count.motion_count(sig_detect, detect_count)
bool, sub_t_count = pub.publish_motion_count(sub_t_count, detect_count)
if bool == True:
detect_count = 0
time.sleep(1)
main.pyの中のmain()です。
wifi接続に失敗したら再起動させるメソッドはmain()の中に仕込みます。
90秒のtime.sleep()は起動時のwifi接続とmqtts接続の確立を待つためのものです。
Pi3B+でプロトタイプしていた時に余裕をもって時間設定していましたが、30秒くらいまで短縮してもエラーにはならないと思います。
if __name__ == '__main__':
try:
time.sleep(90)
#wifi connection confirmation and MQTT connection
com.get_ssid()
com.aws_connect()
#Main loop execution
loop()
except KeyboardInterrupt:
sys.exit()
main.pyを全体つなげます
import time
import sys
from awsMQTTconnect import Com, Pub
from sensing import Sensor
from counter import Count
sensor = Sensor()
com = Com()
pub = Pub()
count = Count()
def loop():
sub_t_count = 0
detect_count = 0
while True:
sig_detect = sensor.detect_counter()
detect_count = count.motion_count(sig_detect, detect_count)
bool, sub_t_count = pub.publish_motion_count(sub_t_count, detect_count)
if bool == True:
detect_count = 0
time.sleep(1)
if __name__ == '__main__':
try:
time.sleep(90)
#wifi connection confirmation and MQTT connection
com.get_ssid()
com.aws_connect()
#Main loop execution
loop()
except KeyboardInterrupt:
sys.exit()
動作確認
ラズパイの/home/userから実行します。
cd ..
python main.py
psコマンド等でpythonのプロセスが落ちずに実行されていることを確認します。
マネジメントコンソールからsubscribeを確認
CloudWatch Dashboardを確認
次回
(6) ラズパイでイベント録画してS3にuploadに続きます
追記
図追加 2022.09.03
sensingとpublishの連続動作のシーケンス図を追加しました