この記事はIoTバックエンド Advent Calendar 2015の20日目の記事です。
IoTっぽくAmazon KinesisストリームへEdisonから加速度センサーの値を送るアプリケーションを作ります。
1. ハードウェアの準備
Intel Edison + スイッチサイエンス版Eaglet
Intel EdisonのI/Oは1.8Vですが、Eagletには3.3Vへの変換回路が搭載されているので、一般的に販売されているセンサーの多くが利用できます。
さらにEagletにはGROVEを接続できるコネクタが搭載されています。
なので、加速度センサーとしてGROVEのものを使います。
GROVE - I2C 三軸加速度センサ ADXL345搭載
接続は非常に簡単です。Groveコネクタ同士をセンサーに付属のケーブルで接続するだけです。
逆には刺さらないようになっているため安心しましょう。
2. Edisonへソフトウェアのインストールと設定
前回・前々回の記事の部分はやっておきます。
EdisonにSSHでログインします。
アプリケーションをPythonで開発するのに必要なパッケージをインストールします。
# Pythonのパッケージマネージャーのインストール
root@my-edison:~# opkg install python-pip
# AWSCLIのインストール
root@my-edison:~# pip install awscli
# PythonのAWS用のパッケージをインストール
root@my-edison:~# pip install boto3
AWSへ接続するために接続設定をしておきます。
root@my-edison:~# aws configure
AWS Access Key ID [None]: <Edison用のキー>
AWS Secret Access Key [None]: <Edison用のキー>
Default region name [None]: <利用したいリージョン>
Default output format [None]: <お好きに>
3. Kinesisストリームの作成
データを流すためのKinesisストリームを作成します。
ここではAWSCLIから作成していますが、AWSマネジメントコンソールから行っても問題ありません。
ストリーム名をSample-Edison-Stream
として進めます。適宜好きな名前に置き換えてください。
$ aws kinesis create-stream --stream-name Sample-Edison-Stream --shard-count 1
ストリームができているか確認。
$ aws kinesis describe-stream --stream-name Sample-Edison-Stream
{
"StreamDescription": {
"RetentionPeriodHours": 24,
"StreamStatus": "CREATING",
...
StreamStatus
がACTIVE
になれば完了です。CREATING
の場合はしばらく待ちましょう。
4. Kinesisにデータを送るアプリケーションを作成
約0.1秒に1回、加速度センサーの値をKinesisへ送るアプリケーションを作成します。
作成したアプリケーションは以下のリポジトリに公開していますのでご利用ください。
https://github.com/mia-0032/edison-adxl345-to-kinesis
Edisonには最初からupm(Useful Packages & Modules)というパッケージがインストールされており、これを使うことでセンサーなどを簡単に利用できるようになっています。
# -*- coding: utf-8 -*-
import boto3
import json
import pyupm_adxl345 as adxl345
from socket import gethostname
from sys import argv
from time import sleep
# Kinesisのストリーム名は引数で指定
stream_name = argv[1]
# EagletのGroveコネクタはI2C 6番に接続されているので6を指定
adxl = adxl345.Adxl345(6)
kinesis = boto3.client('kinesis')
while True:
# 加速度を取得する
adxl.update()
force = adxl.getAcceleration()
data = {'X': force[0], 'Y': force[1], 'Z': force[2]}
# Kinesisへデータを送る
res = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(data),
PartitionKey=gethostname() # 今回はシャードが1個なのであまり関係なし
)
sleep(0.1)
以下のコマンドで実行できます。
root@my-edison:~# python sender.py Sample-Edison-Stream
5. Kinesisにデータが送られているか確認
Kinesisへデータが流れているか確認します。
確認用のPythonスクリプトを作成しましたのでご利用ください。
# -*- coding: utf-8 -*-
import boto3
import logging
from sys import argv
from time import sleep
# ロガーの生成
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Kinesisのストリーム名は引数で指定
stream_name = argv[1]
logger.info("Stream Name: {0}".format(stream_name))
kinesis = boto3.client('kinesis')
# シャードIDを取得
stream = kinesis.describe_stream(
StreamName=stream_name
)
# 今回はシャード数1でストリームを作成しているため先頭のシャードIDを使う
shard_id = stream['StreamDescription']['Shards'][0]['ShardId']
# 最初のシャードイテレータを取得
shard_iterator = kinesis.get_shard_iterator(
StreamName=stream_name,
ShardId=shard_id,
ShardIteratorType='TRIM_HORIZON'
)['ShardIterator']
# ループでひたすらレコードを取得する
while True:
res = kinesis.get_records(
ShardIterator=shard_iterator,
Limit=100
)
for r in res['Records']:
logger.info(
'Time: {0}, Data: {1}'.format(
r['ApproximateArrivalTimestamp'],
r['Data']
)
)
shard_iterator = res['NextShardIterator']
sleep(1)
以下のコマンドで実行できます。
root@my-edison:~# python receiver.py Sample-Edison-Stream
こんな感じでレコードが流れてくれば成功です。
INFO:root:Time: 2015-12-20 14:40:59.522000+09:00, Data: {"Y": -0.10188677161931992, "X": -0.075471684336662292, "Z": 0.95703125}
INFO:root:Time: 2015-12-20 14:40:59.675000+09:00, Data: {"Y": -0.056603759527206421, "X": 0.0037735840305685997, "Z": 0.94921875}
INFO:root:Time: 2015-12-20 14:40:59.833000+09:00, Data: {"Y": -0.24150937795639038, "X": 0.018867921084165573, "Z": 0.90625}
INFO:root:Time: 2015-12-20 14:40:59.983000+09:00, Data: {"Y": -0.22264145314693451, "X": -0.0037735840305685997, "Z": 0.9296875}
INFO:root:Time: 2015-12-20 14:41:00.133000+09:00, Data: {"Y": -0.15094336867332458, "X": 0.011320752091705799, "Z": 0.9375}
以上でKinesisにデータを送ることができました!お疲れ様です。