先日、JAWS-UG名古屋 SORACOM-UG東海 共催 LTE-M Button ハンズオンに参加させてもらい、安かったのでLTE-M Buttonを買いました。
#jawsug #soracomug #あのボタン pic.twitter.com/t8M5K2FQ0a
— K.Miura (@k_miura_io) 2019年1月19日
ハンズオンを試していくうちに、なんとなく実用的だと思ったので、自分でもなにか作ろうと思い、試しにRaspberry Piや手持ちのパーツを組み合わせてLTE-Mボタンで操作できるデバイスを作ってみました。
#使うもの
Raspberry Pi3 Model B+
アノードコモンフルカラーLED
抵抗(あるもので用意しました。200Ωぐらいあればいいです)
ジャンパーケーブル
ブレッドボード
#AWS側の構築手順
##Raspberry Pi用のポリシーの作成
こちら を参考にしてRaspberry Piを接続するための設定を行い、証明書などをダウンロードしていきます。証明書は再発行出来ないのでなくさないようにします。
##Lambda
ランタイムをpython3.6で設定し、実行ロールではAWS Iotが使えるようにしておきます。
そして、lambda_function.pyを以下の内容に変更します。
import json
import os
import urllib.request
import logging
import boto3
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
iot = boto3.client('iot-data', region_name="us-west-2")
def lambda_handler(event, context):
logger.info('request event: \n' + json.dumps(event))
click_type = event['deviceEvent']['buttonClicked'].get('clickType')
logger.info("click_type: " + click_type)
topic = "raspberry_pi/cmd"
payload = event['placementInfo']['attributes'][click_type]
logger.info(payload)
try:
iot.publish(
topic=topic,
qos=0,
payload=json.dumps(payload, ensure_ascii=False)
)
return "Succeed"
except Exception as e:
logger.info(e)
return "Falid"
実際にボタンを動かす前にテストをするときはテストイベントの設定で以下のJSONを書き込みます。
{
"deviceInfo": {
"deviceId": "G030PMXXXXXXXXXX",
"type": "button",
"remainingLife": 94.2,
"attributes": {
"projectRegion": "us-west-2",
"projectName": "One-click-Raspi",
"placementName": "button1",
"deviceTemplateName": "RequestCmd"
}
},
"deviceEvent": {
"buttonClicked": {
"clickType": "SINGLE",
"reportedTime": "2019-01-24T18:49:36.813Z"
}
},
"placementInfo": {
"projectName": "One-click-Raspi",
"placementName": "button1",
"attributes": {
"SINGLE": "{\"cmd\":\"green\"}",
"DOUBLE": "{\"cmd\":\"blue\"}",
"LONG": "{\"cmd\":\"red\"}"
},
"devices": {
"RequestCmd": "G030PMXXXXXXXXXX"
}
}
}
この時点ではまだRaspberry Piの設定を行っていませんが、エラーを出すことなくIotプラットフォームに送信することができたら、成功です。(関数ではSucceedと出力されます。)もし、Faildと出た場合は、ロールにIoT Full AccessのPolicyがついていない可能性があります。もう一度lambdaで設定したポリシーを確認してみてください。
##ボタンのデバイス登録
1.AWS Iot 1-clickを開き、オンボードから、デバイスの登録を行います。
2.新たにプロジェクトを作成します。プロジェクト名はOne-click-Raspiとしておきます。
3.「プロジェクトのプレイスメントのテンプレートの定義」ではデバイステンプレートの設定では、先程登録したデバイスを選択し、ボタンを押したときのアクションはlambda関数を選択し、実行するlambda関数を選択します。デバイステンプレート名は何でも良いです。
4.プレイスメントの属性を以下のように登録させます
名前 | デフォルト値 |
---|---|
SINGLE | {"cmd":"green"} |
DOUBLE | {"cmd":"blue"} |
LONG | {"cmd":"red"} |
名前の欄にはボタンが行った操作の種類が書かれており、lambdaで受け取るrequestの中に記載されるものです。(先程のテスト用に用意したJSONがlambda関数に送信され実行されるイメージです。)
#Raspberry Pi側の構築手順
##配線図
配線は下図のように簡単に配線しました。黄色のジャンパーケーブル以外は接続先のピンの色に対応させています。
##証明書のダウンロード
AWS Iotへの接続設定を行ったときにダウンロードした証明書は cert
ディレクトリを作成しその中にファイルを保存していきます。分かりやすくするためにcertディレクトリはLEDを点灯させるプログラムと同じディレクトリに配置します。
##プログラム
AWSIoTのPythonSDKをインストールします。
sudo pip install AWSIoTPythonSDK
以下のプログラムを用意します。
import pigpio
from time import sleep
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import json
green_pin = 9
red_pin = 10
blue_pin = 11
CLIENT_ID = "raspi_control"
ENDPOINT = "YOUR_ENDPOINT"
PORT = 8883
ROOT_CA = "./cert/rootCA.pem"
PRIVATE_KEY = "./cert/YOUR_KEY.pem.key"
CERTIFICATE = "./cert/YOUR_CATE.pem.crt"
TOPIC = "raspberry_pi/cmd"
pi = pigpio.pi()
pi.set_mode(green_pin, pigpio.OUTPUT)
pi.set_mode(red_pin, pigpio.OUTPUT)
pi.set_mode(blue_pin, pigpio.OUTPUT)
pi.write(green_pin, 1)
pi.write(red_pin, 1)
pi.write(blue_pin, 1)
def main():
# https://s3.amazonaws.com/aws-iot-device-sdk-python-docs/sphinx/html/index.html
client = AWSIoTMQTTClient(CLIENT_ID)
client.configureEndpoint(ENDPOINT, PORT)
client.configureCredentials(ROOT_CA, PRIVATE_KEY, CERTIFICATE)
client.configureAutoReconnectBackoffTime(1, 32, 20)
client.configureOfflinePublishQueueing(-1)
client.configureDrainingFrequency(2)
client.configureConnectDisconnectTimeout(10)
client.configureMQTTOperationTimeout(5)
client.connect()
client.subscribe(TOPIC, 1, subscribe_callback)
while True:
sleep(5)
def subscribe_callback(client, userdata, message):
d = json.loads(message.payload.decode())
d = json.loads(d)
print("Received a new message: ")
print(d)
print("from topic: ")
print(message.topic)
print("--------------\n\n")
color = d.get("cmd")
led_chika(color)
def led_chika(color):
if color == "blue":
pi.write(blue_pin, 0)
sleep(2)
pi.write(blue_pin, 1)
elif color == "green":
pi.write(green_pin, 0)
sleep(2)
pi.write(green_pin, 1)
elif color == "red":
pi.write(red_pin, 0)
sleep(2)
pi.write(red_pin, 1)
else:
print("No Color")
if __name__ == "__main__":
main()
#動作チェック
実際にボタンを押して、ボタンの押し方で光るLEDの色が変わったら、成功です。
#あのボタン を使ってRaspberry Piを操作できるようになった#soracom #AWSIot #ltembutton pic.twitter.com/5S0hp2YbIw
— K.Miura (@k_miura_io) 2019年1月27日
参考
【AWS Lambdaの基本コードその4】 AWS IoTにメッセージをPublish
Raspi(デバイス)とAWSIoT(クラウド)間でMQTTを使ってメッセージを送受信する
Soracom LTE-M ButtonでAlexaに通知させてみた
Raspberry PiでAWS IoT Coreと接続し、GPIO制御をしてみた
AWS IoT への Raspberry Pi の接続