LoginSignup
3
3

More than 3 years have passed since last update.

KafkaをローカルのDocker環境で、さくっと動かしてみました  第7回

Last updated at Posted at 2021-02-09

概要

Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。

第7回は、第5回でConsumer上のPythonプログラムでtopic-11のデータをターミナル出力するものでしたが、今回は Producer側を複数台のIoT機器を想定し同様のことを確認します。
簡単に絵を描くとこんな感じになります。
Kafka-7.png

実行環境

macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3

Consumerでのメッセージ受け取り設定

データを受け取るために、Comsumer に接続し、プログラムのあるディレクトリに移動します。

$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#

IoTTopicData-v1.py を実行しデータの受け取り設定をします(ターミナル出力)。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力

↑ プロンプトが何も表示されない状態ですが、Producerからのメッセージの受け取りを待っています。

2つのProducerコンテナの起動

データを送信するために別ターミナルを立ち上げ、合計2つのProducer を立ち上げます。
その後、2つのコンテナが起動されているかを確認します。

$ docker-compose up -d --scale iot=2
Creating iotsampledata_iot_2 ... done

$ docker-compose ps
       Name           Command   State   Ports
---------------------------------------------
iotsampledata_iot_1   python3   Up           
iotsampledata_iot_2   python3   Up           

2つのProducerからのメッセージ送信

同時に2つのコンテナのプログラムを実行させるために、スクリプトファイル「script_iot.sh」を作成します。
それぞれのコンテナから 500件 のデータを送信(パラメータ:--count 500)するように設定します。
どちらのコンテナからデータが送信されたのか識別(パラメータ:--proc 1111 or 2222)できるように設定します。

iot_multi.sh
#!/bin/zsh
docker exec iotsampledata_iot_1 python /app/opt/IoTSampleData-v2.py --mode kf --proc 1111 --count 500 &
docker exec iotsampledata_iot_2 python /app/opt/IoTSampleData-v2.py --mode kf --proc 2222 --count 500

このスクリプトを実行します。

$ ./iot_multi.sh
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f1e5b861ad0>

データ作成件数:500
データ作成時間:0.21667098999023438 [sec]

Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f165d9c8450>

データ作成件数:500
データ作成時間:0.22739458084106445 [sec]

ConsumerのプロンプトにProducerで送信されたデータが表示されます。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
  中略
topic-11:0:766: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.211665', 'PROC': '1111', 'IOT_NUM': '832-6134', 'IOT_STATE': '茨城県', 'VOL_1': 179.96703574789623, 'VOL_2': 80.56843267707185}
topic-11:0:767: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.211852', 'PROC': '1111', 'IOT_NUM': '673-3797', 'IOT_STATE': '福井県', 'VOL_1': 147.72401039767132, 'VOL_2': 85.72532419479631}
topic-11:0:768: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.216597', 'PROC': '2222', 'IOT_NUM': '503-2390', 'IOT_STATE': '千葉県', 'VOL_1': 152.7394266108338, 'VOL_2': 57.05013141176613}
topic-11:0:769: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.216701', 'PROC': '2222', 'IOT_NUM': '516-4680', 'IOT_STATE': '青森県', 'VOL_1': 108.40957664102798, 'VOL_2': 87.16620915206302}
topic-11:0:770: key=b'2021/02/06' value={'SECTION': 'E', 'TIME': '2021-02-06T01:36:51.217158', 'PROC': '2222', 'IOT_NUM': '461-3098', 'IOT_STATE': '熊本県', 'VOL_1': 156.59169261796896, 'VOL_2': 84.65261961696028}
topic-11:0:771: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.217178', 'PROC': '2222', 'IOT_NUM': '715-8459', 'IOT_STATE': '宮城県', 'VOL_1': 170.13111876865736, 'VOL_2': 63.72228869990629}
topic-11:0:772: key=b'2021/02/06' value={'SECTION': 'C', 'TIME': '2021-02-06T01:36:51.217307', 'PROC': '2222', 'IOT_NUM': '153-1059', 'IOT_STATE': '石川県', 'VOL_1': 109.43435107461053, 'VOL_2': 71.26109660837363}
topic-11:0:773: key=b'2021/02/06' value={'SECTION': 'W', 'TIME': '2021-02-06T01:36:51.212614', 'PROC': '1111', 'IOT_NUM': '019-2207', 'IOT_STATE': '長崎県', 'VOL_1': 136.0842636605171, 'VOL_2': 51.32787199610963}
topic-11:0:774: key=b'2021/02/06' value={'SECTION': 'W', 'TIME': '2021-02-06T01:36:51.212961', 'PROC': '1111', 'IOT_NUM': '783-1782', 'IOT_STATE': '宮崎県', 'VOL_1': 123.00662563145016, 'VOL_2': 59.17796383111264}
 後略

これで、2つのProducerコンテナ上のPythonプログラムで生成したデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることを確認できました(キー:PROC の値を確認すると、それぞれのコンテナで作成したデータを受け取っている)。

おわりに

全7回で、KafkaをローカルのDocker環境で、さくっと動かしてみました。
スキーマレジストリ や コネクタ 等は含めていませんが、簡単なKafkaの動きを確認できました。

第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージを Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3