概要
Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。
第7回は、第5回でConsumer上のPythonプログラムでtopic-11のデータをターミナル出力するものでしたが、今回は Producer側を複数台のIoT機器を想定し同様のことを確認します。
簡単に絵を描くとこんな感じになります。
実行環境
macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3
Consumerでのメッセージ受け取り設定
データを受け取るために、Comsumer に接続し、プログラムのあるディレクトリに移動します。
$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#
IoTTopicData-v1.py を実行しデータの受け取り設定をします(ターミナル出力)。
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
↑ プロンプトが何も表示されない状態ですが、Producerからのメッセージの受け取りを待っています。
2つのProducerコンテナの起動
データを送信するために別ターミナルを立ち上げ、合計2つのProducer を立ち上げます。
その後、2つのコンテナが起動されているかを確認します。
$ docker-compose up -d --scale iot=2
Creating iotsampledata_iot_2 ... done
$ docker-compose ps
Name Command State Ports
---------------------------------------------
iotsampledata_iot_1 python3 Up
iotsampledata_iot_2 python3 Up
2つのProducerからのメッセージ送信
同時に2つのコンテナのプログラムを実行させるために、スクリプトファイル「script_iot.sh」を作成します。
それぞれのコンテナから 500件 のデータを送信(パラメータ:--count 500)するように設定します。
どちらのコンテナからデータが送信されたのか識別(パラメータ:--proc 1111 or 2222)できるように設定します。
#!/bin/zsh
docker exec iotsampledata_iot_1 python /app/opt/IoTSampleData-v2.py --mode kf --proc 1111 --count 500 &
docker exec iotsampledata_iot_2 python /app/opt/IoTSampleData-v2.py --mode kf --proc 2222 --count 500
このスクリプトを実行します。
$ ./iot_multi.sh
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f1e5b861ad0>
データ作成件数:500
データ作成時間:0.21667098999023438 [sec]
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f165d9c8450>
データ作成件数:500
データ作成時間:0.22739458084106445 [sec]
ConsumerのプロンプトにProducerで送信されたデータが表示されます。
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
中略
topic-11:0:766: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.211665', 'PROC': '1111', 'IOT_NUM': '832-6134', 'IOT_STATE': '茨城県', 'VOL_1': 179.96703574789623, 'VOL_2': 80.56843267707185}
topic-11:0:767: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.211852', 'PROC': '1111', 'IOT_NUM': '673-3797', 'IOT_STATE': '福井県', 'VOL_1': 147.72401039767132, 'VOL_2': 85.72532419479631}
topic-11:0:768: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.216597', 'PROC': '2222', 'IOT_NUM': '503-2390', 'IOT_STATE': '千葉県', 'VOL_1': 152.7394266108338, 'VOL_2': 57.05013141176613}
topic-11:0:769: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.216701', 'PROC': '2222', 'IOT_NUM': '516-4680', 'IOT_STATE': '青森県', 'VOL_1': 108.40957664102798, 'VOL_2': 87.16620915206302}
topic-11:0:770: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.217158', 'PROC': '2222', 'IOT_NUM': '461-3098', 'IOT_STATE': '熊本県', 'VOL_1': 156.59169261796896, 'VOL_2': 84.65261961696028}
topic-11:0:771: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.217178', 'PROC': '2222', 'IOT_NUM': '715-8459', 'IOT_STATE': '宮城県', 'VOL_1': 170.13111876865736, 'VOL_2': 63.72228869990629}
topic-11:0:772: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.217307', 'PROC': '2222', 'IOT_NUM': '153-1059', 'IOT_STATE': '石川県', 'VOL_1': 109.43435107461053, 'VOL_2': 71.26109660837363}
topic-11:0:773: key=b'2021/02/06' value={'SECTION': 'W', 'TIME': '2021-02-06T01:36:51.212614', 'PROC': '1111', 'IOT_NUM': '019-2207', 'IOT_STATE': '長崎県', 'VOL_1': 136.0842636605171, 'VOL_2': 51.32787199610963}
topic-11:0:774: key=b'2021/02/06' value={'SECTION': 'W', 'TIME': '2021-02-06T01:36:51.212961', 'PROC': '1111', 'IOT_NUM': '783-1782', 'IOT_STATE': '宮崎県', 'VOL_1': 123.00662563145016, 'VOL_2': 59.17796383111264}
後略
これで、2つのProducerコンテナ上のPythonプログラムで生成したデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることを確認できました(キー:PROC の値を確認すると、それぞれのコンテナで作成したデータを受け取っている)。
おわりに
全7回で、KafkaをローカルのDocker環境で、さくっと動かしてみました。
スキーマレジストリ や コネクタ 等は含めていませんが、簡単なKafkaの動きを確認できました。
第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージを Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
参考情報
以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築