LoginSignup
3

More than 5 years have passed since last update.

Raspberry PiからKinesisへのデータ送信をfluentd経由にする

Last updated at Posted at 2015-02-02

Raspberry PiからKinesisへのアップロードをbotoでやっていたがfluentdを使うことにした。

As-Is

  • Raspberry PiからAmazon Kinesisへデータを上げる

  • 次の測定をするのに、送信が終わるのを待つ必要があり、測定と送信(による永続化)が密結合になっている

    • ネットワークやkinesisのリクエスト上限などの問題が発生すると待たされたり刺さったりする
    • 送信機能のメンテをしている間は測定ができない

To-Be

  • 測定結果はファイルに追記するところまでをスクリプトの責務とし、追記分の送信はfluentdに任せる。

fluentd

プラグインは以下を使う。

インストール

問題ないと思うので割愛。以下を参考にさせていただいた。

設定

fluent.conf

<source>
 type tail
 path /var/log/sensors
 pos_file /tmp/sensor.log.pos
 tag iot.log
 format json
</source>

<match iot.**>
 type copy
 <store>
  type stdout
 </store>
 <store>
  type kinesis
  stream_name iot
  aws_key_id XXXX
  aws_sec_key XXXX
  region ap-northeast-1
  partition_key kinesis_partition_key

  time_key stream_time
  time_format %s
  flush_interval 1s
 </store>
</match>

  • flush_interval

    • 既定値はない?ようなので、適当に設定。(設定しないと待たされてデバッグが捗らない)
  • partition_key

    • 値でなくキーを指定。

起動

$ fluentd -c fluent.conf -v

センサ値の書き出し

botoの利用をやめて、printでデータを標準出力に書き出す。センサ値読み取りはオレオレモジュールに外出し
(ここに関していえば、pythonである必要性はなくなった・・・)

show_sensors_values.py
import time
import json
import subprocess
import netifaces

from sys import stderr,stdout
import os

from GPIO import *
from DHT  import *
from Analog import *

## disable buffering (equivalent to `python -u`)
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

# prepare sensor readers
GPIO   = GPIO()
DHT    = DHT()
Analog = Analog()
ANALOG_CHANNELS = {
  'mic'                : 1,
  'ir_standard_analog' : 2,
  'photo'              : 3,
}

# Kinesis conf
stream_name = 'iot'

# main loop
while True:
  time.sleep(1)

  print >> stderr, "reading DHT..."
  # FIX ME: subprocess is necessary because access to DHT requires root!
  dht_process = subprocess.Popen(["sudo", "python", "read_dht.py"], stdout=subprocess.PIPE)
  (dht_json, _) = dht_process.communicate()
  data = json.loads(dht_json)

  print >> stderr, "reading digital sensors..."
  data['ir_tremor_digital'] = GPIO.read(24)
  data['ir_10m_digital']    = GPIO.read(25)

  print >> stderr, "reading analog sensors..."
  for sensor, ch in ANALOG_CHANNELS.iteritems():
      data[sensor] = Analog.read(ch)

  # wrap up

  try:
    data['ipaddr'] = netifaces.ifaddresses('wlan0')[netifaces.AF_INET][0]['addr']
  except:
    data['ipaddr'] = None


  data['sensor_time']                = time.time()
  data['kinesis_partition_key'] = 1  # anything
  data = json.dumps(data)

  print(data)

  • pythonの場合、os.fdopenしないと、標準出力がバッファリングされてしまい、fluentdが期待通り動かなくてハマった。
  • fluent-cat使ってもよいが、fluentdの稼働状況に依存する?気がするので、データのみ標準出力にして、ファイル書き出しにする(そのぶんlogrotateは必要になる)

起動

$ python show_sensors_values.py >> /var/log/sensors

Kinesisからのデータ取得

読み取り側コードは基本的に変更なし。
Kinesisのコンソール画面などでもPUTがあるかないかはだいたいわかる。

OS起動時に起動するようにする

本題ではないが、Raspberry Piを頻繁に起動したり終了したりする場合は、crontabの@rebootを使うのが手っ取り早い気がする。ちゃんとやるなら起動スクリプトを書いたりプロセス監視すべき。

@reboot fluentd -c /path/to/fluent.conf -v
@reboot cd /path/to/script; python show_sensors_values.py >> /var/log/sensors 2> /var/log/sensors.err

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3