Help us understand the problem. What is going on with this article?

PythonとSensorTag, Kafka, Spark Streamingのストリーム処理 - Part 2: KafkaとLandoop

More than 3 years have passed since last update.

 前回はRaspberry Pi 3上でSensorTagから環境データを取得するPythonスクリプトを書きました。この環境データはKafkaを経由してストリーム処理する予定です。次にRaspberry Pi 3からメッセージを受け取るKafkaクラスタをクラウド上に構築していきます。
 
 KafkaクラスタはLandoopが開発しているfast-data-devのDockerイメージを使います。

LandoopでKafkaクラスタを構築する

 LandoopはKafkaのソリューションを提供する会社です。特にKafka Connectを中心にしたストリームのデータパイプラインの開発に強い印象です。

Landoop 

 
 LandoopではKafka Topics UIKafka Connect UIKafka Topics UIなどのWebツールを開発しています。demoサイトからどのようなツールなのかを確認できます。このdemoサイトと同じ環境はfast-data-connect-clusterを使うと簡単に構築することができます。
 
 
landoop-top.png

fast-data-connect-cluster

 fast-data-connect-clusterのリポジトリをcloneします。

$ git clone https://github.com/Landoop/fast-data-connect-cluster

 
 リポジトリに含まれるdocker-compose.ymlを少し変更してクラウド上で利用します。Debianの仮想マシンを用意してDockerとDocker Composeをインストールしました。利用するバージョンは以下です。

$ docker --version
Docker version 17.06.0-ce, build 02c1d87

$ docker-compose --version
docker-compose version 1.14.0, build c7bdf9e

 このdocker-compose.ymlにはLandoopのWebツールに加えてConfluent Open Sourceに含まれるKafkaSchema RegistryKafka REST ProxyKafka ConnectApache ZooKeeperが含まれます。一通りKafkaを使った開発に必要なコンテナが揃うのでとても便利です。

 現在Confluent Open SourceとKafkaのバージョンは以下になっています。
 
* Confluent Open Source: v3.2.2
* Kafka v0.10.2.1

 docker-compose.ymlの主な変更点です。

  • ADV_HOST: Dockerが起動している仮想マシンのパブリックIPアドレスを指定します。
  • ports: リモートから接続するためにZooKeeperやKafkaクラスタのポートを公開します。
docker-compose.yml
version: '2'
services:
  kafka-stack:
    image: landoop/fast-data-dev
    environment:
      - FORWARDLOGS=0
      - RUNTESTS=0
      - ADV_HOST=210.xxx.xxx.xxx
    ports:
      - 3030:3030
      - 9092:9092
      - 2181:2181
      - 8081:8081
  connect-node-1:
    image: landoop/fast-data-dev-connect-cluster
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-node-2:
    image: landoop/fast-data-dev-connect-cluster
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-node-3:
    image: landoop/fast-data-dev-connect-cluster
    depends_on:
      - kafka-stack
    environment:
      - ID=01
      - BS=kafka-stack:9092
      - ZK=kafka-stack:2181
      - SR=http://kafka-stack:8081
  connect-ui:
    image: landoop/kafka-connect-ui:latest
    depends_on:
      - connect-node-1
    environment:
      - CONNECT_URL=http://connect-node-1:8083
    ports:
      - 8000:8000

 docker-compose.ymlのディレクトリに移動してコンテナを起動します。

$ cd fast-data-connect-cluster
$ docker-compose up -d

動作確認

 Kafkaクライアントから簡単に動作確認をします。kafka-topicsコマンドでトピックを作成します。

$ docker-compose exec kafka-stack kafka-topics \
    --create --topic test \
    --zookeeper localhost:2181 \
    --partitions 1 --replication-factor 1

 kafka-console-consumerコマンドを実行します。メッセージがトピックに届くとこのシェルに表示されます。

$ docker-compose exec kafka-stack \
  kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --from-beginning \
  --topic test

 別のシェルからkafka-console-producerコマンドを実行します。

$ docker-compose exec kafka-stack \
  kafka-console-producer \
  --broker-list localhost:9092 \
  --topic test

 コマンドは待機状態になるので適当なメッセージをシェルに入力します。同じメッセージがkafka-console-consumerのシェルにも表示されます。

hello world

 Kafka Topics UIのページではトピックの一覧とメッセージの中身を確認することができます。
 
landoop-topic.png

SensorTagの環境データをKafkaに送信する

kafka-python

 PythonのKafkaクライアントにはkafka-pythonconfluent-kafka-pythonがあります。APIが微妙に違うので間違えないようにします。今回はkafka-pythonをインストールして使います。

$ sudo pip install kafka-python

 前回書いたSensorTagのデータを取得するPythonのコードにKafkaへメッセージを送信するproducerを追加します。

json_producer_sensortag_kafka.py
from bluepy.sensortag import SensorTag
import sys
import time
import json
import calendar

from kafka import KafkaProducer

def main():
    argvs = sys.argv
    argc = len(argvs)
    if (argc != 2):
        print 'Usage: # python {0} bd_address'.format(argvs[0])
        quit()
    bid = argvs[1]
    print('Connecting to ' + bid)

    timeout = 10.0

    tag = SensorTag(bid)
    tag.IRtemperature.enable()
    tag.humidity.enable()

    time.sleep(1.0)

    producer = KafkaProducer(bootstrap_servers='210.xxx.xxx.xxx:9092')
    while True:
        tAmb, tObj = tag.IRtemperature.read()
        humidity, rh = tag.humidity.read()

        value = {
            "bid" : bid,
            "time" : calendar.timegm(time.gmtime()),
            "ambient": tAmb,
            "objecttemp": tObj,
            "humidity": humidity,
            "rh": rh
        }

        msg = json.dumps(value).encode("utf-8")
        producer.send('sensortag', msg)
        producer.flush()
        print(msg)

        time.sleep(timeout)

    tag.disconnect()
    del tag

if __name__ == '__main__':
    main()

 SensorTagのBDアドレスを引数にしてPythonスクリプトを実行します。

$ python json_producer_sensortag_kafka.py B0:B4:48:BE:5E:00

 10秒間隔で取得した環境データをJSON文字列に整形してKafkaクラスタに送信します。

Connecting to B0:B4:48:BE:5E:00
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464133, "humidity": 26.8096923828125, "objecttemp": 22.0625, "ambient": 26.59375, "rh": 68.829345703125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464143, "humidity": 26.86004638671875, "objecttemp": 22.40625, "ambient": 26.65625, "rh": 68.927001953125}
{"bid": "B0:B4:48:BE:5E:00", "time": 1501464153, "humidity": 26.92047119140625, "objecttemp": 22.71875, "ambient": 26.71875, "rh": 68.95751953125}

 Kafka Topics UIの画面にもSensorTagの環境データが表示されました。

landoop-sensortag.png

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away