Help us understand the problem. What is going on with this article?

AWS GreengrassでLチカ:クラウドとエッジについて考える(2)

More than 1 year has passed since last update.

前回記事(AWS GreengrassでLチカ:クラウドとエッジについて考える(1))の続きになります.
取り組む課題や環境,クラウドタイプのやり方についてはそちらを参照ください.

2. AWS Greengrassにてネットワーク型信号機を実現する

本記事で取り組む課題は"ネットワーク型信号機の設計"ですが,クラウドの考え方を素直に適用した場合,いろいろと問題が発生しそうなことについては前回記事にて述べました.今回はそれらの問題を解消すべく,エッジの考え方を導入したネットワーク型信号機をAWS Greengrassを使って実現します.

今回もできるだけ単純なモデルとするため,構成は以下としました.

p2.png

大きな違いはメッセージの発行元です.

  • クラウドタイプ : インターネットをはさんだサーバー上で稼働するLambda関数がメッセージ発行元
  • エッジタイプ : RasPi3自身(正確にはRasPi3上で動作するLambda関数)がメッセージ発行元

構成を見る限り自立して稼働しそうな気がします...なんだかエッジぽいです.
早速作成してみます.

信号機側にデプロイされるLambda関数

まずはこれを作成することにします.作成はAWS Lambdaのコンソール上で実施しました.AWS Lambdaのコンソールから [関数の作成]選択,"Greengrass"で検索すると現状では2件ヒットします.うちPython版の"greengrass-hello-world"をテンプレートにして作業を進めます.
今回は,テンプレートを少しだけ変更して以下としました.

import greengrasssdk
import platform
from threading import Timer
import time

client = greengrasssdk.client('iot-data')

my_platform = platform.platform()

def greengrass_hello_world_run():
    # (追加部分)緑点灯のためのメッセージ発行,後5秒待ち
    client.publish(topic='io/led', payload='{"red":0,"green":1}')
    time.sleep(5)
    # (追加部分)赤点灯のためのメッセージ発行,後5秒待ち
    client.publish(topic='io/led', payload='{"red":1,"green":0}')
    # 関数自身を呼ぶことで無限ループ
    Timer(5, greengrass_hello_world_run).start()

# 実行開始
greengrass_hello_world_run()

def function_handler(event, context):
    return

※本来はデバイスシャドウを利用すべきかもしれませんが,簡素化のために直にメッセージを発行する形にしています

AWS Greengrass を通じてこのLambda関数がエッジ側(RasPi3)にデプロイされると,(無限ループのおかげで)動き続けることになります.エッジ側で動くLambda関数は通常のLambda関数と異なり,動作時間上限がありません.デプロイの過程やセキュリティの担保はGreengrassが面倒をみてくれるので,いろいろ考えなくてすみます.

信号機側S/W

前回記事にて作成したS/Wを改良する形で進めます.AWS Greengrassのサンプルを参考にしました.

主な変更点はMQTTクライアントの接続先を,サーバー側ではなく,RasPi3上で動作するGreengrassコア相手にする事です.接続先情報はサーバー側に問い合わせる機構となっているようです.

# -*- coding: utf-8 -*-

# AWS Greengrass を取り扱うためのモジュール
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import RPi.GPIO as GPIO
import time
import json
import sys
import uuid
import os

# gpio pins
LED_R = 14
LED_G = 15
LEVEL = [GPIO.LOW, GPIO.HIGH]

# host
host = "xxxxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"
port = 8883

# certs
rootca_path = "./certs/root-ca.pem"
certificate_path = "./certs/thing-certificate.pem.crt"
privatekey_path = "./certs/thing-private.pem.key"
thing_name = "RasPi3"

def onMessage(message):    # メッセージを受け取った際に呼ばれる
    print(message.payload)
    payload = json.loads(message.payload.decode('utf-8'))
    print(" > Message : {}".format(payload))
    print("--------------\n")   

    try:
        red_lv = int(payload['red'])    
        green_lv = int(payload['green'])
        GPIO.output(LED_R, LEVEL[red_lv])
        GPIO.output(LED_G, LEVEL[green_lv])
    except:
        print(" ! Invalid message payload")     

def getGgCoreInfo():    # エッジ上Greengrass Coreの情報取得
    GROUP_CA_PATH = "./groupCA/"

    # Greengrass coreの検索
    discovery_info_provider = DiscoveryInfoProvider()
    discovery_info_provider.configureEndpoint(host)
    discovery_info_provider.configureCredentials(rootca_path, certificate_path, privatekey_path)
    discovery_info_provider.configureTimeout(10)  # 10 sec  

    discovery_info = discovery_info_provider.discover(thing_name)
    ca_list = discovery_info.getAllCas()
    core_list = discovery_info.getAllCores()

    # 見つかった1つめのCoreの情報を取得
    group_id, ca = ca_list[0]
    core_info = core_list[0]
    print("Discovered GGC: %s from Group: %s" % (core_info.coreThingArn, group_id))

    print("Now we persist the connectivity/identity information...")
    group_ca = GROUP_CA_PATH + group_id + "_CA_" + str(uuid.uuid4()) + ".crt"
        # GroupCAの取得(...よく理解していない)
    if not os.path.exists(GROUP_CA_PATH):
        os.makedirs(GROUP_CA_PATH)
    group_ca_file = open(group_ca, "w")
    group_ca_file.write(ca)
    group_ca_file.close()

    return (group_ca,core_info)

if __name__ == "__main__":

    # gpio setup
    GPIO.setwarnings(False) 
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(LED_R, GPIO.OUT)
    GPIO.setup(LED_G, GPIO.OUT)

    # mqtt setup
    mqtt_client = AWSIoTMQTTClient(thing_name)
    #mqtt_client.configureEndpoint(host,port)
    #mqtt_client.configureCredentials(rootca_path, privatekey_path, certificate_path)

    mqtt_client.configureOfflinePublishQueueing(0)
    mqtt_client.configureDrainingFrequency(2)
    mqtt_client.configureConnectDisconnectTimeout(120)
    mqtt_client.configureMQTTOperationTimeout(5)

    # Register an onMessage callback
    mqtt_client.onMessage = onMessage

    # 取得した接続先に対して順に接続を試みる
    connected = False
    (rootca_path, core_info) = getGgCoreInfo()
    for connectivity_info in core_info.connectivityInfoList:
        current_host = connectivity_info.host
        current_port = connectivity_info.port
        print(" * Trying to connect to core at %s:%d" % (current_host, current_port))
        mqtt_client.configureEndpoint(current_host, current_port)
        mqtt_client.configureCredentials(rootca_path, privatekey_path, certificate_path)
        try:
            mqtt_client.connect()
            connected = True
            break
        except BaseException as e:
            print(" * Error in connect!")
            print(" * Type: %s" % str(type(e)))
            print(" * Error message: %s" % e.message)

    if not connected:
        print(" * Cannot connect to core %s. Exiting..." % core_info.coreThingArn)
        sys.exit(-2)    

    mqtt_client.connect()
    time.sleep(2)
    print(" * connection success")  
    mqtt_client.subscribe("io/led", 1, None)
    print(" * start subscribing...")

    while True:
        time.sleep(3)

公式のサンプルではGreengrass Coreの検索自体をリトライする構成になっています.これは "RasPi3側にLambda関数をデプロイした後でしか接続先を発見できない"ためです.今回はコード簡略化のため"Lambda関数はすでにデプロイされている前提"で進めます.

信号機側H/W

前回記事と同じなので省略します.

動かしてみる

(本来の動きとは逆になりますが)まずはRasPi3側にLambda関数をデプロイします.注意点としてデプロイ前に

  • RasPi3上でGreengrass Coreを起動しておく
  • Lambda関数を作成後,[アクション]→[新しいバージョンを発行]を実行しておく
  • サブスクリプション(メッセージが配送される経路)を適切に設定しておく

が必要です.Greengrass Coreは以下で起動できます.起動に失敗する場合はGreengrass Coreの設定が間違っている可能性が高いです.

pi@raspi3-nobu_e753:/greengrass/ggc/core $ pwd
/greengrass/ggc/core
pi@raspi3-nobu_e753:/greengrass/ggc/core $ sudo ./greengrassd start
Setting up greengrass daemon
Validating execution environment
Found cgroup subsystem: cpu
...
Starting greengrass daemon
Greengrass successfully started with PID: 2856

サブスクリプションの経路は以下としました(自信なし).ターゲットに"IoT Cloud"を設けておくことで,AWS IoTのコンソール側([テスト]→[トピックへサブスクライブする])でも発行メッセージを確認することができます.

aws03s.png

設定が完了したらデプロイを実行します.あらかじめ[Lambda]タブで指定された関数がエッジ側へロードされます.ランプが緑に変わったら完了です.

aws04s.png

次にRasPi3側のプログラムを動かします.

pi@raspi3-nobu_e753:~/Workspace/aws $ ls
certs/  led_blink_awsgg.py  led_blink_awsiot.py
pi@raspi3-nobu_e753:~/Workspace/aws $ python led_blink_awsgg.py 
Discovered GGC: arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:thing/GgcGroup0_Core from Group: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Now we persist the connectivity/identity information...
 * Trying to connect to core at xxxx::xxx:xxxx:xxxx:xxxx:8883 # 接続先1トライ
 * Error in connect!
 * Type: <class 'socket.error'>
 * Error message: 
 * Trying to connect to core at 127.0.0.1:8883 # 接続先2トライ
 * connection success
 * start subscribing...    # エッジ側のLambda関数が発行するメッセージを受け,以下が出力される
{"red":1,"green":0}
 > Message : {u'green': 0, u'red': 1}
--------------

{"red":0,"green":1}
 > Message : {u'green': 1, u'red': 0}
--------------

{"red":1,"green":0}
 > Message : {u'green': 0, u'red': 1}
--------------

この状態で,RasPi3に接続されたLEDの色が5秒ごとに赤→緑→赤...と変わるのが確認できると思います.

最後に

エッジ側だけの情報で制御がなされていることを確認するためRasPi3のネットワーク接続を切断してみます(RasPi3のWifiを切断,ルーターの電源をおとす..などどのような方法でもよいです).その後もLEDの色が切り替わり続けるはずです.

このエッジタイプ信号機であれば,クラウドタイプ信号機の問題点を解決できそうですね.

わからなかった箇所

エッジ側へデプロイするLambda関数内で利用可能なコード
例えば,時刻の取得 datetime.now() やプラットフォーム情報の取得
platform.platform() は実行可能でしたが,ディレクトリ構成の取得 os.listdir() は実行できませんでした.何が実行できて何が実行できるのでしょう...

まとめ

AWS Greengrassを使い,信号機に見立てたLEDの制御を行うシステムを2通り作成しました.これを通じて,クラウドとエッジの違いを感覚的につかむことができました.

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした