RaspberryPiでセンサーデータをfluentdを使ってDynamoDBに送って可視化してみよう<上級編>

  • 27
    Like
  • 0
    Comment

概要

本コンテンツは、JAWS-UG関西IoT専門支部主催のハンズオン向けに作成したものです。
下図の通り、IoTデバイス(RaspberryPi)でセンシングしたデータをリアルタイムで可視化することが目標です。
通信にはWi-Fiまたは3Gを、データベースにはAWS DynamoDBを、可視化にはIoT.kyoto VISという無料サービスを利用します。
989c03b4-4479-373a-b785-2b08c5b35bc1.png


<上級編>とは

本コンテンツはLinuxやAWSの基礎的なスキルを有している方向けに<完全編>の記述内容を簡略化することで、試行錯誤やデバッグを行いながら学習できるように配慮されています。行き詰まった場合は、各章の先頭に<完全編>へのリンクを配置していますので、適宜参照して下さい。

前提条件

本コンテンツは、「Raspberry Pi でSORACOM 接続+MQTTをしてみよう 後編」が完了している方を対象としており、下記の前提条件については説明いたしません。

  • IoTデバイスには、最新のRaspbianがインストールされているRaspberryPi 2 Model Bを使用
  • RaspberryPiがインターネットに接続できること(Wi-Fi/3G)
  • RaspberryPiにPCからSSHで接続できること
  • RaspberryPiに照度センサーが接続済みで、センサーログをJSON形式でテキストファイルに保存できること

また、AWSアカウントが必要です。

今回利用するツール/サービス

fluentd

fluentdとは、米トレジャーデータ社が中心となり、オープンソースとして開発を行っているログ収集ミドルウェアです。いわゆるPub-Subモデルによりログの配送経路を柔軟に制御することが可能で、データのinputとoutputはプラグインという形で実装されています。サーバ等のLOG収集に使われることが多いですが、IoTデバイスのストリームデータの収集にも向いています。
さくらのナレッジの「柔軟なログ収集を可能にする「fluentd」入門」がよくまとまっていて分かりやすいです。
今回はRaspberryPiにfluentdをインストールし、下図のように利用します。
fluentd.PNG

AWS DynamoDB

AWS DynamoDBとは、アマゾン ウェブ サービスが提供する完全マネージド型のNoSQLデータベース(いわゆるKVS)です。IoTと非常に親和性が高く、下記のような特徴があります。

  • サイジング不要、I/Oスループットのみ指定する
  • スキーマレス、KEYが増えてもマイグレーションしなくていい
  • SQLではなくAPIで利用する。ストリームデータを連続的に書き込みつつ即座に読み出すようなことは得意領域。一方、SQLが得意とする複雑な処理は苦手
  • 排他制御やトランザクション処理はできない(ライブラリを利用すれば可能)が、その分高速
  • テーブル内のデータの追加・変更をトリガーにコードを実行することができる(DynamoDB Streams -> Lambda)
  • indexの張り方に失敗すると痛い目に遭う。LSI/GSIの使い分けが難しい
  • 1回のQueryで取得できるデータが約1MBに制限されている

詳しく知りたい方はBlack Belt Tech シリーズ開発者ガイドを読んでみてください。

テーブルの削除手順

テーブルを選択して、[Actions] -> [Delete table]

DynamoDB · AWS Console 2016-04-02 16-15-05.png

IoT.kyoto VIS

IoT.kyoto VISとは、株式会社KYOSOが提供するDynamoDB特化のグラフ化Webサービスで、リアルタイムでのグラフ表示に強みがあります。
このハンズオンのために私とjsエンジニアの2名で立ち上げました( ー`дー´)キリッ (半分冗談、半分本気w)

[ToDo 0] RaspberryPiの準備

<完全編>ToDo 0

照度計回路を準備する

「Raspberry Pi でSORACOM 接続+MQTTをしてみよう 後編」で作製した回路をそのまま利用します。未作製の方は「RaspberryPiで照度計をつくろう」にしたがって回路を作製してRaspberryPiに接続して下さい。必要なパーツはこちらを参照してください。

インターネット接続確認とRaspbianのアップデート

  • RaspberryPiを有線LANかWi-Fiもしくは3Gでインターネット接続して下さい
  • PCからRaspberryPiにSSH接続して下さい
  • 何らかの手段でRaspberryPiがインターネット接続していることを確認して下さい
  • OSパッケージリストのUpdateとvimのインストールを実行して下さい

[ToDo 1] 照度計回路の動作を確認する

<完全編>ToDo 1
下記Pythonコードを実行して、照度センサーの値が取得できることを確認します。

  • コード内にログファイルに関する記述がありますので、適当な内容に変更して下さい
  • コードを実行して標準出力にJSONが表示されることを確かめます
  • ログファイルにJSONが記録されていることを確かめます
shodo-dynamo.py
#coding: utf-8

# spi, time ライブラリをインポート
import spidev
import time
# コマンド実行ライブラリ
import subprocess
# センサーデータのJSON化のためにインポート
import json
from datetime import datetime

# SpiDev オブジェクトのインスタンスを生成
spi = spidev.SpiDev()
# ポート0、デバイス0のSPI をオープン
spi.open(0, 0)
# 最大クロックスピードを1MHz に設定
spi.max_speed_hz=1000000
# 1 ワードあたり8ビットに設定
spi.bits_per_word=8

# ダミーデータを設定(1111 1111)
dummy = 0xff
# スタートビットを設定(0100 0111)
start = 0x47
# シングルエンドモードを設定 (0010 0000)
sgl = 0x20
# ch0 を選択(0000 0000)
ch0 = 0x00
# ch1 を選択(0001 0000)
ch1 = 0x10
# MSB ファーストモードを選択(0000 1000)
msbf = 0x08
# IC からデータを取得する関数を定義
def measure(ch):
    # SPI インターフェイスでデータの送受信を行う
    ad = spi.xfer2( [ (start + sgl + ch + msbf), dummy ] )
    #
    val = ((ad[0] & 0x03) << 8) + ad[1]
    # 受信した2バイトのデータを10 ビットデータにまとめる
    voltage =  ( val * 3.3 ) / 1023
    # 結果を返す
    return val, voltage
try:
    # 無限ループ
    while 1:
        # 関数を呼び出してch1 のデータを取得
        ch1_val, ch1_voltage  = measure(ch1)
        # 結果を表示
        #print  'ch1 = {:4d}, {:2.2f}[V]'.format(ch1_val, ch1_voltage)
        # センサーデータをJSON化するため辞書に入れる。後々のデータ処理に用いる
        # JSONデータ構造 {"brightness": ch1_val, "ID": "id001", "time_sensor": "2015-10-15 16:21:56"}
        # "ID": "id001"の001の部分はデバイスによって変える
        json_data = {"time_sensor": datetime.now().strftime('%Y-%m-%d %H:%M:%S'), "ID": "id001", "brightness": ch1_val}
        encode_json_data = json.dumps(json_data)
        #json_dataを標準出力し、ログファイルに追記
        print encode_json_data
        f = open("/path/to/logfile/hoge.log","a")
        f.write(encode_json_data)
        f.write("\n")
        f.close()
        # 1秒待つ
        time.sleep(1)

# キーボード例外を検出
except KeyboardInterrupt:
    # 何も処理をしない
    pass

# SPI を開放
spi.close()

[ToDo 2] APIアクセス用のKeyを作成する

<完全編>ToDo 2
DynamoDBにAPIでアクセスするために、IAM Access Keyを作成します。

  • 下記2ユーザーを作成し、IAM Access Keyを取得して下さい
ユーザー名 アクセス権 使用箇所
dynamo-full AmazonDynamoDBFullAccess RaspberryPiからDynamoDBへの書き込み
dynamo-read AmazonDynamoDBReadOnlyAccess IoT.kyotoからDynamoDBへの読み出し

[ToDo 3] fluentdのインストール

<完全編>ToDo 3
RaspberryPiにfluentdをインストールします。

fluentd本体のインストール

プラグインとの適合性を考慮して、0.12系の最新版をインストールします。

$ sudo apt-get install ruby-dev libssl-dev
$ sudo gem install fluentd -v "~> 0.12.0" --no-ri --no-rdoc

プラグイン/SDKのインストール

fluent-gemコマンドで下記2つのモジュールをインストールして下さい。

  • fluent-plugin-dynamodb
  • aws-sdk-v1

DynamoDBプラグインのソース修正

デフォルトのままでは、aws-sdk-v2を読み込んでしまい正常に動作しないので、明示的にaws-sdk-v1を読み込むように修正してください。

[ToDo 4] fluentdの設定

<完全編>ToDo 4

設定ファイルとログファイルの作成

  • 適切なディレクトリを作成してfluentdのログファイルを作成します (例)fluent.log
  • 同ディレクトリにfluentdの設定ファイルを作成します (例)fluent.conf
  • [ToDo 1]で作成されたログファイルに対応するposファイルを作成します。posファイルはログ読み取りのステータス(どこまで読み取ったか)を記録します (例)hoge.pos

設定ファイルの記述

設定ファイルは下記のようなルールで記述します。<source>ディレクティブはインプットに関する処理や付加するタグについて、<match>ディレクティブはタグが一致したものについてどのような処理をするかを記述します。

<source>
  @type <使用するプラグイン>
  <設定項目1> <設定値1>
  <設定項目2> <設定値2>
  :
  :
</source>
<match <条件文>>
  @type <使用するプラグイン>
  <設定項目1> <設定値1>
  <設定項目2> <設定値2>
  :
  :
</match>

<source>ディレクティブ

照度センサーログファイルのtailを取得してdynamodb.shodoというタグを付加します。

fluent.conf
<source>
  # inputにtailプラグインを指定
  @type tail
  # フォーマットを指定
  format json
  # 照度センサーログファイルをフルパスで指定
  path /path/to/logfile/hoge.log
  # posファイルをフルパスで指定
  pos_file /path/to/posfile/hoge.pos
  # タグを指定
  tag dynamodb.shodo
</source>

<match>ディレクティブ

  • DynamoDBプラグインのGitHubリポジトリのREADMEを参考に記述して下さい
  • AWSのKeyには、[ToDo 2]で作成した「dynamo-full」ユーザーのKeyを使用します

テスト

[error]の行がないか確認します。

$ sudo fluentd -c /fluent/fluent.conf --dry-run

[ToDo 5] DynamoDBのテーブルを作成する

<完全編>ToDo 5

下記条件でDynamoDBのテーブルを作成して下さい。条件にないものは適切に設定して下さい。

項目 設定内容
Partition key ID
Sort key time_sensor

[ToDo 6] DynamoDBにセンサーログを送信する

<完全編>ToDo 6

fluentdを実行する

$ sudo fluentd -c /path/to/fluent.conf -v -o /path/to/fluent.log &
[1] 20284

正常に実行されているかどうかはログを確認します。
※「20284」はPID

Pythonコードを実行する

$ python /path/to/shodo-dynamo.py
{"brightness": 169, "ID": "id000", "time_sensor": "2016-03-28 18:45:25"}
{"brightness": 171, "ID": "id000", "time_sensor": "2016-03-28 18:45:26"}
{"brightness": 171, "ID": "id000", "time_sensor": "2016-03-28 18:45:27"}
{"brightness": 170, "ID": "id000", "time_sensor": "2016-03-28 18:45:28"}

DynamoDBに正常にPutできているか確認する

  • DynamoDBコンソールの[items]タブでレコードを確認します。Partition key(ID)を指定して降順でQueryを投げれば見やすいでしょう
  • DynamoDBプラグインのREADMEの記載内容だけでは1分に1回まとめてデータが送信されるはずです。ストリーム送信されるように設定を変更して下さい

[ToDo 7] IoT.kyoto VISでグラフ表示する

<完全編>と同内容
ブラウザはChrome(Win/Mac)またはSafari(Mac)をお使い下さい。

サインアップ

https://vis.iot.kyoto にアクセスし、「新規登録」をクリックします。

設定

IoT.kyoto VIS公式マニュアルの3~4章を参照。
AWSのKeyには、[ToDo 2]で作成した「dynamo-read」ユーザーのKeyを使用します。

いろいろ試してみる

[レンジ設定]を[手動]に変更して0~1000くらいに設定するとグラフが見やすくなります。下記のようなことを試してみましょう。

  • フォトトランジスタにライトを当てたり手で覆ったりして、グラフの変化や描画までの遅延を確認する
  • 閾値超えのアラートメールが届くか試してみる
  • 日時指定で過去の履歴を表示してみる
  • 別コンソールでデバイスIDを変えたPythonコードを実行(擬似的に複数デバイスが接続された状態となる)して、IoT.kyoto VISの画面で2個のグラフを同時に表示してみる
  • データの送信間隔を変えてみる(「グラフ設定(歯車)」でグラフの更新間隔を変えることができます) スクリーンショット 2016-04-02 16.39.38.png

[ToDo 8] 自動実行設定

<完全編>ToDo 8
RaspberryPi電源投入時に各種処理を自動実行させるようにします。

  • fluentdとPythonコードが自動実行されるようにしましょう。ただし、fluentdの実行タイミングによってはエラーが出る場合がありますので、fluentdのログを確認しながら実行タイミングを調整して下さい
  • SORACOMで接続している場合は自動接続されるようにしましょう

[ToDo 9] シャットダウンボタンをつくろう

<完全編>ToDo 9
自動起動ができるようになったら、シャットダウンのためにSSHでログインするのは面倒ですよね! シャットダウンのための物理スイッチを作ってPCレス運用ができるようにしましょう。
今回は、物理スイッチはパーツリストに入れていませんでしたので、ジャンパワイヤで代用します。物理スイッチを購入するのであれば、ブレッドボードに挿さる6mm程度のタクトスイッチを選べばよいでしょう。
http://www.amazon.co.jp/dp/B00H3CVRGY

配線

GPIO 23(16番ピン)、GND(14番ピン等)にそれぞれジャンパワイヤを繋ぎ、タクトスイッチの代わりにオス-オスのジャンパワイヤで短絡できるようにブレッドボードに配線します。

Pythonコード

コードを実行した後に、ジャンパワイヤで4秒以上短絡させてみましょう。now power-off !と表示された後にシャットダウンされるはずです。

sw.py
#coding: utf-8
import RPi.GPIO as GPIO
import os
import time

GPIO.setmode(GPIO.BCM)
GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_UP)

cnt = 0
# 4回実行=4秒でループを抜ける
while cnt < 4:
    if GPIO.input(23) == GPIO.LOW:
        cnt += 1
    # 1秒待ってループ
    time.sleep(1)
else:
    print "now power-off !"
    GPIO.cleanup()
    # 4秒以上短絡状態が続いたらシャットダウン実行
    os.system("/sbin/shutdown -h now")

自動実行設定

[ToDo 8]同様に自動実行されるようにしましょう。
正常に動作していることが確認できたら本コンテンツは終了です。お疲れ様でした!