概要
Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。
第3回は、第1回で作成したDockerコンテナ環境に、新たに
ProducerとしてPythonプログラムを走らせるためのコンテナを稼働させます。
このProducerからIoT疑似データを送り、Consumerでそのデータの受け取りを確認します。
簡単に絵を描くとこんな感じになります。
実行環境
macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3
最初に
新たな topic(topic-01)を作成するために、broker に接続します。作成後、その内容を確認します。
$ docker exec -it broker /bin/bash
root@broker:/#
root@broker:/# kafka-topics --bootstrap-server broker:9092 --create --topic topic-01 --partitions 3 replication-factor 1
Created topic topic-01
root@broker:/# kafka-topics --bootstrap-server broker:9092 --describe --topic topic-01
Topic: topic-01 PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: topic-01 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-01 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Producerコンテナの作成
新たなProducerコンテナを作成します。
Pythonプログラムが稼働するコンテナ作成のためのディレクトリ構成は以下となります。
$ tree
.
├── Dockerfile
├── docker-compose.yml
├── opt
│ └── IoTSampleData-v2.py
└── requirements.txt
docker-compose.yml は以下となります。
ローカルのDocker環境なので、Pythonプログラム(IoTSampleData-v2.py)は COPY を使用せず、volumes を使用しました。
version: '3'
services:
iot:
build: .
working_dir: '/app/'
tty: true
volumes:
- ./opt:/app/opt
networks:
default:
external:
name: iot_network
DockerFile は以下となります。
最後の行の「requirements.txt」はPythonプログラムで必要な関数を別途定義します。
FROM python:3.7.5-slim
USER root
RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8
ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm
RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt
requirements.txt は以下となります。
Pythonプログラムでimportするのに必要な関数を定義しています。
faker
kafka-python
Producerコンテナの作成と確認
定義したコンテナをビルドして起動させます。
$ docker-compose up -d
前略
Creating iotsampledata_iot_1 ... done
起動を確認します。
$ docker-compose ps
Name Command State Ports
---------------------------------------------
iotsampledata_iot_1 python3 Up
起動している全てのコンテナを確認します。
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
4e4f79c219e1 iotsampledata_iot "python3" 3 minutes ago Up 3 minutes iotsampledata_iot_1
37e2e1f360f5 confluentinc/cp-kafka:5.5.1 "/bin/sh" 2 hours ago Up 2 hours 9092/tcp cli
78d0a02910fe confluentinc/cp-kafka:5.5.1 "/etc/confluent/dock…" 2 hours ago Up 2 hours 0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp broker
ad55284f0174 confluentinc/cp-zookeeper:5.5.1 "/etc/confluent/dock…" 2 hours ago Up 2 hours 2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp zookeeper
Consumerでのメッセージ受け取り設定
cli に接続します。
$ docker exec -it cli /bin/bash
root@cli:/#
Consumer としてメッセージ受け取りの設定をします。
root@cli:/# kafka-console-consumer --bootstrap-server broker:29092 --topic topic-01 --group G1 --from-beginning
↑ プロンプトが何も表示されない状態ですが、Producerからのメッセージの受け取りを待っています。
Producerからのメッセージ送信
Producerで稼働させるデータ生成プログラムは以下となります。
生成されるデータのカラム構成はプログラム中の items を確認ください。
このプログラムの改変版となります。
import random
import json
import time
from datetime import date, datetime
from collections import OrderedDict
import argparse
import string
import pprint
from faker.factory import Factory
from kafka import KafkaProducer
from kafka import KafkaConsumer
# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")
# IoT機器のダミーセクション(小文字アルファベットを定義)
section = string.ascii_uppercase
# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
iot_items = json.dumps({
'items': [{
'id': i, # id
'time': generate_time(), # データ生成時間
'proc': proc, # データ生成プロセス :プログラム実行時のパラメータ
'section': random.choice(section), # IoT機器セクション :A-Z文字をランダムに割当
'iot_num': fake.zipcode(), # IoT機器番号 :郵便番号をランダムに割当
'iot_state': fake.prefecture(), # IoT設置場所 :都道府県名をランダムに割当
'vol_1': random.uniform(100, 200), # IoT値−1 :100-200の間の値をランダムに割当(小数点以下、14桁)
'vol_2': random.uniform(50, 90) # IoT値−2 :50-90の間の値をランダムに割当(小数点以下、14桁)
}
for i in range(count)
]
}, ensure_ascii=False).encode('utf-8')
return iot_items
# IoT機器で計測されたダミーデータの生成時間
def generate_time():
dt_time = datetime.now()
gtime = json_trans_date(dt_time)
return gtime
# date, datetimeの変換関数
def json_trans_date(obj):
# 日付型を文字列に変換
if isinstance(obj, (datetime, date)):
return obj.isoformat()
# 上記以外は対象外.
raise TypeError ("Type %s not serializable" % type(obj))
# メイン : ターミナル出力用
def tm_main(count, proc):
print('ターミナル 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
pprint.pprint(json_dict)
# メイン : Kafka出力用
def kf_main(count, proc):
print('Kafka 出力')
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
producer = KafkaProducer(bootstrap_servers=['broker:29092'])
date = datetime.now().strftime("%Y/%m/%d")
for item in json_dict['items']:
# print(item)
# result = producer.send('topic-01', json.dumps(item).encode('utf-8'))
result = producer.send('topic-01', key=date.encode('utf-8'), value=json.dumps(item).encode('utf-8'))
print(result)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
parser.add_argument('--count', type=int, default=10, help='データ作成件数')
parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ kf(Kafka出力)')
args = parser.parse_args()
start = time.time()
if (args.mode == 'kf'):
kf_main(args.count, args.proc)
else :
tm_main(args.count, args.proc)
making_time = time.time() - start
print("")
print(f"データ作成件数:{args.count}")
print("データ作成時間:{0}".format(making_time) + " [sec]")
print("")
データを送信するために別ターミナルを立ち上げ、Producer に接続し、プログラムのあるディレクトリに移動します。
$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#
IoTSampleData-v2.py を実行し、生成データを送信します(デフォルトで10件)。
root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f7b5ca50490>
データ作成件数:10
データ作成時間:0.09887456893920898 [sec]
ConsumerのプロンプトにProducerで送信されたデータが表示されます(文字コード変換はご容赦ください)。
root@cli:/# kafka-console-consumer --bootstrap-server broker:29092 --topic topic-01 --group G1 --from-beginning
{"id": 0, "time": "2021-02-04T16:26:15.900303", "proc": "111", "section": "S", "iot_num": "115-4984", "iot_state": "\u5bcc\u5c71\u770c", "vol_1": 145.2739911204906, "vol_2": 56.15103042985286}
{"id": 1, "time": "2021-02-04T16:26:15.900343", "proc": "111", "section": "O", "iot_num": "467-0063", "iot_state": "\u9577\u5d0e\u770c", "vol_1": 196.68828974426407, "vol_2": 83.83189487506144}
{"id": 2, "time": "2021-02-04T16:26:15.900365", "proc": "111", "section": "H", "iot_num": "475-9700", "iot_state": "\u611b\u5a9b\u770c", "vol_1": 194.72768503158943, "vol_2": 68.37182675896713}
{"id": 3, "time": "2021-02-04T16:26:15.900386", "proc": "111", "section": "W", "iot_num": "322-4370", "iot_state": "\u4e09\u91cd\u770c", "vol_1": 148.90496235643883, "vol_2": 67.71055729821154}
{"id": 4, "time": "2021-02-04T16:26:15.900405", "proc": "111", "section": "G", "iot_num": "461-8598", "iot_state": "\u9ad8\u77e5\u770c", "vol_1": 125.97506447191827, "vol_2": 70.61188682350212}
{"id": 5, "time": "2021-02-04T16:26:15.900416", "proc": "111", "section": "T", "iot_num": "105-7057", "iot_state": "\u798f\u4e95\u770c", "vol_1": 172.64015749557626, "vol_2": 68.7370674440578}
{"id": 6, "time": "2021-02-04T16:26:15.900429", "proc": "111", "section": "K", "iot_num": "691-1826", "iot_state": "\u9577\u5d0e\u770c", "vol_1": 117.36354421367344, "vol_2": 81.55938819205682}
{"id": 7, "time": "2021-02-04T16:26:15.900502", "proc": "111", "section": "W", "iot_num": "355-5626", "iot_state": "\u4e09\u91cd\u770c", "vol_1": 123.18728419707203, "vol_2": 88.59540935089659}
{"id": 8, "time": "2021-02-04T16:26:15.900534", "proc": "111", "section": "O", "iot_num": "102-4557", "iot_state": "\u6ecb\u8cc0\u770c", "vol_1": 117.00423119320587, "vol_2": 83.8472348239608}
{"id": 9, "time": "2021-02-04T16:26:15.900544", "proc": "111", "section": "D", "iot_num": "675-3926", "iot_state": "\u5cf6\u6839\u770c", "vol_1": 176.01299983418306, "vol_2": 51.670329829172005}
これで、Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることを確認できました。
次回について
次回(第4回)は Kafka の KSQL を使用し、Producer から送られてくるデータを抽出してみます。
第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージが Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
参考情報
以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築