Edited at

Raspberry PiからFluentdでBigQueryにデータを送るウェザーステーションの作り方

More than 5 years have passed since last update.

最近なんだか個人的に電子工作ブームで、ついAmazonでRaspberry Piをポチってしまった。とりあえずウェザーステーション(気温・湿度・気圧を測るやつ)を作ってみた。

Untitled.png

びろーんと伸びてるのは温度・湿度センサーDHT22で、基板上で青く光っているのが気圧センサーLPS331。丸くて黒いやつはなんとなくつけてみた圧電スピーカーで今回は使ってない。

そして、これらのセンサーデータを10秒おきにFluentd経由でGoogle BigQueryに送る簡単なPythonコードを書いた。Google SpreadsheetからBigQueryのクエリを実行して描いた俺の部屋のお天気環境グラフがこんな感じ。

kobito.1411883641.895990.png

単に1台分のグラフを書くだけならBigQueryにデータを入れる必要はなくてSpreadsheetに直接送れば済むのだけど、RasPi+Fluentd+BQの連携をいちど試してみたかったのだ。BQは毎秒1M〜2M行程度のリアルタイムインポートをさばけるので、もし仮に気象庁様からこのRasPiを使って全国100万か所に次世代アメダスを展開したい!1か月以内に!って言われたとしても、同じものを100万台並べるだけでよい。サーバーいらずで次世代アメダス開発終了である。

以下、このウェザーステーションの作り方を簡単にまとめ。


RasPiとセンサーの接続、ドライバまわり

必要な材料は以下のとおり。全部で1万円もかからない。

ケース付きのRasPiを購入し、NOOBSプリインストールのSDカードを挿して電源を入れればOSを簡単にセットアップできる。USB WiFiもOS付属のツールですぐに使えた。RasPiお手軽すぎる。

センサー群とRasPiの接続、それと値の読み込み方法は、以下のページが参考になる。

kobito.1411891019.710035.png

From DHT Humidity Sensing on Raspberry Pi

特別な回路を組む必要はなく、RasPiのGPIOとセンサーをジャンパーでつなぐだけである。ただし、温度センサーはRasPi本体からちょっと離しておかないとCPUの熱の影響を受けるので注意。RasPiのIOまわりはRaspberry Piクックブックを参考にしたり。


Ansibleでドライバ組み込み

難しいのはセンサーの値の読み込み方法で、ドライバを自分で書くのは大変そうだ。特にDHT22は面倒そうなのだけど、そこはさくっとGitHubに載っているありもののドライバを使った。

組み込みデバイスとしてのRasPiの面白いところは、まったく普通のDebian系Linuxなので、こういうネット上のリソースの利用がとてもお手軽な点。例えばDHT22のドライバインストールも、Ansibleで以下のようなPlaybookを書いてGitHubから直接引っ張ってこれる。

    # Adafruit DHT drivers

- git: repo=git@github.com:adafruit/Adafruit_Python_DHT.git
dest={{ dht_dir }} accept_hostkey=yes
sudo: no
- command: python setup.py install chdir={{ dht_dir }}

もっとも、RasPiは遅いのでPlaybookの実行が終わるまでにすっげー待たされるのだけど。。:)


Pythonでセンサーの値を読む

気圧センサーLPS331はI2CというIOバスで接続するので、Pythonからはi2ctoolsというI2C通信用のコマンドをsubprocessで呼び出して値を読み込む。こんな感じ。

def cmd_exec(cmd):

p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if stderr != None and len(stderr.strip()) > 0:
raise IOError("Error on executing cmd: " + stderr)
return stdout.strip()

def i2cget(reg):
return cmd_exec("i2cget -y 1 " + LPS331_ADRS + " " + reg)

こうしてi2cgetコマンドを呼び出す準備を整えたら、I2Cのレジスタを読み込んでごにょごにょ計算すると気圧のヘクトパスカル値が得られる。

def read_lps():

# reading from LPS
out0 = i2cget("0x28")
out1 = i2cget("0x29")
out2 = i2cget("0x2a")

# decoding the value
return (int(out0, 16) + (int(out1, 16) * 0x100) + (int(out2, 16) * 0x10000)) / 4096.0

温度・湿度センサーDHT22の方は上述のAdafruitのPythonドライバを使えば簡単に値を読める。

# read humidity and temp from DHT 

humidity, temp = Adafruit_DHT.read_retry(Adafruit_DHT.DHT22, DHT22_GPIO)


FluentdでBigQueryに送る

温度、湿度、気圧のデータが取れたので、つづいてはFluentd経由でBigQueryに送る部分。Ansibleで以下のように書いて、Fluentdとfluent-logger-pythonfluent-plugin-bigqueryをインストールする。

    # Fluentd

- command: aptitude install ruby-dev
- command: gem install fluentd

# pip, fluent-logger-python, fluent-plugin-bigquery
- command: aptitude install python-pip
- command: pip install fluent-logger
- command: fluent-gem install fluent-plugin-bigquery

一方、fluentd.confではfluent-logger-pythonから受け取ったイベントログをfluent-plugin-bigqueryに流す以下の様な設定を書いておく。

<source>

type forward
port 24224
</source>

<match weather.**>
type bigquery

method insert

auth_method private_key
email YOUR_SERVICE_ACCOUNT_EMAIL
private_key_path YOUR_PRIVATE_KEY_FILE_PATH

project YOUR_PROJECT_ID
dataset YOUR_DATASET
table YOUR_TABLE_NAME

time_format %s
time_field time

fetch_schema true
field_integer time
</match>

BQプラグイン、最新版ではBQのスキーマを自動的にフェッチしてくれるので、BQスキーマを記述する必要がない! じつにお手軽にFluentdからBQにログを流せる。。プラグイン開発者の皆様に寿司をおごってあげたいくらいだ。

こうしてFluentd側の準備ができたら、Pythonコードではfluent-logger-pythonを使って測定データをFluentdに流す。

    # write metrics to local fluentd

event.Event("metrics", {
"atmos": atmos,
"hum": humidity,
"temp": temp
})


BigQueryでテーブルと鍵を作る

あとはFluentdから流れてくるイベントログを受け取るBigQuery側の準備である。今回は以下のような簡単なテーブルスキーマを用意した。ちなみに、複数デバイスからデータを集める場合はデバイスのIDを持たせる必要があるだろう。

[

{
"name": "time",
"type": "INTEGER"
},
{
"name": "hum",
"type": "FLOAT"
},
{
"name": "temp",
"type": "FLOAT"
},
{
"name": "atmos",
"type": "FLOAT"
}
]

このスキーマファイルを使い、Google Cloud SDKのbqコマンドでBQのテーブルを作成する。

> bq mk -t <your-project-id>:<your_dataset>.weather_report wr_bqschema.json

残る作業は、fluent-plugin-bigqueryからBQへ接続するためのprivate keyの作成と登録。Google Cloud PlatformのコンソールページAPIs & authを選択し、サービスアカウント用の新しいClient IDを作成する。

kobito.1411892684.312256.png

するとprivate keyのダウンロードが始まるので、このファイルをRasPiにscpでコピーし、fluentd.confを編集してprivate_key_pathフィールドにファイルパス、emailフィールドにサービスアカウントのメールアドレスをそれぞれ設定しておく。


BigQueryでクエリを実行してみる

以上で準備は完了。さっそく動かしてみよう。

> fluentd -c fluentd.conf

を実行してFluentdの正常起動を確認したら、別のシェルを開いて、

> sudo python weather_report.py

を実行。正常に動いていれば、コンソール上には何も表示されない。BigQueryのコンソール上で以下のようなSQLを叩いてみて、データが飛んできていることを確認する。

SELECT LEFT(STRING(SEC_TO_TIMESTAMP(time)), 15) + '0:00' as time,

AVG(temp) as temp, AVG(hum) as hum, AVG(atmos) as atmos
FROM [YOUR_PROJ:YOUR_DATASET.weather_report]
GROUP BY time ORDER BY time DESC

kobito.1411893063.321064.png

いい感じに保存されているようす。ちなみにこのクエリは、10秒おきに記録される気温・湿度・気圧について10分ごとの平均値を集計してタイムスタンプの降順でソートするクエリだ。もし俺製次世代アメダスに100億行のデータが溜まったとしても何の問題もなく10秒くらいで同様の結果を返してくるのがBQの恐ろしいところ。もうセンサーでもPOSでも家電でも、なんでもBQに直接生ログを送っとけばいいと思うよ。


Google Spreadsheetでグラフを書く

Google SpreadsheetのGoogle Apps Scriptを使えば、BigQueryのクエリを実行して結果をシートに保存したり、そこからグラフを描いたりできる。このあたりは、

等を見れば使い方が分かるはず。自分でApps Scriptコードを書かずとも、BigQuery Dashboardの公開シートをこのドキュメントのUsing BigQuery Dashboardにある手順でBQ接続設定を済ませれば、シートにクエリを書き込むだけですぐに実行できる。

kobito.1411907400.014620.png

9/23〜9/27にかけての気圧変化。そういえば24〜25あたりは曇り空だった。天気予報機能も付けたいなー。

その他、このウェザーステーションの製作で使ったソフトやツール群、playbook、手順等はGitHubに載せておいたので参考にしていただきたい。


RasPi+Fluentd+BQおもろいよ

ここで見てきたとおり、RasPiは電子工作デバイスながらもフツーのLinuxとして使えるので、センサー等で読み込んだ値をFluentdだのBQだの今どきのツールで扱えるのがとても面白い。BQに直接データを入れてしまえば、サーバーいらず・手間いらず・低コストで死ぬほどスケーラブルなデータ収集基盤を土日のホビーで作れてしまう。ぜひぜひお試しあれ。


Disclaimer この記事は個人的なものです。ここで述べられていることは私の個人的な意見に基づくものであり、私の雇用者には関係はありません。