Kafka
Landoop
SchemaRegistry
ApacheAvro

PythonとSensorTag, Kafka, Spark Streamingのストリーム処理 - Part 3: Apache AvroとSchema Registry

More than 1 year has passed since last update.

 Landoopが提供するfast-data-devのDockerイメージにはConfluent Open SourceSchema RegistryとWebツールのSchema Registry UIが含まれています。前回SensorTagのデータはJSONフォーマットでKafkaへ送信していましたがApache Avroフォーマットも試してみます。Apache Avroはデータのシリアル化と言語に依存しないスキーマによるデータ交換の仕組みを提供します。Schema RegistryはREST APIから操作できるAvroスキーマを一元管理するためのストレージです。

Schema Registry

 ローカルにあるAvroスキーマファイルを利用してデータをシリアライズすることもできますが、Schema Registryで一元管理することでAvroメッセージをデシリアライズする側も共通のデータフォーマットを参照することができます。

Schema Registry UI

 fast-data-devのトップページからSCHEMASをクリックするとSchema Registry UIのページが開きます。左上にあるNEWボタンをクリックするとAvroスキーマを記述するエディタが起動します。

schema-registry.png

 Schema Registry UIのエディタでAvroスキーマを記述します。保存する前にバリデーションを実行するため記述したJSONが正しいフォーマットか確認できます。

 フォームのSubject Nameはvalueスキーマの場合topic名-valueと書くようです。SensorTagからAvroフォーマットで送信するtopic名はsensortag-avroなので、この場合はsensortag-avro-valueになります。SchemaのフィールドにSensorTag用のAvroスキーマをJSONフォーマットで記述します。

{
  "type": "record",
  "name": "SensorAvroValue",
  "fields": [
    {
      "name": "bid",
      "type": "string"
    },
    {
      "name": "time",
      "type": "long"
    },
    {
      "name": "ambient",
      "type": "double"
    },
    {
      "name": "objecttemp",
      "type": "double"
    },
    {
      "name": "humidity",
      "type": "double"
    },
    {
      "name": "rh",
      "type": "double"
    }
  ]
}

Raspberry Pi 3

 前回Raspberry Pi 3ではKafkaのPythonクライアントとしてkafka-pythonを利用しました。今回はAvroフォーマットに対応しているconfluent-kafka-pythonを使います。

librdkafkaのインストール

 confluent-kafka-pythonのインストールにはlibrdkafkaのヘッダが必要です。先にlibrdkafkaをビルドして共有ライブラリ情報を更新します。

$ sudo apt-get update && sudo apt-get install git build-essential -y
$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ ./configure 
$ make && sudo make install
$ sudo ldconfig

confluent-kafkaのインストール

 Pythonのヘッダファイルも必要です。Avroフォーマットを利用する場合pipパッケージ名はconfluent-kafka[avro]になります。Avroが不要な場合はconfluent-kafkaです。

$ sudo apt-get update && sudo apt-get install python-dev -y
$ sudo pip install confluent-kafka[avro]

Avro Producer

 オフィシャルのconfluent-kafka-pythonのページにあるコードを参考にAvro Producerを書きます。公式サンプルではローカルにあるスキーマファイルを利用しています。スキーマをSchema Registryから取得する機能は実装されていないようなので、ちょっと手間ですがSchema Registryから直接REST APIでスキーマを文字列として取得します。

avro_producer_sensortag.py
from bluepy.sensortag import SensorTag
import sys
import time
import calendar
import requests
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

def main():
    argvs = sys.argv
    argc = len(argvs)
    if (argc != 2):
        print 'Usage: # python {0} bd_address'.format(argvs[0])
        quit()
    bid = argvs[1]
    print('Connecting to ' + bid)

    timeout = 10.0

    tag = SensorTag(bid)
    tag.IRtemperature.enable()
    tag.humidity.enable()

    time.sleep(1.0)

    get_schema_req_data = requests.get(
        "http://<fast-data-devのIPアドレス>:8081/subjects/sensortag-avro-value/versions/latest")
    get_schema_req_data.raise_for_status()

    schema_string = get_schema_req_data.json()['schema']
    value_schema = avro.loads(schema_string)

    avroProducer = AvroProducer({
        'api.version.request':True,
        'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
        'schema.registry.url': '<fast-data-devのIPアドレス>:8081'
    }, default_value_schema=value_schema)

    while True:
        tAmb, tObj = tag.IRtemperature.read()
        humidity, rh = tag.humidity.read()

        value = {
            "bid" : bid,
            "time" : calendar.timegm(time.gmtime()),
            "ambient": tAmb,
            "objecttemp": tObj,
            "humidity": humidity,
            "rh": rh
        }

        avroProducer.produce(topic='sensortag-avro', value=value)
        avroProducer.flush()
        print(value)
        time.sleep(timeout)

    tag.disconnect()
    del tag

if __name__ == '__main__':
    main()

 SensorTagのBDアドレスをhcitoolを使い確認します。

$ sudo hcitool lescan
LE Scan ...
...
B0:B4:48:BE:5E:00 CC2650 SensorTag
...

 BDアドレスを引数にして作成したPythonスクリプトを実行します。

$ python avro_producer_sensortag.py <SensorTagのBDアドレス>

 以下のようなログを出力してKafkaブローカーへメッセージ送信を開始します。

{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495463, 'humidity': 27.04132080078125, 'objecttemp': 22.5, 'ambient': 26.84375, 'rh': 69.05517578125}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495475, 'humidity': 27.02117919921875, 'objecttemp': 22.75, 'ambient': 26.84375, 'rh': 69.05517578125}
{'bid': 'B0:B4:48:BE:5E:00', 'time': 1501495486, 'humidity': 27.04132080078125, 'objecttemp': 22.96875, 'ambient': 26.84375, 'rh': 69.05517578125}

 

Avro Consumer

 Avro Consumerのコードはconfluent-kafka-pythonにあるサンプルをそのまま使います。

avro_consumer_sensortag.py
import requests
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({
    'api.version.request':True,
    'bootstrap.servers': '<fast-data-devのIPアドレス>:9092',
    'group.id': 'raspiavro',
    'schema.registry.url': 'http://<fast-data-devのIPアドレス>:8081'})
c.subscribe(['sensortag-avro'])

running = True
while running:
    try:
        msg = c.poll(10)
        print(msg)
        if msg:
            if not msg.error():
                print(msg.value())
            elif msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                running = False
    except SerializerError as e:
        print("Message deserialization failed for %s: %s" % (msg, e))
        running = False

c.close()

 作成したPythoのスクリプトを実行します。

$ python avro_consumer_sensortag.py

 サンプルでは10秒間隔でpollingしています。タイミングがあわないとデータを取得できないためNoneが返ります。

<cimpl.Message object at 0x7655de88>
<cimpl.Message object at 0x764ee6f0>
{u'bid': u'B0:B4:48:BE:5E:00', u'time': 1501495204L, u'humidity': 27.27294921875, u'objecttemp': 22.78125, u'ambient': 27.09375, u'rh': 69.671630859375}
<cimpl.Message object at 0x7655de88>
None
<cimpl.Message object at 0x7655de88>
{u'bid': u'B0:B4:48:BE:5E:00', u'time': 1501495215L, u'humidity': 27.26287841796875, u'objecttemp': 22.9375, u'ambient': 27.09375, u'rh': 69.671630859375}
<cimpl.Message object at 0x747caa98>

kafka-avro-console-consumer

 最後にサーバー側でもkafka-avro-console-consumerコマンドからメッセージを取得してみます。

$ docker-compose exec kafka-stack \
  kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic sensortag-avro

 こちらも同様にSensorTagのデータを取得することができます。

{"bid":"B0:B4:48:BE:5E:00","time":1501495384,"ambient":26.9375,"objecttemp":22.96875,"humidity":27.11181640625,"rh":69.05517578125}
{"bid":"B0:B4:48:BE:5E:00","time":1501495396,"ambient":26.90625,"objecttemp":22.6875,"humidity":27.0916748046875,"rh":69.05517578125}