前回記事(AWS GreengrassでLチカ:クラウドとエッジについて考える(1))の続きになります.
取り組む課題や環境,クラウドタイプのやり方についてはそちらを参照ください.
2. AWS Greengrassにてネットワーク型信号機を実現する
本記事で取り組む課題は"ネットワーク型信号機の設計"ですが,クラウドの考え方を素直に適用した場合,いろいろと問題が発生しそうなことについては前回記事にて述べました.今回はそれらの問題を解消すべく,エッジの考え方を導入したネットワーク型信号機をAWS Greengrassを使って実現します.
今回もできるだけ単純なモデルとするため,構成は以下としました.
大きな違いはメッセージの発行元です.
- クラウドタイプ : インターネットをはさんだサーバー上で稼働するLambda関数がメッセージ発行元
- エッジタイプ : RasPi3自身(正確にはRasPi3上で動作するLambda関数)がメッセージ発行元.
構成を見る限り自立して稼働しそうな気がします...なんだかエッジぽいです.
早速作成してみます.
信号機側にデプロイされるLambda関数
まずはこれを作成することにします.作成はAWS Lambdaのコンソール上で実施しました.AWS Lambdaのコンソールから [関数の作成]選択,"Greengrass"で検索すると現状では2件ヒットします.うちPython版の"greengrass-hello-world"をテンプレートにして作業を進めます.
今回は,テンプレートを少しだけ変更して以下としました.
import greengrasssdk
import platform
from threading import Timer
import time
client = greengrasssdk.client('iot-data')
my_platform = platform.platform()
def greengrass_hello_world_run():
# (追加部分)緑点灯のためのメッセージ発行,後5秒待ち
client.publish(topic='io/led', payload='{"red":0,"green":1}')
time.sleep(5)
# (追加部分)赤点灯のためのメッセージ発行,後5秒待ち
client.publish(topic='io/led', payload='{"red":1,"green":0}')
# 関数自身を呼ぶことで無限ループ
Timer(5, greengrass_hello_world_run).start()
# 実行開始
greengrass_hello_world_run()
def function_handler(event, context):
return
※本来はデバイスシャドウを利用すべきかもしれませんが,簡素化のために直にメッセージを発行する形にしています
AWS Greengrass を通じてこのLambda関数がエッジ側(RasPi3)にデプロイされると,(無限ループのおかげで)動き続けることになります.エッジ側で動くLambda関数は通常のLambda関数と異なり,動作時間上限がありません.デプロイの過程やセキュリティの担保はGreengrassが面倒をみてくれるので,いろいろ考えなくてすみます.
信号機側S/W
前回記事にて作成したS/Wを改良する形で進めます.AWS Greengrassのサンプルを参考にしました.
主な変更点はMQTTクライアントの接続先を,サーバー側ではなく,RasPi3上で動作するGreengrassコア相手にする事です.接続先情報はサーバー側に問い合わせる機構となっているようです.
# -*- coding: utf-8 -*-
# AWS Greengrass を取り扱うためのモジュール
from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.core.protocol.connection.cores import ProgressiveBackOffCore
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryInvalidRequestException
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import RPi.GPIO as GPIO
import time
import json
import sys
import uuid
import os
# gpio pins
LED_R = 14
LED_G = 15
LEVEL = [GPIO.LOW, GPIO.HIGH]
# host
host = "xxxxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com"
port = 8883
# certs
rootca_path = "./certs/root-ca.pem"
certificate_path = "./certs/thing-certificate.pem.crt"
privatekey_path = "./certs/thing-private.pem.key"
thing_name = "RasPi3"
def onMessage(message): # メッセージを受け取った際に呼ばれる
print(message.payload)
payload = json.loads(message.payload.decode('utf-8'))
print(" > Message : {}".format(payload))
print("--------------\n")
try:
red_lv = int(payload['red'])
green_lv = int(payload['green'])
GPIO.output(LED_R, LEVEL[red_lv])
GPIO.output(LED_G, LEVEL[green_lv])
except:
print(" ! Invalid message payload")
def getGgCoreInfo(): # エッジ上Greengrass Coreの情報取得
GROUP_CA_PATH = "./groupCA/"
# Greengrass coreの検索
discovery_info_provider = DiscoveryInfoProvider()
discovery_info_provider.configureEndpoint(host)
discovery_info_provider.configureCredentials(rootca_path, certificate_path, privatekey_path)
discovery_info_provider.configureTimeout(10) # 10 sec
discovery_info = discovery_info_provider.discover(thing_name)
ca_list = discovery_info.getAllCas()
core_list = discovery_info.getAllCores()
# 見つかった1つめのCoreの情報を取得
group_id, ca = ca_list[0]
core_info = core_list[0]
print("Discovered GGC: %s from Group: %s" % (core_info.coreThingArn, group_id))
print("Now we persist the connectivity/identity information...")
group_ca = GROUP_CA_PATH + group_id + "_CA_" + str(uuid.uuid4()) + ".crt"
# GroupCAの取得(...よく理解していない)
if not os.path.exists(GROUP_CA_PATH):
os.makedirs(GROUP_CA_PATH)
group_ca_file = open(group_ca, "w")
group_ca_file.write(ca)
group_ca_file.close()
return (group_ca,core_info)
if __name__ == "__main__":
# gpio setup
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_R, GPIO.OUT)
GPIO.setup(LED_G, GPIO.OUT)
# mqtt setup
mqtt_client = AWSIoTMQTTClient(thing_name)
#mqtt_client.configureEndpoint(host,port)
#mqtt_client.configureCredentials(rootca_path, privatekey_path, certificate_path)
mqtt_client.configureOfflinePublishQueueing(0)
mqtt_client.configureDrainingFrequency(2)
mqtt_client.configureConnectDisconnectTimeout(120)
mqtt_client.configureMQTTOperationTimeout(5)
# Register an onMessage callback
mqtt_client.onMessage = onMessage
# 取得した接続先に対して順に接続を試みる
connected = False
(rootca_path, core_info) = getGgCoreInfo()
for connectivity_info in core_info.connectivityInfoList:
current_host = connectivity_info.host
current_port = connectivity_info.port
print(" * Trying to connect to core at %s:%d" % (current_host, current_port))
mqtt_client.configureEndpoint(current_host, current_port)
mqtt_client.configureCredentials(rootca_path, privatekey_path, certificate_path)
try:
mqtt_client.connect()
connected = True
break
except BaseException as e:
print(" * Error in connect!")
print(" * Type: %s" % str(type(e)))
print(" * Error message: %s" % e.message)
if not connected:
print(" * Cannot connect to core %s. Exiting..." % core_info.coreThingArn)
sys.exit(-2)
mqtt_client.connect()
time.sleep(2)
print(" * connection success")
mqtt_client.subscribe("io/led", 1, None)
print(" * start subscribing...")
while True:
time.sleep(3)
公式のサンプルではGreengrass Coreの検索自体をリトライする構成になっています.これは "RasPi3側にLambda関数をデプロイした後でしか接続先を発見できない"ためです.今回はコード簡略化のため"Lambda関数はすでにデプロイされている前提"で進めます.
信号機側H/W
前回記事と同じなので省略します.
動かしてみる
(本来の動きとは逆になりますが)まずはRasPi3側にLambda関数をデプロイします.注意点としてデプロイ前に
- RasPi3上でGreengrass Coreを起動しておく
- Lambda関数を作成後,[アクション]→[新しいバージョンを発行]を実行しておく
- サブスクリプション(メッセージが配送される経路)を適切に設定しておく
が必要です.Greengrass Coreは以下で起動できます.起動に失敗する場合はGreengrass Coreの設定が間違っている可能性が高いです.
pi@raspi3-nobu_e753:/greengrass/ggc/core $ pwd
/greengrass/ggc/core
pi@raspi3-nobu_e753:/greengrass/ggc/core $ sudo ./greengrassd start
Setting up greengrass daemon
Validating execution environment
Found cgroup subsystem: cpu
...
Starting greengrass daemon
Greengrass successfully started with PID: 2856
サブスクリプションの経路は以下としました(自信なし).ターゲットに"IoT Cloud"を設けておくことで,AWS IoTのコンソール側([テスト]→[トピックへサブスクライブする])でも発行メッセージを確認することができます.
設定が完了したらデプロイを実行します.あらかじめ[Lambda]タブで指定された関数がエッジ側へロードされます.ランプが緑に変わったら完了です.
次にRasPi3側のプログラムを動かします.
pi@raspi3-nobu_e753:~/Workspace/aws $ ls
certs/ led_blink_awsgg.py led_blink_awsiot.py
pi@raspi3-nobu_e753:~/Workspace/aws $ python led_blink_awsgg.py
Discovered GGC: arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:thing/GgcGroup0_Core from Group: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Now we persist the connectivity/identity information...
* Trying to connect to core at xxxx::xxx:xxxx:xxxx:xxxx:8883 # 接続先1トライ
* Error in connect!
* Type: <class 'socket.error'>
* Error message:
* Trying to connect to core at 127.0.0.1:8883 # 接続先2トライ
* connection success
* start subscribing... # エッジ側のLambda関数が発行するメッセージを受け,以下が出力される
{"red":1,"green":0}
> Message : {u'green': 0, u'red': 1}
--------------
{"red":0,"green":1}
> Message : {u'green': 1, u'red': 0}
--------------
{"red":1,"green":0}
> Message : {u'green': 0, u'red': 1}
--------------
この状態で,RasPi3に接続されたLEDの色が5秒ごとに赤→緑→赤...と変わるのが確認できると思います.
最後に
エッジ側だけの情報で制御がなされていることを確認するためRasPi3のネットワーク接続を切断してみます(RasPi3のWifiを切断,ルーターの電源をおとす..などどのような方法でもよいです).その後もLEDの色が切り替わり続けるはずです.
このエッジタイプ信号機であれば,クラウドタイプ信号機の問題点を解決できそうですね.
わからなかった箇所
エッジ側へデプロイするLambda関数内で利用可能なコード
例えば,時刻の取得 datetime.now()
やプラットフォーム情報の取得
platform.platform()
は実行可能でしたが,ディレクトリ構成の取得 os.listdir()
は実行できませんでした.何が実行できて何が実行できるのでしょう...
まとめ
AWS Greengrassを使い,信号機に見立てたLEDの制御を行うシステムを2通り作成しました.これを通じて,クラウドとエッジの違いを感覚的につかむことができました.