みんさん、こんにちは。
前回、「クリスマスツリーに音楽に合わせて光るLEDを装飾をしてみた」でクリスマスツリーを装飾しました。その際にLEDをON/OFFボタンでコントロールできないかと考えたいた時に、先月開催されたSORACOM Technology Camp 2018
の会場で衝動買いしたSORACOM LTE-M Button
を発見。「これで、コントロールできるのでは?」と思ったのでやってみました。
アクションの仕様
仕様を決めます。ボタンには3つのアクションが使えるので、それぞれのアクションを以下のようにしました。
ボタンアクション | ツリー側のアクション |
---|---|
シングルクリック | 音楽再生/一時停止 |
ダブルクリック | LEDモード切り替え(常灯、ビジュアライザ) |
長押し | LED消灯、音楽の停止 |
構成図
全体の流れはこんな感じです。
作ってみる
では、以下のステップにそって解説していきます。
- AWS IoTの準備
- Raspberry Piの準備
- Arduinoの準備
- LTE-Mボタンから呼ばれるLambda関数を作成
- LTE-Mボタンの登録
- AWS IoT 1-Click のプロジェクトを作成
- いいや!限界だ押すね!今だッ!
AWS IoTの準備
まずは、Raspberry Pi
を AWS IoT
に登録して、MQTTのやり取りができるようにします。
以下を参考にAWS IoT
にRaspberry Pi
を登録します。
[AWS IoT Device SDK for Pythonを使ってRaspberryPiとAWS IoTをつないでみる]
(https://dev.classmethod.jp/hardware/raspberrypi/aws-iot-device-sdk-for-python-raspberrypi-aws-iot/)
証明書のダウンロード、ポリシーのアタッチ、エンドポイントの確認までできたらAWS IoT
側の準備は完了です。
Raspberry Piの準備
Raspberry Pi
では音楽ファイルの再生/停止
とAWS IoT
からのPublish
されるTopic
をSubscribe
して、Arduino
にシリアル通信でLEDのコントロールをするプログラムを用意します。
環境
- RaspberryPi 3 Model B GNU/Linux 8.0 (jessie)
- Python 3.6.7(3.7系だとライブラリのインストールがうまく行かなかったので3.6系にしました)
証明書を配置
作業フォルダにcert
ディレクトリを作成し、AWS IoT
でダウンロードした証明書を配置します。
.
└── iot_xmas_tree
└── cert
├── *********-certificate.pem.crt
├── *********-private.pem.key
└── rootCA.pemcd
ライブラリのインストール
使用するライブラリのインストールをします。
ライブラリ | 使用用途 |
---|---|
aws-iot-device-sdk-python v1.4.0 | AWS IoTとの接続 |
omxplayer-wrapper | omxplayerをPythonからコントロールするラッパーモジュール |
pyserial | Arduino間のシリアル通信で使用 |
sudo apt-get update && sudo apt-get install -y libdbus-1{,-dev}
pip install pyserial AWSIoTPythonSDK omxplayer-wrapper
プログラム
AWS IoT
接続情報を設定して、先ほど作成したcert
ディレクトリと同階層に配置します。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from logging import basicConfig, getLogger, INFO
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from omxplayer.player import OMXPlayer
from pathlib import Path
import enum
import time
import json
import serial
# AWSIoT接続情報
CLIENT_ID = 'myClientID'
ENDPOINT = '***************.iot.ap-northeast-1.amazonaws.com'
PORT = 8883
ROOT_CA = './cert/rootCA.pem'
PRIVATE_KEY = './cert/*******-private.pem.key'
CERTIFICATE = './cert/*******-certificate.pem.crt'
TOPIC = 'LTEButtonTopic/action'
plaing_music = None
music_path = None
player = None
ser = None
current_led_mode = None
class LED(enum.Enum):
ON = 'O'
VISUALIZER = 'V'
OFF = 'F'
def main():
# AWSIoTと接続
myMQTTClient = AWSIoTMQTTClient(CLIENT_ID)
myMQTTClient.configureEndpoint(ENDPOINT, PORT)
myMQTTClient.configureCredentials(ROOT_CA, PRIVATE_KEY, CERTIFICATE)
myMQTTClient.configureOfflinePublishQueueing(-1)
myMQTTClient.configureDrainingFrequency(2)
myMQTTClient.configureConnectDisconnectTimeout(10)
myMQTTClient.configureMQTTOperationTimeout(5)
myMQTTClient.connect()
while True:
myMQTTClient.subscribe(TOPIC, 1, subscribe_callback)
time.sleep(1)
current_led_mode = LED.ON.value
# AWSIoTからメッセージ受診時のコールバック
def subscribe_callback(client, userdata, message):
print('\n\n--------------')
logger.info('Received a new message: ')
logger.info(message.payload)
json_data = json.loads(message.payload)
try:
click_type = json_data['click_type']
music_file = json_data['music_file']
except KeyError:
logger.error('keys could not be found')
print('--------------\n\n')
return
global music_path
music_path = Path(f'../../Music/{music_file}')
if music_path.is_file():
# シングルクリック時の処理
if click_type == "SINGLE":
play_pause_music(music_file)
# ダブルクリック時の処理
elif click_type == "DOUBLE":
global current_led_mode
led_mode = LED.ON.value if current_led_mode == LED.VISUALIZER.value else LED.VISUALIZER.value
send_data_to_arduino(led_mode)
current_led_mode = led_mode
# 長押し時の処理
elif click_type == "LONG":
stop_all()
else:
logger.info('None')
else:
logger.error(f'{music_file} could not be found')
print('--------------\n\n')
# Arduinoへシリアルデータを送る
def send_data_to_arduino(act_code):
try:
global ser
if ser is None:
ser = serial.Serial('/dev/ttyACM0', 9600, timeout = None)
time.sleep(3)
ser.write(act_code.encode())
logger.info(f'Sucsses to send data to Arduino:{act_code}')
except serial.serialutil.SerialException:
logger.error('Failed to send data to Arduino')
logger.error('Check the connection between Arduino and Raspberry pi by type /dev/ttyACM0')
# プレイヤーのコントロール(再生 or 一時停止)
def play_pause_music(music_file):
global player
global music_path
global plaing_music
if player is None:
player = OMXPlayer(music_path)
plaing_music = music_file
player.exitEvent += lambda p, e: clean_player()
logger.info(f'♪♪♪ Start music: {plaing_music} ♪♪♪')
elif music_file != plaing_music:
logger.info(f'chenge music {plaing_music} → {music_file}')
player.load(music_path)
plaing_music = music_file
logger.info(f'♪♪♪ Start music: {plaing_music} ♪♪♪')
else:
if player.is_playing():
player.pause()
logger.info(f'♪♪♪ Pause music: {plaing_music} ♪♪♪')
else:
player.play()
logger.info(f'♪♪♪ Play music: {plaing_music} ♪♪♪')
def stop_all():
global player
if not player is None:
player.stop()
clean_player()
send_data_to_arduino(LED.OFF.value)
global current_led_mode
current_led_mode = LED.VISUALIZER.value
def clean_player():
global player
player = None
if __name__ == '__main__':
basicConfig(level=INFO)
logger = getLogger(__name__)
main()
音楽ファイルの再生確認
Python
の対話モードで、音楽ファイルが再生されることを確認します。
確認する前にMusic
フォルダに音楽ファイルを配置して、ヘッドフォンジャックにイヤホンかスピーカーを接続してください。また、Raspberry Pi
の音声出力は、デフォルト設定ではHDMIケーブルかヘッドフォンジャックどちらか自動で判別されて再生されるようになっています。ヘッドフォンジャックが選択されていることを確認してください。
$ python
Python 3.6.7 (default, Dec 13 2018, 04:35:31)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from omxplayer.player import OMXPlayer
>>> from pathlib import Path
>>> music_path = Path('../../Music/sample.mp3') # 音楽ファイルのパス
>>> player = OMXPlayer(music_path) #再生開始
>>> player.pause() # 一時停止
>>> player.play() # 再生
>>> player.stop() # 停止
>>> quit()
音楽の再生/停止
が確認できたらOKです。
MQTTの動作確認
AWS IoT
のコンソールからトピックに発行してMQTT
の動作確認をします。
プログラムを起動します。
$ python xmas_tree.py
INFO:AWSIoTPythonSDK.core.protocol.mqtt_core:MqttCore initialized
INFO:AWSIoTPythonSDK.core.protocol.mqtt_core:Client id: myClientID
INFO:AWSIoTPythonSDK.core.protocol.mqtt_core:Protocol version: MQTTv3.1.1
INFO:AWSIoTPythonSDK.core.protocol.mqtt_core:Performing sync subscribe...
AWS IoT
のコンソールからトピックに発行
ボタンを押します。MQTT
の接続確認だけを行いたいので、メッセージはデフォルトの物を使用します。
Raspberry Pi
側でメッセージが受信ができたらMQTTの動作確認はOKです。
--------------
INFO:__main__:Received a new message:
INFO:__main__:b'{\n "message": "Hello from AWS IoT console"\n}'
ERROR:__main__:keys could not be found
--------------
Arduinoの準備
前回の投稿で作成したスケッチにシリアル通信の箇所を追記したものです。以下のスケッチをArduino
に書き込みます。
スケッチ
#include "FastLED.h"
// LEDの設定
#define LED_PIN 6
#define NUM_LEDS 60 // LEDの数
#define BRIGHTNESS 64
#define LED_TYPE WS2811
#define COLOR_ORDER GRB
#define UPDATES_PER_SECOND 100
#define REACT_LEVEL 512L // 音の反応レベル値
CRGB leds[NUM_LEDS];
// オーディオ入力の設定
int strobe = 4;
int reset = 5;
int audio1 = A0;
int audio2 = A1;
int left[7];
int right[7];
int band;
int audio_input = 0;
int freq = 0;
// ビジュアライザの変数
int midway = NUM_LEDS / 2; // 真ん中のLED
int loop_max = 0;
int k = 255;
int decay = 0;
int decay_check = 0;
long pre_react = 0;
long react = 0;
long post_react = 0;
int wheel_speed = 2;
// LEDのモード
bool is_visualizer_mode = false;
char led_mode = 'O';
void setup()
{
pinMode(13, OUTPUT);
digitalWrite(13, HIGH);
// スペクトルの設定
pinMode(audio1, INPUT);
pinMode(audio2, INPUT);
pinMode(strobe, OUTPUT);
pinMode(reset, OUTPUT);
digitalWrite(reset, LOW);
digitalWrite(strobe, HIGH);
// LEDの明るさを調整
delay( 3000 );
FastLED.addLeds<LED_TYPE, LED_PIN, COLOR_ORDER>(leds, NUM_LEDS).setCorrection( TypicalLEDStrip );
FastLED.setBrightness( BRIGHTNESS );
// LEDの初期化
initLED();
// シリアル入力にの設定
Serial.begin(115200);
Serial.println("\nListening from Raspberry Pi...");
Serial.begin(9600);
Serial.println("\nListening from Raspberry Pi...");
}
// 虹色の作成
// 参考 https://github.com/NeverPlayLegit/Rainbow-Fader-FastLED/blob/master/rainbow.ino
CRGB Scroll(int pos) {
CRGB color (0, 0, 0);
if (pos < 85) {
color.g = 0;
color.r = ((float)pos / 85.0f) * 255.0f;
color.b = 255 - color.r;
} else if (pos < 170) {
color.g = ((float)(pos - 85) / 85.0f) * 255.0f;
color.r = 255 - color.g;
color.b = 0;
} else if (pos < 256) {
color.b = ((float)(pos - 170) / 85.0f) * 255.0f;
color.g = 255 - color.b;
color.r = 1;
}
return color;
}
// MSGEQ7(スペクトラムアナライザ)から値を読み込む
void readMSGEQ7()
{
digitalWrite(reset, HIGH);
digitalWrite(reset, LOW);
for (band = 0; band < 7; band++)
{
digitalWrite(strobe, LOW);
delayMicroseconds(30); //
left[band] = analogRead(audio1);
right[band] = analogRead(audio2);
digitalWrite(strobe, HIGH);
}
}
// 真ん中のLEDから両端に向けてビジュアライズする
void doubleVisualizer()
{
for (int i = NUM_LEDS - 1; i >= midway; i--) {
if (i < react + midway) {
leds[i] = Scroll((i * 256 / 50 + k) % 256);
leds[(midway - i) + midway] = Scroll((i * 256 / 50 + k) % 256);
}
else
leds[i] = CRGB(0, 0, 0);
leds[midway - react] = CRGB(0, 0, 0);
}
FastLED.show();
}
// オーディオ信号からLEDを光らせるレベルを算出
void convertDouble()
{
if (left[freq] > right[freq])
audio_input = left[freq];
else
audio_input = right[freq];
if (audio_input > 80)
{
pre_react = ((long)midway * (long)audio_input) / REACT_LEVEL;
if (pre_react > react)
react = pre_react;
Serial.print(audio_input);
Serial.print(" -> ");
Serial.println(pre_react);
}
}
void doubleLevel()
{
readMSGEQ7();
convertDouble();
doubleVisualizer();
k = k - wheel_speed;
if (k < 0)
k = 255;
decay_check++;
if (decay_check > decay)
{
decay_check = 0;
if (react > 0)
react--;
}
}
void initLED()
{
for (int i = 0; i < NUM_LEDS; i++)
leds[i] = CRGB::Black;
FastLED.show();
}
void loop()
{
// シリアル通信
if (Serial.available() > 0)
{
initLED();
led_mode = Serial.read();
}
if (led_mode == 'O')
{
for (int i = 0; i < NUM_LEDS; i++)
leds[i] = CRGB::Red;
FastLED.show();
}
else if (led_mode == 'V')
{
doubleLevel();
}
else if (led_mode == 'F')
{
for (int i = 0; i < NUM_LEDS; i++)
leds[i] = CRGB::Black;
FastLED.show();
}
}
書き込みが終わったら、Raspberry Pi
と、LEDテープを接続します。
LEDテープの接続の方法は前回の投稿を参照。
シリアル通信の確認
Raspberry Pi
側からPython
の対話モードで、シリアル通信ができるか確認します。
シリアルポートにアクセスできるようにttyACM0
にアクセス許可をあげます。
対話モードで行うとアクセス許可がなくても動きますが、実行モード時はアクセス許可がないと動きません。ここハマりポイントです。
sudo usermod -a -G dialout <username>
sudo chmod a+rw /dev/ttyACM0
Python
の対話モードで実行
$ python
Python 3.6.7 (default, Dec 13 2018, 04:35:31)
[GCC 4.9.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import serial
>>> ser = serial.Serial('/dev/ttyACM0', 9600, timeout = None)
>>> ser.write('F'.encode())
1
>>> ser.write('O'.encode())
1
>>> quit()
LEDテープのチカチカが確認できたらOKです。
LTE-Mボタンから呼ばれるLambda関数を作成
Lambda関数を作成します。この関数はクリックイベントのタイプと再生する音楽ファイル名をAWS IoTへpublish
する関数です。
Lambda関数を作成
Python3.6
で作成しました。ロールはAWS IoTにアクセスするため、AWSIoTFullAccess
ポリシーをアタッチされたロールにします。
関数コード
# coding: utf-8
import json
import boto3
iot = boto3.client('iot-data')
def lambda_handler(event, context):
topic = 'LTEButtonTopic/action'
payload = {
"click_type": event['deviceEvent']['buttonClicked']['clickType'],
"music_file": event['placementInfo']['attributes']['musicFile']
}
try:
iot.publish(
topic=topic,
qos=0,
payload=json.dumps(payload, ensure_ascii=False)
)
return "Succeeeded."
except Exception as e:
print(e)
return "Failed."
テスト
Lambdaコンソールのテストイベント設定
からテストイベントを作成します。
以下はクリックイベントのタイプDOUBLE
と音楽ファイル名sample_music.mp3
を保持したテストイベントです。
{
"deviceInfo": {
"deviceId": "070GGXXXXXXXXXX",
"type": "button",
"remainingLife": 99.3,
"attributes": {
"projectRegion": "ap-northeast-1",
"projectName": "sample-project",
"placementName": "Sample-Placement",
"deviceTemplateName": "SampleRequest"
}
},
"deviceEvent": {
"buttonClicked": {
"clickType": "DOUBLE",
"reportedTime": "2018-12-17T16:48:33.804Z"
}
},
"placementInfo": {
"projectName": "Sample-Project",
"placementName": "Sample-Placement",
"attributes": {
"musicFile": "sample_music.mp3"
},
"devices": {
"Sample-Request": "070GGXXXXXXXXXX"
}
}
}
Raspberry Pi
のプログラムを起動して、テスト押します。
Raspberry Pi
側でメッセージが受信ができたら動作確認はOKです。
INFO:AWSIoTPythonSDK.core.protocol.mqtt_core:Performing sync subscribe...
INFO:AWSIoTPythonSDK.core.protocol.mqtt_core:Performing sync subscribe...
--------------
INFO:__main__:Received a new message:
INFO:__main__:b'{"click_type": "DOUBLE", "music_file": "sample_music.mp3"}'
ERROR:__main__:sample_music.mp3 could not be found
--------------
LTE-Mボタンの登録
AWS IoT 1-Click
のコンソールから登録します。ボタンの後ろに記載されてるDSNを入力するだけです。簡単ですね。
登録したら、ボタンの有効にします。
AWS IoT 1-Click のプロジェクトを作成
AWS IoT 1-Click
のコンソールからプロジェクトを作成します。
デバイステンプレートを作成します。アクションをLambda関数を選択
にしてLambda関数
は作成した関数を指定します。
続けて、プレイスメントを作成します。登録したボタンをデバイスに指定して、属性には再生したい音楽ファイル名を指定します。
ボタンとプロジェクトが紐づいたら、AWS IoT 1-Click
の準備は完了です。
いいや!限界だ押すね!今だッ!
シングルクリック:音楽再生/一時停止
シングルクリック:音楽再生/一時停止 pic.twitter.com/aNnCFNxp9Y
— kokichi@opst (@kokichi_opst) 2018年12月24日
スピーカーからノイズが。。
ダブルクリック:LEDモード切り替え(常灯、ビジュアライザ)
ダブルクリック:LEDモード切り替え(常灯、ビジュアライザ) pic.twitter.com/6Jpkl7wO6R
— kokichi@opst (@kokichi_opst) 2018年12月24日
長押し:LED消灯、音楽の停止
長押し:LED消灯、音楽の停止 pic.twitter.com/YBk2M0qH5S
— kokichi@opst (@kokichi_opst) 2018年12月24日
最後に
初めてのSORACOM LTE-M Button
でした。ボタンを使用するまでが、とても簡単でしたのでプログラムの方に集中できました。LTEの通信網を使用しているのでWiFi環境がないところでも使えるというのは使用シーンが広がりますね。面白い使い方を思いついたら、また投稿したいと思います。
参考
[AWS IoT Device SDK for Pythonを使ってRaspberryPiとAWS IoTをつないでみる]
(https://dev.classmethod.jp/hardware/raspberrypi/aws-iot-device-sdk-for-python-raspberrypi-aws-iot/)
AWS IoT Device SDK for Pythonのサンプル
pyserialの公式ドキュメント
Pythonでシリアル通信
omxplayer-wrapperの公式ドキュメント