前回はRaspberry Pi 3上でSensorTagから環境データを取得するPythonスクリプトを書きました。この環境データはKafkaを経由してストリーム処理する予定です。次にRaspberry Pi 3からメッセージを受け取るKafkaクラスタをクラウド上に構築していきます。
KafkaクラスタはLandoopが開発しているfast-data-devのDockerイメージを使います。
LandoopでKafkaクラスタを構築する
LandoopはKafkaのソリューションを提供する会社です。特にKafka Connectを中心にしたストリームのデータパイプラインの開発に強い印象です。
Landoop
LandoopではKafka Topics UIやKafka Connect UI、Kafka Topics UIなどのWebツールを開発しています。demoサイトからどのようなツールなのかを確認できます。このdemoサイトと同じ環境はfast-data-connect-clusterを使うと簡単に構築することができます。
fast-data-connect-cluster
fast-data-connect-clusterのリポジトリをcloneします。
$ git clone https://github.com/Landoop/fast-data-connect-cluster
リポジトリに含まれるdocker-compose.ymlを少し変更してクラウド上で利用します。Debianの仮想マシンを用意してDockerとDocker Composeをインストールしました。利用するバージョンは以下です。
$ docker --version
Docker version 17.06.0-ce, build 02c1d87
$ docker-compose --version
docker-compose version 1.14.0, build c7bdf9e
このdocker-compose.ymlにはLandoopのWebツールに加えてConfluent Open Sourceに含まれるKafka、Schema Registry、Kafka REST Proxy、Kafka Connect、Apache ZooKeeperが含まれます。一通りKafkaを使った開発に必要なコンテナが揃うのでとても便利です。
現在Confluent Open SourceとKafkaのバージョンは以下になっています。
- Confluent Open Source: v3.2.2
- Kafka v0.10.2.1
docker-compose.ymlの主な変更点です。
- ADV_HOST: Dockerが起動している仮想マシンのパブリックIPアドレスを指定します。
- ports: リモートから接続するためにZooKeeperやKafkaクラスタのポートを公開します。
version: '2'
services:
kafka-stack:
image: landoop/fast-data-dev
environment:
- FORWARDLOGS=0
- RUNTESTS=0
- ADV_HOST=210.xxx.xxx.xxx
ports:
- 3030:3030
- 9092:9092
- 2181:2181
- 8081:8081
connect-node-1:
image: landoop/fast-data-dev-connect-cluster
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-node-2:
image: landoop/fast-data-dev-connect-cluster
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-node-3:
image: landoop/fast-data-dev-connect-cluster
depends_on:
- kafka-stack
environment:
- ID=01
- BS=kafka-stack:9092
- ZK=kafka-stack:2181
- SR=http://kafka-stack:8081
connect-ui:
image: landoop/kafka-connect-ui:latest
depends_on:
- connect-node-1
environment:
- CONNECT_URL=http://connect-node-1:8083
ports:
- 8000:8000
docker-compose.ymlのディレクトリに移動してコンテナを起動します。
$ cd fast-data-connect-cluster
$ docker-compose up -d
動作確認
Kafkaクライアントから簡単に動作確認をします。kafka-topicsコマンドでトピックを作成します。
$ docker-compose exec kafka-stack kafka-topics \
--create --topic test \
--zookeeper localhost:2181 \
--partitions 1 --replication-factor 1
kafka-console-consumerコマンドを実行します。メッセージがトピックに届くとこのシェルに表示されます。
$ docker-compose exec kafka-stack \
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--from-beginning \
--topic test
別のシェルからkafka-console-producerコマンドを実行します。
$ docker-compose exec kafka-stack \
kafka-console-producer \
--broker-list localhost:9092 \
--topic test
コマンドは待機状態になるので適当なメッセージをシェルに入力します。同じメッセージがkafka-console-consumerのシェルにも表示されます。
hello world
Kafka Topics UIのページではトピックの一覧とメッセージの中身を確認することができます。
SensorTagの環境データをKafkaに送信する
kafka-python
PythonのKafkaクライアントにはkafka-pythonとconfluent-kafka-pythonがあります。APIが微妙に違うので間違えないようにします。今回はkafka-pythonをインストールして使います。
$ sudo pip install kafka-python
前回書いたSensorTagのデータを取得するPythonのコードにKafkaへメッセージを送信するproducerを追加します。
from bluepy.sensortag import SensorTag
import sys
import time
import json
import calendar
from kafka import KafkaProducer
def main():
argvs = sys.argv
argc = len(argvs)
if (argc != 2):
print 'Usage: # python {0} bd_address'.format(argvs[0])
quit()
bid = argvs[1]
print('Connecting to ' + bid)
timeout = 10.0
tag = SensorTag(bid)
tag.IRtemperature.enable()
tag.humidity.enable()
time.sleep(1.0)
producer = KafkaProducer(bootstrap_servers='210.xxx.xxx.xxx:9092')
while True:
tAmb, tObj = tag.IRtemperature.read()
humidity, rh = tag.humidity.read()
value = {
"bid" : bid,
"time" : calendar.timegm(time.gmtime()),
"ambient": tAmb,
"objecttemp": tObj,
"humidity": humidity,
"rh": rh
}
msg = json.dumps(value).encode("utf-8")
producer.send('sensortag', msg)
producer.flush()
print(msg)
time.sleep(timeout)
tag.disconnect()
del tag
if __name__ == '__main__':
main()
SensorTagのBDアドレスを引数にしてPythonスクリプトを実行します。
$ python json_producer_sensortag_kafka.py B0:B4:48:BE:5E:00
10秒間隔で取得した環境データをJSON文字列に整形してKafkaクラスタに送信します。
Connecting to B0:B4:48:BE:5E:00
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464133, "humidity": 26.8096923828125, "objecttemp": 22.0625, "ambient": 26.59375, "rh": 68.829345703125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464143, "humidity": 26.86004638671875, "objecttemp": 22.40625, "ambient": 26.65625, "rh": 68.927001953125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464153, "humidity": 26.92047119140625, "objecttemp": 22.71875, "ambient": 26.71875, "rh": 68.95751953125}
Kafka Topics UIの画面にもSensorTagの環境データが表示されました。