4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

KafkaをローカルのDocker環境で、さくっと動かしてみました  第4回

Last updated at Posted at 2021-02-08

概要

Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。

第4回は、第3回をベースに Kafka の KSQL を使用し、Producer から送られてくるデータを とある条件で抽出してみます。
簡単に絵を描くとこんな感じになります。
Kafka-4.png

実行環境

macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3

KSQLコンテナの作成

第1回の docker-compose.yml に ksql-server(KSQLのサーバ) と ksql-cli(KSQLのクライアント) コンテナ定義を追加します。


version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_SUPPORT_METRICS_ENABLE: "false"

  cli:
    image: confluentinc/cp-kafka:5.5.1
    hostname: cli
    container_name: cli
    depends_on:
      - broker
    entrypoint: /bin/sh
    tty: true

  ksql-server:
    image: confluentinc/cp-ksql-server:5.4.3
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "IoT-demo-1"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_AUTO_OFFSET_RESET: "earliest"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.4.3
    container_name: ksql-cli
    volumes:
      - $PWD/ksql.commands:/tmp/ksql.commands
    depends_on:
      - broker
      - ksql-server
    entrypoint: /bin/sh
    tty: true

networks:
  default:
    external:
      name: iot_network

コンテナの作成と確認

定義したコンテナをビルドして起動させます。

$ docker-compose up -d
    前略
zookeeper is up-to-date
broker is up-to-date
cli is up-to-date
Creating ksql-server ... done
Creating ksql-cli    ... done

起動しているコンテナを確認します。

$ docker-compose ps
   Name                Command               State                               Ports                         
---------------------------------------------------------------------------------------------------------------
broker        /etc/confluent/docker/run   Up             0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp      
cli           /bin/sh                     Up             9092/tcp                                              
ksql-cli      /bin/sh                     Up                                                                   
ksql-server   /etc/confluent/docker/run   Up (healthy)   0.0.0.0:8088->8088/tcp                                
zookeeper     /etc/confluent/docker/run   Up             2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp

抽出データをストリーミングするためのTopicの作成

broker に接続します。

$ docker exec -it broker /bin/bash
root@broker:/#

抽出データをストリーミングするための topic 「topic-11」を作成します。

root@broker:/# kafka-topics --bootstrap-server broker:9092 --create --topic topic-11 --partitions 3 replication-factor 1
Created topic topic-11.

作成された topic を確認します。

root@broker:/# kafka-topics --bootstrap-server broker:9092 --describe --topic topic-11
Topic: topic-11	PartitionCount: 3	ReplicationFactor: 1	Configs: 
	Topic: topic-11	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic-11	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: topic-11	Partition: 2	Leader: 1	Replicas: 1	Isr: 1

KSQLでのストリーミングの作成

ksql-cli に接続します。

$ docker exec -it ksql-cli /bin/bash
root@35620515e9f1:/#

ksql-cli から ksql-server に接続します。

root@35620515e9f1:/# ksql http://ksql-server:8088
                  
                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2019 Confluent Inc.
CLI v5.4.3, Server v5.4.3 located at http://ksql-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

Producer から送られてくるデータ(topic-01)のストリーム「topic01_stream1」を作成します。

ksql> CREATE STREAM topic01_stream1 (id INT, time VARCHAR, proc VARCHAR, section VARCHAR, iot_num VARCHAR, iot_state VARCHAR, vol_1 DOUBLE, vol_2 DOUBLE) WITH (KAFKA_TOPIC = 'topic-01', VALUE_FORMAT='JSON', KEY='section');

 Message        
----------------
 Stream created 
----------------

作成された「topic01_stream1」の情報を確認します。

ksql> describe extended topic01_stream1;

Name                 : TOPIC01_STREAM1
Type                 : STREAM
Key field            : SECTION
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : topic-01 (partitions: 3, replication: 1)

 Field     | Type                      
---------------------------------------
 ROWTIME   | BIGINT           (system) 
 ROWKEY    | VARCHAR(STRING)  (system) 
 ID        | INTEGER                   
 TIME      | VARCHAR(STRING)           
 PROC      | VARCHAR(STRING)           
 SECTION   | VARCHAR(STRING)           
 IOT_NUM   | VARCHAR(STRING)           
 IOT_STATE | VARCHAR(STRING)           
 VOL_1     | DOUBLE                    
 VOL_2     | DOUBLE                    
---------------------------------------

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic topic-01)

「topic01_stream1」のストリーミングデータを以下の条件で抽出し、その結果を topic-11 に送信するストリーム「topic01_stream2」を作成します。
抽出条件: section='E' OR section='C' OR section='W'

ksql> CREATE STREAM topic01_stream2 WITH (KAFKA_TOPIC = 'topic-11', VALUE_FORMAT='JSON') AS SELECT t01s1.section as section, t01s1.time as time, t01s1.proc as proc, t01s1.iot_num as iot_num,  t01s1.iot_state as iot_state, t01s1.vol_1 as vol_1, t01s1.vol_2 as vol_2 FROM  topic01_stream1 t01s1 WHERE section='E' OR section='C' OR section='W';

 Message                                                                                            
----------------------------------------------------------------------------------------------------
 Stream TOPIC01_STREAM2 created and running. Created by query with query ID: CSAS_TOPIC01_STREAM2_1 
----------------------------------------------------------------------------------------------------

作成された「topic01_stream2」の情報を確認します。

ksql> describe extended topic01_stream2;

Name                 : TOPIC01_STREAM2
Type                 : STREAM
Key field            : SECTION
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : JSON
Kafka topic          : topic-11 (partitions: 3, replication: 1)

 Field     | Type                      
---------------------------------------
 ROWTIME   | BIGINT           (system) 
 ROWKEY    | VARCHAR(STRING)  (system) 
 SECTION   | VARCHAR(STRING)           
 TIME      | VARCHAR(STRING)           
 PROC      | VARCHAR(STRING)           
 IOT_NUM   | VARCHAR(STRING)           
 IOT_STATE | VARCHAR(STRING)           
 VOL_1     | DOUBLE                    
 VOL_2     | DOUBLE                    
---------------------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_TOPIC01_STREAM2_1 : CREATE STREAM TOPIC01_STREAM2 WITH (KAFKA_TOPIC='topic-11', PARTITIONS=3, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
  T01S1.SECTION "SECTION",
  T01S1.TIME "TIME",
  T01S1.PROC "PROC",
  T01S1.IOT_NUM "IOT_NUM",
  T01S1.IOT_STATE "IOT_STATE",
  T01S1.VOL_1 "VOL_1",
  T01S1.VOL_2 "VOL_2"
FROM TOPIC01_STREAM1 T01S1
WHERE (((T01S1.SECTION = 'E') OR (T01S1.SECTION = 'C')) OR (T01S1.SECTION = 'W'))
EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:         0   total-messages:         2     last-message: 2021-02-04T09:12:35.209Z

(Statistics of the local KSQL server interaction with the Kafka topic topic-11)

作成したストリームとトピックの紐付け情報を確認します。

ksql> show streams;

 Stream Name     | Kafka Topic | Format 
----------------------------------------
 TOPIC01_STREAM1 | topic-01    | JSON   
 TOPIC01_STREAM2 | topic-11    | JSON   
----------------------------------------

KSQLでのストリーミングの確認

「topic01_stream2」でストリーミングされる情報を確認するための設定を行います。

ksql> select * from topic01_stream2 emit changes;

Press CTRL-C to interrupt

↑ Producerからの抽出データの受け取りを待っています。

データを送信するために別ターミナルを立ち上げ、Producer に接続し、プログラムのあるディレクトリに移動します。

$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#

IoTSampleData-v2.py を実行し、生成データを送信します(データ件数:30件)。

root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 30
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f43c5e36850>

データ作成件数:30
データ作成時間:0.12617969512939453 [sec]

kqsl のプロンプトにProducerから送信された抽出データが表示されます。

ksql> select * from topic01_stream2 emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|ROWTIME     |ROWKEY      |SECTION     |TIME        |PROC        |IOT_NUM     |IOT_STATE   |VOL_1       |VOL_2       |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|161243152299|2021/02/04  |E           |2021-02-04T1|111         |734-3238    |大分県       |139.02429108|51.183520722|
|2           |            |            |8:38:42.8751|            |            |            |703143      |012844      |
|            |            |            |73          |            |            |            |            |            |
|161243152299|2021/02/04  |W           |2021-02-04T1|111         |409-8822    |高知県       |140.98584152|58.169693933|
|2           |            |            |8:38:42.8753|            |            |            |262225      |88201       |
|            |            |            |73          |            |            |            |            |            |
|161243152299|2021/02/04  |C           |2021-02-04T1|111         |169-2154    |長崎県       |119.11701506|69.181332688|
|3           |            |            |8:38:42.8756|            |            |            |472588      |56029       |
|            |            |            |75          |            |            |            |            |            |

Press CTRL-C to interrupt

これで、Producer上のPythonプログラムで生成したデータが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことを確認できました。

次回について

次回(第5回)は KSQL でストリーミング抽出処理されたデータを Consumer で受け取れることを確認します。

第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージが Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?