LoginSignup
13
12

More than 5 years have passed since last update.

Raspberry PiからAmazon Kinesisへデータを上げる

Last updated at Posted at 2015-01-19

基板の選定

主に以下の理由で、Arduino Unoを使っていたのをRaspberry Pi B+にした。
Arduinoだと

  • プログラムがC++/Processing風言語に固定される
  • メインループが一つ
  • JSONのサイズが大きいと送出がうまくいかないなど何かと動きが不明
  • プログラム修正のたびに端末を触りに行く必要がある

がんばればそれぞれなんとかなるかもしれないが、Raspberry Piだと最初からLinuxなので、

  • LLや既存パッケージが使える
  • プロセス管理できる
  • SSHログインできる
  • Chefなどでのオーケストレーションを見込める

などスケールがききそう。

Raspberry Piの設定

Raspbian

Raspbian 2014-12-24をDLしてセットアップする。(Pidoraでもいいかもしれないが未検証)
手順はここが参考になったので詳細割愛。

boto

AWSへの接続を行うためpythonライブラリのbotoをインストールする。
(できればfluentdのpluginを入れて設定一枚で実現するほうがスクリプト書くより楽そうではある)

apt-get install python-botoで入るのが2.3.0-1と古いので、pipで上げる。

# curl https://bootstrap.pypa.io/get-pip.py | python
# pip install boto --upgrade
$ python 
Python 2.7.3 (default, Mar 18 2014, 05:13:23) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto
>>> boto.__version__
'2.35.1'

WiringPi

Raspberry Piの入出力制御はGPIOへのechocatを介して行うが、sudo権限を要する。WiringPiを使うとnon-rootでGPIOへのアクセスが可能になる。(それはそれでいいのかって気もするが)

$ git clone git://git.drogon.net/wiringPi
$ cd wiringPi/
$ ./build 

$ watch -n1 gpio readall
 +-----+-----+---------+------+---+--B Plus--+---+------+---------+-----+-----+
 | BCM | wPi |   Name  | Mode | V | Physical | V | Mode | Name    | wPi | BCM |
 +-----+-----+---------+------+---+----++----+---+------+---------+-----+-----+
 |     |     |    3.3v |      |   |  1 || 2  |   |      | 5v      |     |     |
 |   2 |   8 |   SDA.1 |   IN | 1 |  3 || 4  |   |      | 5V      |     |     |
 |   3 |   9 |   SCL.1 |   IN | 1 |  5 || 6  |   |      | 0v      |     |     |
 |   4 |   7 | GPIO. 7 |   IN | 1 |  7 || 8  | 1 | ALT0 | TxD     | 15  | 14  |
 |     |     |      0v |      |   |  9 || 10 | 1 | ALT0 | RxD     | 16  | 15  |
 |  17 |   0 | GPIO. 0 |   IN | 0 | 11 || 12 | 0 | IN   | GPIO. 1 | 1   | 18  |
 |  27 |   2 | GPIO. 2 |   IN | 0 | 13 || 14 |   |      | 0v      |     |     |
 |  22 |   3 | GPIO. 3 |   IN | 0 | 15 || 16 | 0 | IN   | GPIO. 4 | 4   | 23  |
 |     |     |    3.3v |      |   | 17 || 18 | 0 | IN   | GPIO. 5 | 5   | 24  |
 |  10 |  12 |    MOSI |   IN | 0 | 19 || 20 |   |      | 0v      |     |     |
 |   9 |  13 |    MISO |   IN | 0 | 21 || 22 | 0 | IN   | GPIO. 6 | 6   | 25  |
 |  11 |  14 |    SCLK |   IN | 0 | 23 || 24 | 1 | IN   | CE0     | 10  | 8   |
 |     |     |      0v |      |   | 25 || 26 | 1 | IN   | CE1     | 11  | 7   |
 |   0 |  30 |   SDA.0 |   IN | 1 | 27 || 28 | 1 | IN   | SCL.0   | 31  | 1   |
 |   5 |  21 | GPIO.21 |   IN | 1 | 29 || 30 |   |      | 0v      |     |     |
 |   6 |  22 | GPIO.22 |   IN | 1 | 31 || 32 | 0 | IN   | GPIO.26 | 26  | 12  |
 |  13 |  23 | GPIO.23 |   IN | 0 | 33 || 34 |   |      | 0v      |     |     |
 |  19 |  24 | GPIO.24 |   IN | 0 | 35 || 36 | 0 | IN   | GPIO.27 | 27  | 16  |
 |  26 |  25 | GPIO.25 |   IN | 0 | 37 || 38 | 0 | IN   | GPIO.28 | 28  | 20  |
 |     |     |      0v |      |   | 39 || 40 | 0 | IN   | GPIO.29 | 29  | 21  |
 +-----+-----+---------+------+---+----++----+---+------+---------+-----+-----+
 | BCM | wPi |   Name  | Mode | V | Physical | V | Mode | Name    | wPi | BCM |
 +-----+-----+---------+------+---+--B Plus--+---+------+---------+-----+-----+

ピンの物理配置、名前と一緒に表示されるため混同するが、gpio番号は左右両端のBCM列で数える。
例えばGPIO 24を入力に使う場合は、 右側のBCM列が24になっているところでModeINになっているのを確認(左下のGPIO.24は無視)
OUTになっていたらgpio -g mode 25 inのようにして入力に設定。

センサ接続

各種センサの出力を、入力になっているGPIOに接続する。
(回路図・写真などは割愛)

センサによって読み取り方が違う。
シェルスクリプトでもよいが後々のメンテナンスがしやすいようにモジュールを作っておく。

デジタルセンサ

デジタルセンサの場合、watch -n1 gpio readallの出力をみながらセンサの前で手を振ってみると値が0,1で振れてわかりやすい。

GPIO.py
import subprocess
class GPIO(object):
    def read(self, pin):
       # print "reading", pin
       p = subprocess.Popen(["gpio", "-g", "read", str(pin)], stdout=subprocess.PIPE)
       (stdoutdata, _) = p.communicate()
       return int(stdoutdata)

    def write(self, pin, value):
       # print "writing", pin, value
       p = subprocess.Popen(["gpio", "-g", "write", str(pin), str(value)], stdout=subprocess.PIPE)
       p.communicate()

    def mode(self, pin, mode):
       # print "setting mode", pin, mode
       p = subprocess.Popen(["gpio", "-g", "mode", str(pin), str(mode)], stdout=subprocess.PIPE)
       p.communicate()

アナログセンサ

Raspberry Piはアナログ入力を読めない。
MCP3208などのADコンバータの入力にセンサをつなぎ、その出力を通して送られてくるデジタルのパルスを読む。
配線はこちらを参考に(ただしB+のピン配置は違う
RPi.GPIOを利用した例をそのまま参考にさせていただいた。

Analog.py
import sys
import time

from GPIO import *
import RPi.GPIO

OUT  = 'out' 
IN   = 'in' 
HIGH = 1
LOW  = 0

spi_clk  = 11
spi_mosi = 10
spi_miso = 9
spi_ss   = 8

class Analog():
  def __init__(self):
    self.GPIO = GPIO()
    RPi.GPIO.setwarnings(False)
    RPi.GPIO.setmode(RPi.GPIO.BCM)

    self.GPIO.mode(spi_mosi, OUT)
    self.GPIO.mode(spi_miso, IN)
    self.GPIO.mode(spi_clk,  OUT)
    self.GPIO.mode(spi_ss,   OUT)

  def read(self, ch):
    self.GPIO.write(spi_ss,   LOW)
    self.GPIO.write(spi_clk,  LOW)
    self.GPIO.write(spi_mosi, LOW)
    self.GPIO.write(spi_clk,  HIGH)
    self.GPIO.write(spi_clk,  LOW)

    cmd = (ch | 0x18) << 3
    for i in range(5):
      if (cmd & 0x80):
        self.GPIO.write(spi_mosi,   HIGH)
      else:
        self.GPIO.write(spi_mosi,   LOW)
      cmd <<= 1
      self.GPIO.write(spi_clk,   HIGH)
      self.GPIO.write(spi_clk,   LOW)
    self.GPIO.write(spi_clk, HIGH)
    self.GPIO.write(spi_clk, LOW)
    self.GPIO.write(spi_clk, HIGH)
    self.GPIO.write(spi_clk, LOW)

    value = 0
    for i in range(12):
        value <<= 1
        self.GPIO.write(spi_clk, HIGH)
        if (self.GPIO.read(spi_miso)):
            value |= 0x1
        self.GPIO.write(spi_clk, LOW)

    self.GPIO.write(spi_ss, HIGH)
    return value

温湿度センサ

これもパルス出力を読む必要があるが、RHT03を使う場合、Adafruitのモジュールで読める。残念ながらnon-rootで実行する方法がわからなかった。

$ git clone https://github.com/adafruit/Adafruit-Raspberry-Pi-Python-Code.git
$ cd Adafruit-Raspberry-Pi-Python-Code/
$ cd Adafruit_DHT_Driver_Python/
$ sudo python setup.py install
# must be executed as root
import dhtreader
import json

class DHT(object):
  def read(self, type, pin):
    dhtreader.init()
    raw = dhtreader.read(type, pin)
    result = {}
    if raw is not None:
      temp, humid = raw
      result = {
        'temp'  : temp,
        'humid' : humid
      }
    return json.dumps(result)

Kinesisへのデータ送信

作ったモジュールを組み合わせ、データをまとめてKinesisへ送信する。

from boto import kinesis
import time
import json
import subprocess
from GPIO import *
from DHT  import *
from Analog import *

# set up sensor readers
GPIO   = GPIO()
DHT    = DHT()
Analog = Analog()
ANALOG_CHANNELS = {
  'mic'                : 1,
  'ir_standard_analog' : 2,
  'photo'              : 3,
}

# Kinesis conf
stream_name = 'iot'

print "connecting to Kinesis..."
conn = kinesis.connect_to_region(region_name = "ap-northeast-1")
print "connected to Kinesis."

# main loop
while True:

  # read DHT
  # FIX ME: subprocess is necessary because access to DHT requires root!
  dht_process = subprocess.Popen(["sudo", "python", "read_dht.py"], stdout=subprocess.PIPE)
  (dht_json, _) = dht_process.communicate()
  data = json.loads(dht_json)

  # read digital
  data['ir_tremor_digital'] = GPIO.read(24)
  data['ir_10m_digital']    = GPIO.read(25)

  # read analog
  for sensor, ch in ANALOG_CHANNELS.iteritems():
      data[sensor] = Analog.read(ch)

  # wrap up
  data['at']                = time.time()
  data = json.dumps(data)
  print data

  # send
  shard = conn.put_record(
    stream_name, data,
    partition_key = 'partition_key'
  )


今はデータが来なくなったら起動、の手運用。頻発するようなら、プロセス起動していなかった自動起動、とかやりたい。

Kinesisからのデータ取得

~/.botopython-boto導入済みのコンテナorサーバーで、shard iteratorを回してレコードを読み続けるループを書く。

read_stream.py
from boto import kinesis
import time

conn = kinesis.connect_to_region(region_name = "ap-northeast-1")
shard_id = conn.describe_stream('iot')['StreamDescription']['Shards'][0]['ShardId']

shard_iterator_id = conn.get_shard_iterator('iot',shard_id,'LATEST')['ShardIterator']
while True:
  time.sleep(1)

  SequenceNumber = None
  print 'polling..'
  records = conn.get_records(shard_iterator_id)['Records']

  for record in records:
    print record
    SequenceNumber = record['SequenceNumber']

  if SequenceNumber is not None:
    shard_iterator_id = conn.get_shard_iterator('iot',shard_id,'AFTER_SEQUENCE_NUMBER', SequenceNumber)['ShardIterator']

$ python read_stream.py
polling..
polling..
polling..
polling..
polling..
polling..
polling..
polling..
polling..
{u'PartitionKey': u'partition_key', u'Data': u'{"ir_10m_digital": 0, "photo": 3562, "mic": 2431, "ir_standard_analog": 3011, "at": 1421648473.138328, "ir_tremor_digital": 0}', u'SequenceNumber': u'49546816267976975579015210414312564458569839141741133826'}
polling..
polling..
polling..
polling..
polling..
polling..
polling..
{u'PartitionKey': u'partition_key', u'Data': u'{"humid": 34.599998474121094, "temp": 26.600000381469727, "ir_10m_digital": 1, "photo": 3567, "mic": 2432, "ir_standard_analog": 4095, "at": 1421648481.605487, "ir_tremor_digital": 1}', u'SequenceNumber': u'49546816267976975579015210414313773384389454320671653890'}
polling..

ストリームの最新のデータが流れてくる。Kinesisは24h以上データを保持しないので、永続化するならDynamoDBなどに流す。(何を使うのがよいかはよくわかってない)

訂正。最後のshard_iterator_idの取り直しは以下のようにしたほうがよさそうだ。
docs

shard_iterator_id = conn.get_records(shard_iterator_id)['NextShardIterator']

GPIOアクセスの代替

RPi.GPIO

そもそもrootを使うことが気にならない場合は、Raspbian標準のRPi.GPIOライブラリ(Python)を使うほうがsubprocess.Popenより速くてよさそう。

WiringPi2-Python

subprocess.Popenで直接gpioコマンドを叩くかわりに、PythonライブラリとしてWiringPi2-Pythonがあるが、GPIOの読み書きが想定通りできなかった。
B+未対応と思われる部分があり、B+でなければちゃんと動くのかもしれないので、一応導入方法を記す。

$ sudo apt-get install python-dev
$ sudo apt-get install python-setuptools 
$ git clone https://github.com/Gadgetoid/WiringPi2-Python.git
$ cd WiringPi2-Python
$ sudo python setup.py install
13
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
12