Help us understand the problem. What is going on with this article?

SORACOM AWS LTE-M ButtonでRaspberry Piを操作してみた

More than 1 year has passed since last update.

先日、JAWS-UG名古屋 SORACOM-UG東海 共催 LTE-M Button ハンズオンに参加させてもらい、安かったのでLTE-M Buttonを買いました。

ハンズオンを試していくうちに、なんとなく実用的だと思ったので、自分でもなにか作ろうと思い、試しにRaspberry Piや手持ちのパーツを組み合わせてLTE-Mボタンで操作できるデバイスを作ってみました。

使うもの

Raspberry Pi3 Model B+
アノードコモンフルカラーLED
抵抗(あるもので用意しました。200Ωぐらいあればいいです)
ジャンパーケーブル
ブレッドボード

AWS側の構築手順

Raspberry Pi用のポリシーの作成

こちら を参考にしてRaspberry Piを接続するための設定を行い、証明書などをダウンロードしていきます。証明書は再発行出来ないのでなくさないようにします。

Lambda

ランタイムをpython3.6で設定し、実行ロールではAWS Iotが使えるようにしておきます。
そして、lambda_function.pyを以下の内容に変更します。

lambda_function.py
import json
import os
import urllib.request
import logging
import boto3

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

iot = boto3.client('iot-data', region_name="us-west-2")

def lambda_handler(event, context):

    logger.info('request event: \n' + json.dumps(event))

    click_type = event['deviceEvent']['buttonClicked'].get('clickType')
    logger.info("click_type: " + click_type)

    topic = "raspberry_pi/cmd"
    payload = event['placementInfo']['attributes'][click_type]
    logger.info(payload)

    try:
        iot.publish(
            topic=topic,
            qos=0,
            payload=json.dumps(payload, ensure_ascii=False)
        )

        return "Succeed"

    except Exception as e:
        logger.info(e)
        return "Falid"

実際にボタンを動かす前にテストをするときはテストイベントの設定で以下のJSONを書き込みます。

{
  "deviceInfo": {
    "deviceId": "G030PMXXXXXXXXXX",
    "type": "button",
    "remainingLife": 94.2,
    "attributes": {
      "projectRegion": "us-west-2",
      "projectName": "One-click-Raspi",
      "placementName": "button1",
      "deviceTemplateName": "RequestCmd"
    }
  },
  "deviceEvent": {
    "buttonClicked": {
      "clickType": "SINGLE",
      "reportedTime": "2019-01-24T18:49:36.813Z"
    }
  },
  "placementInfo": {
    "projectName": "One-click-Raspi",
    "placementName": "button1",
    "attributes": {
      "SINGLE": "{\"cmd\":\"green\"}",
      "DOUBLE": "{\"cmd\":\"blue\"}",
      "LONG": "{\"cmd\":\"red\"}"
    },
    "devices": {
      "RequestCmd": "G030PMXXXXXXXXXX"
    }
  }
}

この時点ではまだRaspberry Piの設定を行っていませんが、エラーを出すことなくIotプラットフォームに送信することができたら、成功です。(関数ではSucceedと出力されます。)もし、Faildと出た場合は、ロールにIoT Full AccessのPolicyがついていない可能性があります。もう一度lambdaで設定したポリシーを確認してみてください。

ボタンのデバイス登録

1.AWS Iot 1-clickを開き、オンボードから、デバイスの登録を行います。
スクリーンショット 2019-01-27 18.49.10.png

2.新たにプロジェクトを作成します。プロジェクト名はOne-click-Raspiとしておきます。
3.「プロジェクトのプレイスメントのテンプレートの定義」ではデバイステンプレートの設定では、先程登録したデバイスを選択し、ボタンを押したときのアクションはlambda関数を選択し、実行するlambda関数を選択します。デバイステンプレート名は何でも良いです。

スクリーンショット 2019-01-27 19.01.57.png

4.プレイスメントの属性を以下のように登録させます

名前 デフォルト値
SINGLE {"cmd":"green"}
DOUBLE {"cmd":"blue"}
LONG {"cmd":"red"}

名前の欄にはボタンが行った操作の種類が書かれており、lambdaで受け取るrequestの中に記載されるものです。(先程のテスト用に用意したJSONがlambda関数に送信され実行されるイメージです。)

Raspberry Pi側の構築手順

配線図

配線は下図のように簡単に配線しました。黄色のジャンパーケーブル以外は接続先のピンの色に対応させています。
Raspi_led.png

証明書のダウンロード

AWS Iotへの接続設定を行ったときにダウンロードした証明書は certディレクトリを作成しその中にファイルを保存していきます。分かりやすくするためにcertディレクトリはLEDを点灯させるプログラムと同じディレクトリに配置します。

プログラム

AWSIoTのPythonSDKをインストールします。

sudo pip install AWSIoTPythonSDK

以下のプログラムを用意します。

3led.py
import pigpio
from time import sleep
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import json

green_pin = 9
red_pin  = 10
blue_pin = 11

CLIENT_ID = "raspi_control"
ENDPOINT = "YOUR_ENDPOINT"
PORT = 8883

ROOT_CA = "./cert/rootCA.pem"
PRIVATE_KEY = "./cert/YOUR_KEY.pem.key"
CERTIFICATE = "./cert/YOUR_CATE.pem.crt"

TOPIC = "raspberry_pi/cmd"

pi = pigpio.pi()
pi.set_mode(green_pin, pigpio.OUTPUT)
pi.set_mode(red_pin, pigpio.OUTPUT)
pi.set_mode(blue_pin, pigpio.OUTPUT)

pi.write(green_pin, 1)
pi.write(red_pin, 1)
pi.write(blue_pin, 1)

def main():
    # https://s3.amazonaws.com/aws-iot-device-sdk-python-docs/sphinx/html/index.html
    client = AWSIoTMQTTClient(CLIENT_ID)
    client.configureEndpoint(ENDPOINT, PORT)
    client.configureCredentials(ROOT_CA, PRIVATE_KEY, CERTIFICATE)

    client.configureAutoReconnectBackoffTime(1, 32, 20)
    client.configureOfflinePublishQueueing(-1)
    client.configureDrainingFrequency(2)
    client.configureConnectDisconnectTimeout(10)
    client.configureMQTTOperationTimeout(5)

    client.connect()
    client.subscribe(TOPIC, 1, subscribe_callback)

    while True:
        sleep(5)

def subscribe_callback(client, userdata, message):
    d = json.loads(message.payload.decode())
    d = json.loads(d)
    print("Received a new message: ")
    print(d)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")
    color = d.get("cmd")
    led_chika(color)

def led_chika(color):
    if color == "blue":
        pi.write(blue_pin, 0)
        sleep(2)
        pi.write(blue_pin, 1)
    elif color == "green":
        pi.write(green_pin, 0)
        sleep(2)
        pi.write(green_pin, 1)
    elif color == "red":
        pi.write(red_pin, 0)
        sleep(2)
        pi.write(red_pin, 1)
    else:
        print("No Color")

if __name__ == "__main__":
    main()

動作チェック

実際にボタンを押して、ボタンの押し方で光るLEDの色が変わったら、成功です。

参考

【AWS Lambdaの基本コードその4】 AWS IoTにメッセージをPublish
Raspi(デバイス)とAWSIoT(クラウド)間でMQTTを使ってメッセージを送受信する
Soracom LTE-M ButtonでAlexaに通知させてみた
Raspberry PiでAWS IoT Coreと接続し、GPIO制御をしてみた
AWS IoT への Raspberry Pi の接続

K_M95
IBM Champion 2020 / LINE API Expert 2020
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした