0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ラズパイとAWS IoTを使った見守りシステム自作で学んだこと(5)paho.mqttでセンサーデータをラズパイからAWS IoTにPublish

Last updated at Posted at 2022-08-24

ラズパイとAWS IoTを使った見守りシステム自作で学んだこと(4)cliでAWS IoTに新しいモノを登録し証明書を発行からの続きです。

今回はセンサー検知したデータをAWS IoTにpubする仕組みを説明します。下図の着色部分に該当します。

実行環境

リポジトリーのREADMEに沿って「ラズパイとAWS IoTを使った見守りシステム自作で学んだこと」全10回の投稿の中で
(2)見守りProjectの設計(名前の設定)
(3)ラズパイ物理構成とパッケージ導入
(4)cliでAWS IoTに新しいモノを登録し証明書を発行
(7) ラズパイの自動起動の設定方法(本稿の投稿時点で未作成)
の内容を構築した環境下で実行できます。

使用するファイル

treeに示した5つのPython codesとcertに格納した証明書と鍵を使います。図はモノの名前にCatBed1と命名した場合の例です。

証明書と鍵は前回の(4)稿目の環境が正しく実施できて/home/user/certに導入されていると想定して追加説明はしません。

/home/user
・
・
┣ parameters.py
┣ sensing.py
┣ counter.py
┣ awsMQTTconnect.py
┣ main.py
┣ /cert
┃ ┣ CatBed1-certificate.pem.crt
┃ ┣ CatBed1-private.pem.key
┃ ┣ AmazonRootCA1.pem
┃ ・

動作

全体の動作は図のようなシーケンスになります。

以下、個別のモジュールを説明します。

parameters.py

このファイルは/home/userに配置した一連のPythonファイルを実行するのに必要な変数を提供します。

AWS IoTにpubするのに必要な変数は5~18行目に定義されます。

まず変数定義の前に必要なライブラリーを読み込みます。

import os
import datetime
import paho.mqtt.client as mqtt

2行目のdatetimeは次の(6)稿目で使います。今回は使いません。

ここから下、9行目と17行目以外は(7)稿目に説明する環境変数からの読み取りです。各変数の役割はコメントの通りです。

HOST = os.environ['HOST_ENDPOINT']  # AWS IoT Endpoint
CACERT = os.environ['CACERT']  # root ca
CLIENTCERT = os.environ['CLIENTCERT']  # certificate
CLIENTKEY = os.environ['CLIENTKEY']  # private key
TOPIC_DETECT_COUNT = os.environ['TOPIC_DETECT']  # topic
ID = TOPIC_DETECT_COUNT.split("/")[0]
SENSOR_NO = int(os.environ['SENSOR_NO']) #normal close:0, normal open:1

次にmqtt.client()オブジェクトをインスタンス化しておきます。client_idはAWS IoT coreに登録したモノの名前と同じです。

client = mqtt.Client(client_id=ID, protocol=mqtt.MQTTv311)

赤外線センサーの信号線を接続するDETECT_PINのGPIO port番号は変更可能です。

PORT = 8883  # mqtts port
DETECT_PIN = 21 #signal port

repositoryでは以下の順序で記述されています。

parameters.py
import os
import datetime
import paho.mqtt.client as mqtt

#####################################################################
#environment for dashboard system
#####################################################################
HOST = os.environ['HOST_ENDPOINT']  # AWS IoT Endpoint
PORT = 8883  # mqtts port
CACERT = os.environ['CACERT']  # root ca
CLIENTCERT = os.environ['CLIENTCERT']  # certificate
CLIENTKEY = os.environ['CLIENTKEY']  # private key
TOPIC_DETECT_COUNT = os.environ['TOPIC_DETECT']  # topic
ID = TOPIC_DETECT_COUNT.split("/")[0]
client = mqtt.Client(client_id=ID, protocol=mqtt.MQTTv311)

DETECT_PIN = 21 #signal port

SENSOR_NO = int(os.environ['SENSOR_NO'])  #normal close:0, normal open:1

sensing.py

標準のGPIOライブラリーを使いHIGH/LOWを検出して1/0で返します。

メソッドが1つなのにクラス定義されている、どうして……
すいません、ファイルを流用した際に不要部分を消しましたが、横着してクラスを残してありますmm

import parameters as para

とあるように最初に設定したparameters.pyから値を読み込みクラス変数としてインスタンス化します。

sensing.py
import RPi.GPIO as GPIO
import time
import parameters as para

class Sensor:
    def __init__(self):
        self.detect_pin = para.DETECT_PIN #Motion sensor signal port : GPIO 21
        self.sensor_no = para.SENSOR_NO #normal close:0, normal open:1
        GPIO.setmode(GPIO.BCM) #GPIO初期化
        GPIO.setup(self.detect_pin, GPIO.IN) #port21の動作を読み取り定義

    #Output sensor HI / LO at 1/0
    def detect_counter(self): 
        sig = 0
        if self.sensor_no == 1:
            if GPIO.input(self.detect_pin) == GPIO.HIGH:
                sig = 0
            else:
                sig = 1
        else:
            if GPIO.input(self.detect_pin) == GPIO.HIGH:
                sig = 1

        return sig

parameters.pyでセンサータイプのNC/NOを識別できるようにSENSOR_NO変数にフラグを立てましたが、NOの場合はdetect_counter()の中でGPIOから入力されたHIGH/LOWをif文を使い反転させるのに使います。

counter.py

  • sig:sensing.pyからリターンされるセンサーHIGH/LOW(1/0)
  • motion_count:sigの値を積算して保存する変数

これら二つの引数をもらい最新のsig積算値をリターンします。

parameters.pyから値を読み込み使用します。
ここもメソッドが一つになってもクラスを残してありますが大目に見てくださいmm

counter.py
import datetime
import parameters as para

class Count:
    def __init__(self):
        return

    #If the sensor has HI output, increment the counter
    def motion_count(self, sig, motion_count):
            
        if sig == 1:
            motion_count = motion_count + 1

        return motion_count

awsMQTTconnect.py

ラズパイとAWS IoTでmqtts通信をします。
mqtts接続とpublishでクラスを分けました。

まずライブラリーを読み込みます。

import json
import ssl
import time
from datetime import datetime 
import subprocess
import parameters as para

mqtts接続用メソッド(class:Com)

初期化時にparameters.pyからclientインスタンス、mqtt通信に必要な証明書、鍵、エンドポイント、port No.を読み込みます。

class Com:
    def __init__(self):
        self.client = para.client
        self.cacert = para.CACERT
        self.clientCert = para.CLIENTCERT
        self.clientKey = para.CLIENTKEY
        self.host = para.HOST
        self.port = para.PORT

次にコールバック関数を定義します。
よく見かけるsample codeとの違いは、接続が確立できない場合は一定時間後にラズパイを再起動させるところです。

正常通信が前提で接続できないことが例外とすると、たまたま接続ミスしてるけど再起動させればつながるんじゃね的な発想で、自動運用のための小ネタとして組み込んでいます。

    def on_connect(self, client, userdata, flags, respons_code):
        #If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
        if respons_code != 0:
            print("respons_code:", respons_code, " flags:", flags)
            time.sleep(90)
            subprocess.call(["sudo","reboot"])
        print('Connected')

もう一つget_ssid()も、たまたまwifi接続できなかったら再起動してつなげてしまえ、という目的のメソッドです。

    def get_ssid(self):
        cmd = 'iwconfig wlan0|grep ESSID'
        r = subprocess.run(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)\
            .stdout.decode().rstrip()
        idx = r.find('ESSID:')
        #If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
        if r[idx + 7:-1] == "ff/an":
            print("ESSID:", r[idx + 7:-1])
            time.sleep(90)
            subprocess.call(["sudo","reboot"])

次は標準的なDocumentから引用できるcodeほぼそのままです。TLS通信でmqtts接続を確立してサーバー間でクロスチェックを繰り返すメソッドです。

    def aws_connect(self):
        try:
            # certifications
            self.client.tls_set(
                self.cacert,
                certfile=self.clientCert,
                keyfile=self.clientKey,
                tls_version=ssl.PROTOCOL_TLSv1_2)
            self.client.tls_insecure_set(True)

            # callback
            self.client.on_connect = self.on_connect
            #client.on_disconnect = on_disconnect

            # port, keepalive
            self.client.connect(self.host, self.port, keepalive=60)

            self.client.loop_start()

        except KeyboardInterrupt:
            time.sleep(90)
            subprocess.call(["sudo","reboot"])

publish用メソッド(class:Pub)

初期化時にparameters.pyからclientインスタンスとmqtt topicを読み込みます。

class Pub:
    def __init__(self):
        self.client = para.client
        self.topic_count = para.TOPIC_DETECT_COUNT

AWS IoTへpubするメソッドです。
10分に1度だけpubする為に使う判定フラグとpubする値(sigの積算値)を引き数として受け取ります。

def publish_motion_count(self, sub_t_count, detect_count): 

sub_tが10分毎にpubする為に使う判定フラグになります。
datetime.minuteを10で割って一桁目が0になれば毎10分(0分、10分、20分、30分、40分、50分)だと判定できます。

毎10分でなければsub_t_countフラグは0のままにします。

        t = datetime.now() 
        sub_t = str(t.minute/10)

        if sub_t[-1] != "0": 
            sub_t_count = 0

そしてsub_t_count=0で最初にsub_tの一桁目が0になった時にpublishします。

        if (sub_t[-1] == "0") and sub_t_count == 0:
            # IoTcoreへpublish
            count_data['Timestamp'] = int(time.time())
            count_data['detect_count'] = detect_count
            self.client.publish(self.topic_count, json.dumps(count_data, default=self.json_serial), qos=1) 

このままでは毎10分の間はずっとpubし続けるので、一度pubしたらsub_t_countフラグに1を立てて2回以降はpublishさせません。

sub_t_countフラグ以外にpublishした時のみTrueを、それ以外はFalseを返します。

            sub_t_count = 1
            return True, sub_t_count
            
        if (t.minute == 0 or sub_t[-1] == 0) and sub_t_count == 1: 
            pass
            
        return False, sub_t_count

もう一点、publishするjsonは{'Timestamp': int(time.time()), 'detect_count':detect_count}となりtimestampが含まれるためこれを文字列に変換する必要があります。

そこでisoformat()を使います。

    def json_serial(self, para):
        return para.isoformat()

awsMQTTconnect.pyを全てつなげます

awsMQTTconnect.py
import json
import ssl
import time
from datetime import datetime 
import subprocess
import parameters as para



class Com:
    def __init__(self):
        self.client = para.client
        self.cacert = para.CACERT
        self.clientCert = para.CLIENTCERT
        self.clientKey = para.CLIENTKEY
        self.host = para.HOST
        self.port = para.PORT
    

    #Callback function when mqtt connection is successful
    def on_connect(self, client, userdata, flags, respons_code):
        #If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
        if respons_code != 0:
            print("respons_code:", respons_code, " flags:", flags)
            time.sleep(90)
            subprocess.call(["sudo","reboot"])
        print('Connected')


    #Function to determine the establishment of wifi connection
    def get_ssid(self):
        cmd = 'iwconfig wlan0|grep ESSID'
        r = subprocess.run(cmd, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE)\
            .stdout.decode().rstrip()
        idx = r.find('ESSID:')
        #If the connection cannot be established, reboot after 90 seconds of waiting time for terminal access
        if r[idx + 7:-1] == "ff/an":
            print("ESSID:", r[idx + 7:-1])
            time.sleep(90)
            subprocess.call(["sudo","reboot"])


    #Function that launches an MQTT client and creates an object instance
    def aws_connect(self):
        try:
            # certifications
            self.client.tls_set(
                self.cacert,
                certfile=self.clientCert,
                keyfile=self.clientKey,
                tls_version=ssl.PROTOCOL_TLSv1_2)
            self.client.tls_insecure_set(True)

            # callback
            self.client.on_connect = self.on_connect
            #client.on_disconnect = on_disconnect

            # port, keepalive
            self.client.connect(self.host, self.port, keepalive=60)

            self.client.loop_start()

        except KeyboardInterrupt:
            time.sleep(90)
            subprocess.call(["sudo","reboot"])


class Pub:
    def __init__(self):
        self.client = para.client
        self.topic_count = para.TOPIC_DETECT_COUNT


    #Function that dispenses motion sensor data to the cloud at 0 seconds per minute
    def publish_motion_count(self, sub_t_count, detect_count): 

        count_data = {} #KeyValue to publish
        t = datetime.now() 
        sub_t = str(t.minute/10)

        if sub_t[-1] != "0": 
            sub_t_count = 0

        if (sub_t[-1] == "0") and sub_t_count == 0:
            # IoTcoreへpublish
            count_data['Timestamp'] = int(time.time())
            count_data['detect_count'] = detect_count
            self.client.publish(self.topic_count, json.dumps(count_data, default=self.json_serial), qos=1) 

            sub_t_count = 1
            return True, sub_t_count
            
        if (t.minute == 0 or sub_t[-1] == 0) and sub_t_count == 1: 
            pass
            
        return False, sub_t_count


    def json_serial(self, para):
        return para.isoformat()

main.py

最後にコンソールから実行するmain.pyです。

実行することでmqtts確立、連続ループの中でセンサー検知、積算、10分毎にpublishを繰り返します。

これまでに説明した自作パッケージを含むライブラリーを読み込みます。classはインスタンス化します。

import time
import sys
from awsMQTTconnect import Com, Pub
from sensing import Sensor
from counter import Count

sensor = Sensor()
com = Com()
pub = Pub()
count = Count()

awsMQTTconnect.pyがpubした時だけTrueを返してくるので、pubの直後にセンサー検知回数を積算しているdetect_countをloop()の中でリセットします。

ループは1秒毎で回します。

def loop():
    sub_t_count = 0
    detect_count = 0

    while True:
        sig_detect = sensor.detect_counter()
        detect_count = count.motion_count(sig_detect, detect_count)
        bool, sub_t_count = pub.publish_motion_count(sub_t_count, detect_count)
        if bool == True: 
            detect_count = 0
        
        time.sleep(1)

main.pyの中のmain()です。

wifi接続に失敗したら再起動させるメソッドはmain()の中に仕込みます。

90秒のtime.sleep()は起動時のwifi接続とmqtts接続の確立を待つためのものです。
Pi3B+でプロトタイプしていた時に余裕をもって時間設定していましたが、30秒くらいまで短縮してもエラーにはならないと思います。

if __name__ == '__main__':
    try:
        time.sleep(90)

        #wifi connection confirmation and MQTT connection
        com.get_ssid()
        com.aws_connect()

        #Main loop execution
        loop()

    except KeyboardInterrupt:
        sys.exit()

main.pyを全体つなげます

main.py
import time
import sys
from awsMQTTconnect import Com, Pub
from sensing import Sensor
from counter import Count


sensor = Sensor()
com = Com()
pub = Pub()
count = Count()

def loop():
    sub_t_count = 0
    detect_count = 0

    while True:
        sig_detect = sensor.detect_counter()
        detect_count = count.motion_count(sig_detect, detect_count)
        bool, sub_t_count = pub.publish_motion_count(sub_t_count, detect_count)
        if bool == True: 
            detect_count = 0
        
        time.sleep(1)


if __name__ == '__main__':
    try:
        time.sleep(90)

        #wifi connection confirmation and MQTT connection
        com.get_ssid()
        com.aws_connect()

        #Main loop execution
        loop()

    except KeyboardInterrupt:
        sys.exit()

動作確認

ラズパイの/home/userから実行します。

cd ..
python main.py

psコマンド等でpythonのプロセスが落ちずに実行されていることを確認します。

マネジメントコンソールからsubscribeを確認

CloudWatch Dashboardを確認

次回

(6) ラズパイでイベント録画してS3にuploadに続きます

追記

図追加 2022.09.03 

sensingとpublishの連続動作のシーケンス図を追加しました

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?