概要
Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。
第5回は、第4回で抽出したデータを取得するために、新たに ConsumerとしてPythonプログラムを稼働させるコンテナを作成し、そのConsumerでデータの受け取りを確認します。
簡単に絵を描くとこんな感じになります。
実行環境
macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3
Consumerコンテナの作成
topic-11 からの抽出データ受け取りのために、新たなConsumerコンテナを作成します。
Pythonプログラムが稼働するコンテナ作成のためのディレクトリ構成は以下となります。
$ tree
.
├── Dockerfile
├── docker-compose.yml
├── opt
│ └── IoTTopicData-v1.py
└── requirements.txt
docker-compose.yml は以下となります。
ローカルのDocker環境なので、Pythonプログラム(IoTTopicData-v1.py)は COPY を使用せず、volumes を使用しました。
version: '3'
services:
iot:
build: .
working_dir: '/app/'
tty: true
volumes:
- ./opt:/app/opt
networks:
default:
external:
name: iot_network
DockerFile は以下となります。
最後の行の「requirements.txt」はPythonプログラムで必要な関数を別途定義します。
FROM python:3.7.5-slim
USER root
RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm
RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt
requirements.txt は以下となります。
Pythonプログラムでimportするのに必要な関数を定義しています。
kafka-python
Consumerコンテナの作成と確認
定義したコンテナをビルドして起動させます。
$ docker-compose up -d
前略
Creating iottopicdata_ktp_1 ... done
起動を確認します。
$ docker-compose ps
Name Command State Ports
--------------------------------------------
iottopicdata_ktp_1 python3 Up
起動している全てのコンテナを確認します。
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
c23123e17068 iottopicdata_ktp "python3" About a minute ago Up About a minute iottopicdata_ktp_1
35620515e9f1 confluentinc/cp-ksql-cli:5.4.3 "/bin/sh" 22 hours ago Up 22 hours ksql-cli
f46362cbcd5c confluentinc/cp-ksql-server:5.4.3 "/etc/confluent/dock…" 22 hours ago Up 22 hours (healthy) 0.0.0.0:8088->8088/tcp ksql-server
4e4f79c219e1 iotsampledata_iot "python3" 24 hours ago Up 24 hours iotsampledata_iot_1
37e2e1f360f5 confluentinc/cp-kafka:5.5.1 "/bin/sh" 26 hours ago Up 26 hours 9092/tcp cli
78d0a02910fe confluentinc/cp-kafka:5.5.1 "/etc/confluent/dock…" 26 hours ago Up 26 hours 0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp broker
ad55284f0174 confluentinc/cp-zookeeper:5.5.1 "/etc/confluent/dock…" 26 hours ago Up 26 hours 2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp zookeeper
Consumerで稼働させるプログラム
抽出したデータを受け取るプログラムは以下となります。
import json
import time
import argparse
import pprint
from datetime import datetime
from kafka import KafkaProducer
from kafka import KafkaConsumer
# ターミナル出力用
def topic_to_tm(consumer):
print('ターミナル 出力')
# Read data from kafka
try :
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
except KeyboardInterrupt :
print('\r\n Output to Terminal - interrupted!')
return
# Kafka Topic からのデータ受取
def get_kafka_topic():
# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('topic-11',
bootstrap_servers = ['broker:29092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
print(consumer)
return consumer
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='KafkaからのIoT機器のなんちゃってStreamデータの取得')
parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)')
args = parser.parse_args()
start = time.time()
consumer = get_kafka_topic()
topic_to_tm(consumer)
making_time = time.time() - start
print("")
print("Streamデータ取得待機時間:{0}".format(making_time) + " [sec]")
print("")
Consumerでのメッセージ受け取り設定
データを受け取るために、Comsumer に接続し、プログラムのあるディレクトリに移動します。
$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#
IoTTopicData-v1.py を実行し抽出データの受け取り設定をします。
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
↑ プロンプトが何も表示されない状態ですが、Producerからのメッセージの受け取りを待っています。
Producerからのメッセージ送信
データを送信するために別ターミナルを立ち上げ、Producer に接続し、プログラムのあるディレクトリに移動します。
$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#
IoTSampleData-v2.py を実行し、生成データを送信します(30件)。
root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 30
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7fa4a03cf650>
データ作成件数:30
データ作成時間:0.12619686126708984 [sec]
ConsumerのプロンプトにProducerで送信されたデータが表示されます。
root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
topic-11:0:0: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T15:34:52.630342', 'PROC': '111', 'IOT_NUM': '268-8968', 'IOT_STATE': '愛知県', 'VOL_1': 113.19269863668802, 'VOL_2': 50.74199787037559}
topic-11:0:1: key=b'2021/02/05' value={'SECTION': 'E', 'TIME': '2021-02-05T15:34:52.630686', 'PROC': '111', 'IOT_NUM': '539-2454', 'IOT_STATE': '広島県', 'VOL_1': 150.6519642589583, 'VOL_2': 50.299823891383774}
これで、Producer上のPythonプログラムで生成したデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることを確認できました。
次回について
次回(第6回)は Kafka の KSQL でストリーミング抽出処理されたデータを Consumer を経由してS3に書き出すことを確認します。
第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージが Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
参考情報
以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築