LoginSignup
2
1

More than 3 years have passed since last update.

KafkaをローカルのDocker環境で、さくっと動かしてみました  第3回

Last updated at Posted at 2021-02-07

概要

Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。

第3回は、第1回で作成したDockerコンテナ環境に、新たに
ProducerとしてPythonプログラムを走らせるためのコンテナを稼働させます。
このProducerからIoT疑似データを送り、Consumerでそのデータの受け取りを確認します。
簡単に絵を描くとこんな感じになります。
kafka-3.png

実行環境

macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3

最初に

新たな topic(topic-01)を作成するために、broker に接続します。作成後、その内容を確認します。

$ docker exec -it broker /bin/bash
root@broker:/#

root@broker:/# kafka-topics --bootstrap-server broker:9092 --create --topic topic-01 --partitions 3 replication-factor 1
Created topic topic-01

root@broker:/# kafka-topics --bootstrap-server broker:9092 --describe --topic topic-01
Topic: topic-01 PartitionCount: 3   ReplicationFactor: 1    Configs: 
    Topic: topic-01 Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic-01 Partition: 1    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic-01 Partition: 2    Leader: 1   Replicas: 1 Isr: 1

Producerコンテナの作成

新たなProducerコンテナを作成します。
Pythonプログラムが稼働するコンテナ作成のためのディレクトリ構成は以下となります。

$ tree
.
├── Dockerfile
├── docker-compose.yml
├── opt
│   └── IoTSampleData-v2.py
└── requirements.txt

docker-compose.yml は以下となります。
ローカルのDocker環境なので、Pythonプログラム(IoTSampleData-v2.py)は COPY を使用せず、volumes を使用しました。

version: '3'
services:
  iot:
    build: .
    working_dir: '/app/'
    tty: true
    volumes:
      - ./opt:/app/opt

networks:
  default:
    external:
      name: iot_network

DockerFile は以下となります。
最後の行の「requirements.txt」はPythonプログラムで必要な関数を別途定義します。

FROM python:3.7.5-slim
USER root

RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8

ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm

RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt

requirements.txt は以下となります。
Pythonプログラムでimportするのに必要な関数を定義しています。

requirements.txt
faker
kafka-python

Producerコンテナの作成と確認

定義したコンテナをビルドして起動させます。

$ docker-compose up -d
    前略
Creating iotsampledata_iot_1 ... done

起動を確認します。

$ docker-compose ps
       Name           Command   State   Ports
---------------------------------------------
iotsampledata_iot_1   python3   Up           

起動している全てのコンテナを確認します。

$ docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS                                                    NAMES
4e4f79c219e1   iotsampledata_iot                 "python3"                3 minutes ago   Up 3 minutes                                                            iotsampledata_iot_1
37e2e1f360f5   confluentinc/cp-kafka:5.5.1       "/bin/sh"                2 hours ago     Up 2 hours     9092/tcp                                                 cli
78d0a02910fe   confluentinc/cp-kafka:5.5.1       "/etc/confluent/dock…"   2 hours ago     Up 2 hours     0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp         broker
ad55284f0174   confluentinc/cp-zookeeper:5.5.1   "/etc/confluent/dock…"   2 hours ago     Up 2 hours     2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   zookeeper

Consumerでのメッセージ受け取り設定

cli に接続します。

$ docker exec -it cli /bin/bash
root@cli:/#

Consumer としてメッセージ受け取りの設定をします。

root@cli:/# kafka-console-consumer --bootstrap-server broker:29092 --topic topic-01 --group G1 --from-beginning

↑ プロンプトが何も表示されない状態ですが、Producerからのメッセージの受け取りを待っています。

Producerからのメッセージ送信

Producerで稼働させるデータ生成プログラムは以下となります。
生成されるデータのカラム構成はプログラム中の items を確認ください。
このプログラムの改変版となります。

IoTSampleData-v2.py
import random
import json
import time
from datetime import date, datetime
from collections import OrderedDict
import argparse
import string
import pprint
from faker.factory import Factory
from kafka import KafkaProducer
from kafka import KafkaConsumer

# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")

# IoT機器のダミーセクション(小文字アルファベットを定義)
section = string.ascii_uppercase


# IoT機器で送信JSONデータの作成
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': i,                            # id
            'time': generate_time(),            # データ生成時間
            'proc': proc,                       # データ生成プロセス  :プログラム実行時のパラメータ
            'section': random.choice(section),  # IoT機器セクション  :A-Z文字をランダムに割当
            'iot_num': fake.zipcode(),          # IoT機器番号    :郵便番号をランダムに割当
            'iot_state': fake.prefecture(),     # IoT設置場所    :都道府県名をランダムに割当
            'vol_1': random.uniform(100, 200),  # IoT値−1       :100-200の間の値をランダムに割当(小数点以下、14桁)
            'vol_2': random.uniform(50, 90)     # IoT値−2      :50-90の間の値をランダムに割当(小数点以下、14桁)
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# IoT機器で計測されたダミーデータの生成時間
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime

# date, datetimeの変換関数
def json_trans_date(obj):
    # 日付型を文字列に変換
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外は対象外.
    raise TypeError ("Type %s not serializable" % type(obj))


# メイン : ターミナル出力用
def tm_main(count, proc):
    print('ターミナル 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)
    pprint.pprint(json_dict)


# メイン : Kafka出力用
def kf_main(count, proc):
    print('Kafka 出力')
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    producer = KafkaProducer(bootstrap_servers=['broker:29092'])
    date = datetime.now().strftime("%Y/%m/%d")

    for item in json_dict['items']:
        # print(item)
        # result = producer.send('topic-01', json.dumps(item).encode('utf-8'))
        result = producer.send('topic-01', key=date.encode('utf-8'), value=json.dumps(item).encode('utf-8'))

    print(result)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='IoT機器のなんちゃってダミーデータの生成')
    parser.add_argument('--count', type=int, default=10, help='データ作成件数')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)/ kf(Kafka出力)')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 'kf'): 
        kf_main(args.count, args.proc)
    else :
        tm_main(args.count, args.proc)

    making_time = time.time() - start

    print("")
    print(f"データ作成件数:{args.count}")
    print("データ作成時間:{0}".format(making_time) + " [sec]")
    print("")

データを送信するために別ターミナルを立ち上げ、Producer に接続し、プログラムのあるディレクトリに移動します。

$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#

IoTSampleData-v2.py を実行し、生成データを送信します(デフォルトで10件)。

root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f7b5ca50490>

データ作成件数:10
データ作成時間:0.09887456893920898 [sec]

ConsumerのプロンプトにProducerで送信されたデータが表示されます(文字コード変換はご容赦ください)。

root@cli:/# kafka-console-consumer --bootstrap-server broker:29092 --topic topic-01 --group G1 --from-beginning
{"id": 0, "time": "2021-02-04T16:26:15.900303", "proc": "111", "section": "S", "iot_num": "115-4984", "iot_state": "\u5bcc\u5c71\u770c", "vol_1": 145.2739911204906, "vol_2": 56.15103042985286}
{"id": 1, "time": "2021-02-04T16:26:15.900343", "proc": "111", "section": "O", "iot_num": "467-0063", "iot_state": "\u9577\u5d0e\u770c", "vol_1": 196.68828974426407, "vol_2": 83.83189487506144}
{"id": 2, "time": "2021-02-04T16:26:15.900365", "proc": "111", "section": "H", "iot_num": "475-9700", "iot_state": "\u611b\u5a9b\u770c", "vol_1": 194.72768503158943, "vol_2": 68.37182675896713}
{"id": 3, "time": "2021-02-04T16:26:15.900386", "proc": "111", "section": "W", "iot_num": "322-4370", "iot_state": "\u4e09\u91cd\u770c", "vol_1": 148.90496235643883, "vol_2": 67.71055729821154}
{"id": 4, "time": "2021-02-04T16:26:15.900405", "proc": "111", "section": "G", "iot_num": "461-8598", "iot_state": "\u9ad8\u77e5\u770c", "vol_1": 125.97506447191827, "vol_2": 70.61188682350212}
{"id": 5, "time": "2021-02-04T16:26:15.900416", "proc": "111", "section": "T", "iot_num": "105-7057", "iot_state": "\u798f\u4e95\u770c", "vol_1": 172.64015749557626, "vol_2": 68.7370674440578}
{"id": 6, "time": "2021-02-04T16:26:15.900429", "proc": "111", "section": "K", "iot_num": "691-1826", "iot_state": "\u9577\u5d0e\u770c", "vol_1": 117.36354421367344, "vol_2": 81.55938819205682}
{"id": 7, "time": "2021-02-04T16:26:15.900502", "proc": "111", "section": "W", "iot_num": "355-5626", "iot_state": "\u4e09\u91cd\u770c", "vol_1": 123.18728419707203, "vol_2": 88.59540935089659}
{"id": 8, "time": "2021-02-04T16:26:15.900534", "proc": "111", "section": "O", "iot_num": "102-4557", "iot_state": "\u6ecb\u8cc0\u770c", "vol_1": 117.00423119320587, "vol_2": 83.8472348239608}
{"id": 9, "time": "2021-02-04T16:26:15.900544", "proc": "111", "section": "D", "iot_num": "675-3926", "iot_state": "\u5cf6\u6839\u770c", "vol_1": 176.01299983418306, "vol_2": 51.670329829172005}

これで、Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることを確認できました。

次回について

次回(第4回)は Kafka の KSQL を使用し、Producer から送られてくるデータを抽出してみます。

第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージが Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1