Raspberry PiからKinesisへのアップロードをboto
でやっていたがfluentd
を使うことにした。
As-Is
-
次の測定をするのに、送信が終わるのを待つ必要があり、測定と送信(による永続化)が密結合になっている
- ネットワークやkinesisのリクエスト上限などの問題が発生すると待たされたり刺さったりする
- 送信機能のメンテをしている間は測定ができない
To-Be
- 測定結果はファイルに追記するところまでをスクリプトの責務とし、追記分の送信は
fluentd
に任せる。
fluentd
プラグインは以下を使う。
インストール
問題ないと思うので割愛。以下を参考にさせていただいた。
設定
fluent.conf
<source>
type tail
path /var/log/sensors
pos_file /tmp/sensor.log.pos
tag iot.log
format json
</source>
<match iot.**>
type copy
<store>
type stdout
</store>
<store>
type kinesis
stream_name iot
aws_key_id XXXX
aws_sec_key XXXX
region ap-northeast-1
partition_key kinesis_partition_key
time_key stream_time
time_format %s
flush_interval 1s
</store>
</match>
-
flush_interval
- 既定値はない?ようなので、適当に設定。(設定しないと待たされてデバッグが捗らない)
-
partition_key
- 値でなくキーを指定。
起動
$ fluentd -c fluent.conf -v
センサ値の書き出し
boto
の利用をやめて、print
でデータを標準出力に書き出す。センサ値読み取りはオレオレモジュールに外出し。
(ここに関していえば、pythonである必要性はなくなった・・・)
show_sensors_values.py
import time
import json
import subprocess
import netifaces
from sys import stderr,stdout
import os
from GPIO import *
from DHT import *
from Analog import *
## disable buffering (equivalent to `python -u`)
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
# prepare sensor readers
GPIO = GPIO()
DHT = DHT()
Analog = Analog()
ANALOG_CHANNELS = {
'mic' : 1,
'ir_standard_analog' : 2,
'photo' : 3,
}
# Kinesis conf
stream_name = 'iot'
# main loop
while True:
time.sleep(1)
print >> stderr, "reading DHT..."
# FIX ME: subprocess is necessary because access to DHT requires root!
dht_process = subprocess.Popen(["sudo", "python", "read_dht.py"], stdout=subprocess.PIPE)
(dht_json, _) = dht_process.communicate()
data = json.loads(dht_json)
print >> stderr, "reading digital sensors..."
data['ir_tremor_digital'] = GPIO.read(24)
data['ir_10m_digital'] = GPIO.read(25)
print >> stderr, "reading analog sensors..."
for sensor, ch in ANALOG_CHANNELS.iteritems():
data[sensor] = Analog.read(ch)
# wrap up
try:
data['ipaddr'] = netifaces.ifaddresses('wlan0')[netifaces.AF_INET][0]['addr']
except:
data['ipaddr'] = None
data['sensor_time'] = time.time()
data['kinesis_partition_key'] = 1 # anything
data = json.dumps(data)
print(data)
- pythonの場合、
os.fdopen
しないと、標準出力がバッファリングされてしまい、fluentdが期待通り動かなくてハマった。 -
fluent-cat
使ってもよいが、fluentdの稼働状況に依存する?気がするので、データのみ標準出力にして、ファイル書き出しにする(そのぶんlogrotateは必要になる)
起動
$ python show_sensors_values.py >> /var/log/sensors
Kinesisからのデータ取得
読み取り側コードは基本的に変更なし。
Kinesisのコンソール画面などでもPUTがあるかないかはだいたいわかる。
OS起動時に起動するようにする
本題ではないが、Raspberry Piを頻繁に起動したり終了したりする場合は、crontabの@reboot
を使うのが手っ取り早い気がする。ちゃんとやるなら起動スクリプトを書いたりプロセス監視すべき。
@reboot fluentd -c /path/to/fluent.conf -v
@reboot cd /path/to/script; python show_sensors_values.py >> /var/log/sensors 2> /var/log/sensors.err