STEP-2 : RabbitMQ経由のBrokerでのデータ受信確認
概要
Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータをRabbitMQで受信し、該当する Source Connector を使用し、Confluent でストリーミング処理をできることを確認しました。
以下の3つのステップで上記内容を順次説明します。今回は STEP-2 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認
ローカル環境
macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)
RabbitMQへのアクセス確認
- STEP-1 で作成したローカルDocker環境に対して、ブラウザから http://localhost:15672 でアクセスします。
- 認証画面が表示jされるので、username: guest 、 Password: guest と入力後「Login」ボタンを押します。認証がOKであれば、Overview 画面が表示されます。
IoTデータ生成Pythonプログラム
- データを RabbitMQ に送信するプログラムは以下となります。
import random
import json
import time
from datetime import date, datetime
import argparse
import string
from faker.factory import Factory
import pika
# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")
# IoT機器のダミーセクション(大文字アルファベットを定義)
section = string.ascii_uppercase
# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
iot_items = json.dumps({
'items': [{
'id': i, # id
'time': generate_time(), # データ生成時間
'proc': proc, # データ生成プロセス名
'section': random.choice(section), # IoT機器セクション
'iot_num': fake.zipcode(), # IoT機器番号
'iot_state': fake.prefecture(), # IoT設置場所
'vol_1': random.uniform(100, 200), # IoT値−1
'vol_2': random.uniform(50, 90) # IoT値−2
}
for i in range(count)
]
}, ensure_ascii=False).encode('utf-8')
return iot_items
# IoT機器で計測されたダミーデータの生成時間
def generate_time():
dt_time = datetime.now()
gtime = json_trans_date(dt_time)
return gtime
# date, datetimeの変換関数
def json_trans_date(obj):
# 日付型を文字列に変換
if isinstance(obj, (datetime, date)):
return obj.isoformat()
# 上記以外は対象外.
raise TypeError ("Type %s not serializable" % type(obj))
# メイン : ターミナル出力用
def tm_main(count, proc, wait):
print('ターミナル 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
# pprint.pprint(json_dict)
for item in json_dict['items']:
print(item)
time.sleep(wait)
# メイン : IoTHub 出力用
def mq_main(count, proc, wait):
print('IoTHub 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
channel, properties = rabbitmq_init()
for i, item in enumerate(json_dict['items']):
message = json.dumps(item).encode('utf-8')
try:
print("Sending message: {}".format(message))
properties.message_id = str(i)
channel.basic_publish(exchange='', routing_key='IoTHub', body=message, properties=properties)
time.sleep(wait)
except KeyboardInterrupt:
print ( "MQTT Client Interrupt Stopped" )
break
# MQTT(RabbitMQ) 接続定義
def rabbitmq_init():
pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='IoTHub')
properties = pika.BasicProperties(content_type='application/json', delivery_mode=1, priority=1, content_encoding='utf-8')
return channel, properties
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
parser.add_argument('--count', type=int, default=10, help='データ作成件数')
parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ mq(MQTT出力)')
parser.add_argument('--wait', type=float, default=1, help='データWait時間(x.x秒)')
args = parser.parse_args()
start = time.time()
if (args.mode == 'mq'):
mq_main(args.count, args.proc, args.wait)
else :
tm_main(args.count, args.proc, args.wait)
making_time = time.time() - start
print("")
print(f"データ作成件数:{args.count}")
print("データ作成時間:{0}".format(making_time) + " [sec]")
print("")
プログラムのパラメータ情報を確認します。
$ python IoTSampleData-v5.py -h
usage: IoTSampleData-v5.py [-h] [--count COUNT] [--proc PROC] [--mode MODE] [--wait WAIT]
IoT機器のなんちゃってダミーデータの生成
optional arguments:
-h, --help show this help message and exit
--count COUNT データ作成件数
--proc PROC データ作成プロセス名
--mode MODE tm(ターミナル出力)/ mq(MQTT出力)
--wait WAIT データWait時間(x.x秒)
rabbitMQにデータ送信するためには、「--mode mq」のパラメータが必要になります。
IoTデータ生成プログラムからRabbitMQへのデータ送信
- 上記プログラム内で「routing_key='IoTHub'」と定義しているので、このプログラムを実行させると、RabbitMQ に「IoTHub」というキューが自動作成されます。このキューにデータが蓄積されます。
- プログラムを実行します(データ送信先:MQ、 データ送信件数:5件、 データ送信間隔:1秒)。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1
IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T08:21:52.210384", "proc": "111", "section": "E", "iot_num": "752-9500", "iot_state": "\\u6803\\u6728\\u770c", "vol_1": 125.1396008658748, "vol_2": 64.62129087138834}'
Sending message: b'{"id": 1, "time": "2021-07-14T08:21:52.210467", "proc": "111", "section": "K", "iot_num": "810-7586", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 107.10688686955699, "vol_2": 78.66865000214042}'
Sending message: b'{"id": 2, "time": "2021-07-14T08:21:52.210487", "proc": "111", "section": "R", "iot_num": "399-6917", "iot_state": "\\u9999\\u5ddd\\u770c", "vol_1": 180.24083536198077, "vol_2": 79.81508156740844}'
Sending message: b'{"id": 3, "time": "2021-07-14T08:21:52.210505", "proc": "111", "section": "G", "iot_num": "152-1350", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 188.35767194882712, "vol_2": 80.33187715056255}'
Sending message: b'{"id": 4, "time": "2021-07-14T08:21:52.210519", "proc": "111", "section": "N", "iot_num": "842-1028", "iot_state": "\\u6ecb\\u8cc0\\u770c", "vol_1": 170.42473551436927, "vol_2": 52.295427164150915}'
データ作成件数:5
データ作成時間:5.029109001159668 [sec]
3. RabbitMQにブラウズアクセスし、「IoTHub」キューにデータが蓄積されていることを確認します。
データ受信トピックの作成
-
Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。
-
画面左側から「Topics」を選択し、新たに表示される「All topics」画面の右側にある「+ Add a topic」ボタンを押します。その後に表示される「New topic」画面の「Topic name」に「topic_201」を入力し、「Create with defaults」ボタンを押します。
コネクタの作成
-
続いて、画面左側から「Connect」を選択し、新たに表示される「All Connect Clusters」画面から「connect-default」を選択し、その後に表示される「Connectors」画面から「Add connector」ボタンを押します。
-
新たに表示される「Browse」画面から「RabbitMQSourceConnector」を選択します。その後に表示される「Add Connector」画面の「RabbitMQSourceConnector_1」というコネクタ名を入力します。
-
コネクタ名を入力すると、入力項目が増えますので、以下の内容を入力し、最後に「Continu」ボタンを押します。
カテゴリ | 入力項目 | 値 |
---|---|---|
Common | tasks.max | 1 |
Connection | rabbitmq.host | rabbitmq |
Connection | rabbitmq.username | guest |
Connection | rabbitmq.password | guest |
Connection | rabbitmq.port | 5672 |
Source | kafka.topic | topic_201 |
Source | rabbitmq.queue | IoTHub |
4. 設定内容に問題なければ、「Launch」ボタンを押します。
トピックでのデータ受信確認
- 上記にてコネクタの作成が完了すると、RabbitMQ の「IoTHub」キューに蓄積されたデータが、データ受信トピック「topic_201」で確認できます。「Topics」ー「All topics」ー「topic_201」の順で新たに表示される画面上部の「Message」を選択します。このMessage画面の「Jump to offset」の横の欄に「0」を入力し「0 / Partition:0」を選択します。
これで、IoTデータ生成プログラムからデータを送信し、RabbitMQ経由でBrokerの「topic_201」でデータ受信できることを確認できました。
ストリーミング処理を行うためのデータ受信確認
-
「topic_201」でデータ受信を確認できましたが、「value」が1つの値としてのデータとなっているので、ストリーミング処理のために意図する項目毎に値を取得できるようにデータ受信する必要があります。
-
「Connector」ー「Connectors」ー「RabbitMQSourceConnector_1」の順で新たに表示される画面上部の「Settings」を選択し、コネクタの以下の項目に値を追加し、最後に「Continu」ボタンを押します。
カテゴリ | 入力項目 | 値 |
---|---|---|
Common | Value converter class | org.apache.kafka.connect.converters.ByteArrayConverter |
3. 設定内容に問題なければ、「Launch」ボタンを押します。
4. IoTデータ生成プログラムを実行し新たにデータを送信します。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1
IoTHub 出力
Sending message: b'{"id": 0, "time": "2021-07-14T14:07:30.776174", "proc": "111", "section": "L", "iot_num": "319-1762", "iot_state": "\\u5cf6\\u6839\\u770c", "vol_1": 157.6113944081462, "vol_2": 71.87482721564037}'
Sending message: b'{"id": 1, "time": "2021-07-14T14:07:30.776255", "proc": "111", "section": "E", "iot_num": "606-3548", "iot_state": "\\u5927\\u5206\\u770c", "vol_1": 101.4282144148702, "vol_2": 54.41472842473606}'
Sending message: b'{"id": 2, "time": "2021-07-14T14:07:30.776276", "proc": "111", "section": "J", "iot_num": "846-2110", "iot_state": "\\u9ce5\\u53d6\\u770c", "vol_1": 162.78211638795443, "vol_2": 76.92165187757502}'
Sending message: b'{"id": 3, "time": "2021-07-14T14:07:30.776294", "proc": "111", "section": "Y", "iot_num": "196-8432", "iot_state": "\\u798f\\u4e95\\u770c", "vol_1": 118.78173693261178, "vol_2": 74.39179273564206}'
Sending message: b'{"id": 4, "time": "2021-07-14T14:07:30.776309", "proc": "111", "section": "K", "iot_num": "881-3972", "iot_state": "\\u798f\\u5cf6\\u770c", "vol_1": 198.90396115859022, "vol_2": 59.419668368865416}'
データ作成件数:5
データ作成時間:5.025936841964722 [sec]
5. ストリーミング処理のために意図する項目毎に値を取得できているかどうか受信データを確認します。「Topics」ー「All topics」ー「topic_201」の順で新たに表示される画面上部の「Message」を選択します。このMessage画面の「Jump to offset」の横の欄に「10」を入力し「10 / Partition:0」を選択します。
これで、IoTデータ生成プログラムからデータを送信し、RabbitMQ経由でBrokerの「topic_201」でデータ受信でき、かつ、ストリーミング処理のために意図する項目毎に値を取得できていることを確認できました。
次のステップでは、「topic_201」のデータをストリーミング処理(クエリ処理)し、データ抽出できることを確認してみます。
次のステップへの準備
次のステップを実施する前に「topic_201」の全てのデータを削除しておきます。以下のコマンドを実行ください。
$ docker exec -it broker /bin/bash
[appuser@broker ~]$
[appuser@broker ~]$ /bin/kafka-topics --zookeeper zookeeper:2181 --delete --topic topic_201
Topic topic_201 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[appuser@broker ~]$ exit
$
本課題のステップ情報
STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認