0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

#2 「Confluent + RabbitMQ」 で IoTデータを ストリーミング処理してみました

Last updated at Posted at 2021-07-16

STEP-2 : RabbitMQ経由のBrokerでのデータ受信確認

概要

Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータをRabbitMQで受信し、該当する Source Connector を使用し、Confluent でストリーミング処理をできることを確認しました。
image.png

以下の3つのステップで上記内容を順次説明します。今回は STEP-2 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認

ローカル環境

macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)

RabbitMQへのアクセス確認

  1. STEP-1 で作成したローカルDocker環境に対して、ブラウザから http://localhost:15672 でアクセスします。
  2. 認証画面が表示jされるので、username: guest 、 Password: guest と入力後「Login」ボタンを押します。認証がOKであれば、Overview 画面が表示されます。
    image.png

IoTデータ生成Pythonプログラム

  1. データを RabbitMQ に送信するプログラムは以下となります。
IoTSampleData-v5.py
import random
import json
import time
from datetime import date, datetime
import argparse
import string
from faker.factory import Factory
import pika

# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")

# IoT機器のダミーセクション(大文字アルファベットを定義)
section = string.ascii_uppercase

# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': i,                            # id
            'time': generate_time(),            # データ生成時間
            'proc': proc,                       # データ生成プロセス名
            'section': random.choice(section),  # IoT機器セクション
            'iot_num': fake.zipcode(),          # IoT機器番号
            'iot_state': fake.prefecture(),     # IoT設置場所
            'vol_1': random.uniform(100, 200),  # IoT値−1
            'vol_2': random.uniform(50, 90)     # IoT値−2
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# IoT機器で計測されたダミーデータの生成時間
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime

# date, datetimeの変換関数
def json_trans_date(obj):
    # 日付型を文字列に変換
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外は対象外.
    raise TypeError ("Type %s not serializable" % type(obj))


# メイン : ターミナル出力用
def tm_main(count, proc, wait):
    print('ターミナル 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    # pprint.pprint(json_dict)
    for item in json_dict['items']:
        print(item)
        time.sleep(wait)


# メイン : IoTHub 出力用
def mq_main(count, proc, wait):
    print('IoTHub 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    channel, properties = rabbitmq_init()
    for i, item in enumerate(json_dict['items']):
        message = json.dumps(item).encode('utf-8')
        
        try:
            print("Sending message: {}".format(message))
            properties.message_id = str(i)
            channel.basic_publish(exchange='', routing_key='IoTHub', body=message, properties=properties)
            time.sleep(wait)
        except KeyboardInterrupt:
            print ( "MQTT Client Interrupt Stopped" )
            break

# MQTT(RabbitMQ) 接続定義
def rabbitmq_init():
    pika_param = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(pika_param)
    channel = connection.channel()
    channel.queue_declare(queue='IoTHub')
    properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')

    return channel, properties


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
    parser.add_argument('--count', type=int, default=10, help='データ作成件数')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ mq(MQTT出力)')
    parser.add_argument('--wait', type=float, default=1, help='データWait時間(x.x秒)')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 'mq'): 
        mq_main(args.count, args.proc, args.wait)
    else :
        tm_main(args.count, args.proc, args.wait)

    making_time = time.time() - start

    print("")
    print(f"データ作成件数:{args.count}")
    print("データ作成時間:{0}".format(making_time) + " [sec]")
    print("")

プログラムのパラメータ情報を確認します。

$ python IoTSampleData-v5.py -h

usage: IoTSampleData-v5.py [-h] [--count COUNT] [--proc PROC] [--mode MODE] [--wait WAIT]

IoT機器のなんちゃってダミーデータの生成

optional arguments:
  -h, --help     show this help message and exit
  --count COUNT  データ作成件数
  --proc PROC    データ作成プロセス名
  --mode MODE    tm(ターミナル出力)/ mq(MQTT出力)
  --wait WAIT    データWait時間(x.x秒)

rabbitMQにデータ送信するためには、「--mode mq」のパラメータが必要になります。

IoTデータ生成プログラムからRabbitMQへのデータ送信

  1. 上記プログラム内で「routing_key='IoTHub'」と定義しているので、このプログラムを実行させると、RabbitMQ に「IoTHub」というキューが自動作成されます。このキューにデータが蓄積されます。
  2. プログラムを実行します(データ送信先:MQ、 データ送信件数:5件、 データ送信間隔:1秒)。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1

IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T08:21:52.210384", "proc": "111", "section": "E", "iot_num": "752-9500", "iot_state": "\\u6803\\u6728\\u770c", "vol_1": 125.1396008658748, "vol_2": 64.62129087138834}'
Sending message: b'{"id": 1, "time": "2021-07-14T08:21:52.210467", "proc": "111", "section": "K", "iot_num": "810-7586", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 107.10688686955699, "vol_2": 78.66865000214042}'
Sending message: b'{"id": 2, "time": "2021-07-14T08:21:52.210487", "proc": "111", "section": "R", "iot_num": "399-6917", "iot_state": "\\u9999\\u5ddd\\u770c", "vol_1": 180.24083536198077, "vol_2": 79.81508156740844}'
Sending message: b'{"id": 3, "time": "2021-07-14T08:21:52.210505", "proc": "111", "section": "G", "iot_num": "152-1350", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 188.35767194882712, "vol_2": 80.33187715056255}'
Sending message: b'{"id": 4, "time": "2021-07-14T08:21:52.210519", "proc": "111", "section": "N", "iot_num": "842-1028", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 170.42473551436927, "vol_2": 52.295427164150915}'

データ作成件数:5
データ作成時間:5.029109001159668 [sec]

3. RabbitMQにブラウズアクセスし、「IoTHub」キューにデータが蓄積されていることを確認します。
image.png
image.png

データ受信トピックの作成

  1. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。
    image.png

  2. 画面左側から「Topics」を選択し、新たに表示される「All topics」画面の右側にある「+ Add a topic」ボタンを押します。その後に表示される「New topic」画面の「Topic name」に「topic_201」を入力し、「Create with defaults」ボタンを押します。
    image.png

コネクタの作成

  1. 続いて、画面左側から「Connect」を選択し、新たに表示される「All Connect Clusters」画面から「connect-default」を選択し、その後に表示される「Connectors」画面から「Add connector」ボタンを押します。
    image.png

  2. 新たに表示される「Browse」画面から「RabbitMQSourceConnector」を選択します。その後に表示される「Add Connector」画面の「RabbitMQSourceConnector_1」というコネクタ名を入力します。
    image.png

  3. コネクタ名を入力すると、入力項目が増えますので、以下の内容を入力し、最後に「Continu」ボタンを押します。

カテゴリ 入力項目
Common tasks.max 1
Connection rabbitmq.host rabbitmq
Connection rabbitmq.username guest
Connection rabbitmq.password guest
Connection rabbitmq.port 5672
Source kafka.topic topic_201
Source rabbitmq.queue IoTHub

4. 設定内容に問題なければ、「Launch」ボタンを押します。
image.png

トピックでのデータ受信確認

  1. 上記にてコネクタの作成が完了すると、RabbitMQ の「IoTHub」キューに蓄積されたデータが、データ受信トピック「topic_201」で確認できます。「Topics」ー「All topics」ー「topic_201」の順で新たに表示される画面上部の「Message」を選択します。このMessage画面の「Jump to offset」の横の欄に「0」を入力し「0 / Partition:0」を選択します。
    image.png

これで、IoTデータ生成プログラムからデータを送信し、RabbitMQ経由でBrokerの「topic_201」でデータ受信できることを確認できました。

ストリーミング処理を行うためのデータ受信確認

  1. 「topic_201」でデータ受信を確認できましたが、「value」が1つの値としてのデータとなっているので、ストリーミング処理のために意図する項目毎に値を取得できるようにデータ受信する必要があります。
    image.png

  2. 「Connector」ー「Connectors」ー「RabbitMQSourceConnector_1」の順で新たに表示される画面上部の「Settings」を選択し、コネクタの以下の項目に値を追加し、最後に「Continu」ボタンを押します。
    image.png

カテゴリ 入力項目
Common Value converter class org.apache.kafka.connect.converters.ByteArrayConverter

3. 設定内容に問題なければ、「Launch」ボタンを押します。
image.png

4. IoTデータ生成プログラムを実行し新たにデータを送信します。

$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1

IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T14:07:30.776174", "proc": "111", "section": "L", "iot_num": "319-1762", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 157.6113944081462, "vol_2": 71.87482721564037}'
Sending message: b'{"id": 1, "time": "2021-07-14T14:07:30.776255", "proc": "111", "section": "E", "iot_num": "606-3548", "iot_state": "\\u5927\\u5206\\u770c", "vol_1": 101.4282144148702, "vol_2": 54.41472842473606}'
Sending message: b'{"id": 2, "time": "2021-07-14T14:07:30.776276", "proc": "111", "section": "J", "iot_num": "846-2110", "iot_state": "\\u9ce5\\u53d6\\u770c", "vol_1": 162.78211638795443, "vol_2": 76.92165187757502}'
Sending message: b'{"id": 3, "time": "2021-07-14T14:07:30.776294", "proc": "111", "section": "Y", "iot_num": "196-8432", "iot_state": "\\u798f\\u4e95\\u770c", "vol_1": 118.78173693261178, "vol_2": 74.39179273564206}'
Sending message: b'{"id": 4, "time": "2021-07-14T14:07:30.776309", "proc": "111", "section": "K", "iot_num": "881-3972", "iot_state": "\\u798f\\u5cf6\\u770c", "vol_1": 198.90396115859022, "vol_2": 59.419668368865416}'

データ作成件数:5
データ作成時間:5.025936841964722 [sec]

5. ストリーミング処理のために意図する項目毎に値を取得できているかどうか受信データを確認します。「Topics」ー「All topics」ー「topic_201」の順で新たに表示される画面上部の「Message」を選択します。このMessage画面の「Jump to offset」の横の欄に「10」を入力し「10 / Partition:0」を選択します。
image.png

これで、IoTデータ生成プログラムからデータを送信し、RabbitMQ経由でBrokerの「topic_201」でデータ受信でき、かつ、ストリーミング処理のために意図する項目毎に値を取得できていることを確認できました。
次のステップでは、「topic_201」のデータをストリーミング処理(クエリ処理)し、データ抽出できることを確認してみます。

次のステップへの準備

次のステップを実施する前に「topic_201」の全てのデータを削除しておきます。以下のコマンドを実行ください。

$ docker exec -it broker /bin/bash

[appuser@broker ~]$ 
[appuser@broker ~]$ /bin/kafka-topics --zookeeper zookeeper:2181 --delete --topic topic_201
Topic topic_201 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[appuser@broker ~]$ exit
$

本課題のステップ情報

STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?