2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

KafkaをローカルのDocker環境で、さくっと動かしてみました  第5回

Last updated at Posted at 2021-02-08

概要

Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。

第5回は、第4回で抽出したデータを取得するために、新たに ConsumerとしてPythonプログラムを稼働させるコンテナを作成し、そのConsumerでデータの受け取りを確認します。
簡単に絵を描くとこんな感じになります。
Kafka-5.png

実行環境

macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3

Consumerコンテナの作成

topic-11 からの抽出データ受け取りのために、新たなConsumerコンテナを作成します。
Pythonプログラムが稼働するコンテナ作成のためのディレクトリ構成は以下となります。

$ tree
.
├── Dockerfile
├── docker-compose.yml
├── opt
│   └── IoTTopicData-v1.py
└── requirements.txt

docker-compose.yml は以下となります。
ローカルのDocker環境なので、Pythonプログラム(IoTTopicData-v1.py)は COPY を使用せず、volumes を使用しました。

version: '3'
services:
  iot:
    build: .
    working_dir: '/app/'
    tty: true
    volumes:
      - ./opt:/app/opt

networks:
  default:
    external:
      name: iot_network

DockerFile は以下となります。
最後の行の「requirements.txt」はPythonプログラムで必要な関数を別途定義します。

FROM python:3.7.5-slim
USER root

RUN apt-get update
RUN apt-get -y install locales && localedef -f UTF-8 -i ja_JP ja_JP.UTF-8

ENV LANG ja_JP.UTF-8
ENV LANGUAGE ja_JP:ja
ENV LC_ALL ja_JP.UTF-8
ENV TZ JST-9
ENV TERM xterm

RUN apt-get install -y vim less
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
RUN pip install -r requirements.txt

requirements.txt は以下となります。
Pythonプログラムでimportするのに必要な関数を定義しています。

requirements.txt
kafka-python

Consumerコンテナの作成と確認

定義したコンテナをビルドして起動させます。

$ docker-compose up -d
    前略
Creating iottopicdata_ktp_1 ... done

起動を確認します。

$ docker-compose ps
       Name          Command   State   Ports
--------------------------------------------
iottopicdata_ktp_1   python3   Up           

起動している全てのコンテナを確認します。

$ docker ps
CONTAINER ID   IMAGE                               COMMAND                  CREATED              STATUS                  PORTS                                                    NAMES
c23123e17068   iottopicdata_ktp                    "python3"                About a minute ago   Up About a minute                                                                iottopicdata_ktp_1
35620515e9f1   confluentinc/cp-ksql-cli:5.4.3      "/bin/sh"                22 hours ago         Up 22 hours                                                                      ksql-cli
f46362cbcd5c   confluentinc/cp-ksql-server:5.4.3   "/etc/confluent/dock…"   22 hours ago         Up 22 hours (healthy)   0.0.0.0:8088->8088/tcp                                   ksql-server
4e4f79c219e1   iotsampledata_iot                   "python3"                24 hours ago         Up 24 hours                                                                      iotsampledata_iot_1
37e2e1f360f5   confluentinc/cp-kafka:5.5.1         "/bin/sh"                26 hours ago         Up 26 hours             9092/tcp                                                 cli
78d0a02910fe   confluentinc/cp-kafka:5.5.1         "/etc/confluent/dock…"   26 hours ago         Up 26 hours             0.0.0.0:9092->9092/tcp, 0.0.0.0:29092->29092/tcp         broker
ad55284f0174   confluentinc/cp-zookeeper:5.5.1     "/etc/confluent/dock…"   26 hours ago         Up 26 hours             2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp   zookeeper

Consumerで稼働させるプログラム

抽出したデータを受け取るプログラムは以下となります。

IoTTopicData-v1.py
import json
import time
import argparse
import pprint
from datetime import datetime
from kafka import KafkaProducer
from kafka import KafkaConsumer

# ターミナル出力用
def topic_to_tm(consumer):
    print('ターミナル 出力')

    # Read data from kafka
    try :
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                message.offset, message.key,
                                                message.value))
    except KeyboardInterrupt :
        print('\r\n Output to Terminal - interrupted!')
        return

# Kafka Topic からのデータ受取
def get_kafka_topic():
    # Initialize consumer variable and set property for JSON decode
    consumer = KafkaConsumer ('topic-11',
                        bootstrap_servers = ['broker:29092'],
                        value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    print(consumer)
    return consumer


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='KafkaからのIoT機器のなんちゃってStreamデータの取得')
    parser.add_argument('--mode', type=str, default='tm', help='tm(ターミナル出力)')
    args = parser.parse_args()

    start = time.time()
    
    consumer = get_kafka_topic()
    topic_to_tm(consumer)

    making_time = time.time() - start

    print("")
    print("Streamデータ取得待機時間:{0}".format(making_time) + " [sec]")
    print("")

Consumerでのメッセージ受け取り設定

データを受け取るために、Comsumer に接続し、プログラムのあるディレクトリに移動します。

$ docker exec -it iottopicdata_ktp_1 /bin/bash
root@c23123e17068:/app#
root@c23123e17068:/app# cd opt
root@c23123e17068:/app/opt#

IoTTopicData-v1.py を実行し抽出データの受け取り設定をします。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力

↑ プロンプトが何も表示されない状態ですが、Producerからのメッセージの受け取りを待っています。

Producerからのメッセージ送信

データを送信するために別ターミナルを立ち上げ、Producer に接続し、プログラムのあるディレクトリに移動します。

$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#

IoTSampleData-v2.py を実行し、生成データを送信します(30件)。

root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 30
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7fa4a03cf650>

データ作成件数:30
データ作成時間:0.12619686126708984 [sec]

ConsumerのプロンプトにProducerで送信されたデータが表示されます。

root@c23123e17068:/app/opt# python IoTTopicData-v1.py --mode tm
<kafka.consumer.group.KafkaConsumer object at 0x7f7494085650>
ターミナル 出力
topic-11:0:0: key=b'2021/02/05' value={'SECTION': 'W', 'TIME': '2021-02-05T15:34:52.630342', 'PROC': '111', 'IOT_NUM': '268-8968', 'IOT_STATE': '愛知県', 'VOL_1': 113.19269863668802, 'VOL_2': 50.74199787037559}
topic-11:0:1: key=b'2021/02/05' value={'SECTION': 'E', 'TIME': '2021-02-05T15:34:52.630686', 'PROC': '111', 'IOT_NUM': '539-2454', 'IOT_STATE': '広島県', 'VOL_1': 150.6519642589583, 'VOL_2': 50.299823891383774}

これで、Producer上のPythonプログラムで生成したデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることを確認できました。

次回について

次回(第6回)は Kafka の KSQL でストリーミング抽出処理されたデータを Consumer を経由してS3に書き出すことを確認します。

第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージが Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築

2
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?