概要
Kafkaに魅せられて、とりあえず手元のマシンで動かしてみましょうか、、、と、インフラしか知らないSEがMacBookProを新たに購入し、Qiita等にアップされている諸先輩方の記事を参考にさせていただき、動作確認したまでの手順を数回に分け記載しています。
なお、Kafkaの概要につきましたは、こちらの記事 を参照ください。
第4回は、第3回をベースに Kafka の KSQL を使用し、Producer から送られてくるデータを とある条件で抽出してみます。
簡単に絵を描くとこんな感じになります。
実行環境
macOS Big Sur 11.1
Docker version 20.10.2, build 2291f61
Python 3.8.3
KSQLコンテナの作成
第1回の docker-compose.yml に ksql-server(KSQLのサーバ) と ksql-cli(KSQLのクライアント) コンテナ定義を追加します。
version: "3"
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
hostname: zookeeper
container_name: zookeeper
ports:
- "32181:32181"
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:5.5.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:32181"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
cli:
image: confluentinc/cp-kafka:5.5.1
hostname: cli
container_name: cli
depends_on:
- broker
entrypoint: /bin/sh
tty: true
ksql-server:
image: confluentinc/cp-ksql-server:5.4.3
hostname: ksql-server
container_name: ksql-server
depends_on:
- broker
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksql-server
KSQL_APPLICATION_ID: "IoT-demo-1"
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_AUTO_OFFSET_RESET: "earliest"
ksql-cli:
image: confluentinc/cp-ksql-cli:5.4.3
container_name: ksql-cli
volumes:
- $PWD/ksql.commands:/tmp/ksql.commands
depends_on:
- broker
- ksql-server
entrypoint: /bin/sh
tty: true
networks:
default:
external:
name: iot_network
コンテナの作成と確認
定義したコンテナをビルドして起動させます。
$ docker-compose up -d
前略
zookeeper is up-to-date
broker is up-to-date
cli is up-to-date
Creating ksql-server ... done
Creating ksql-cli ... done
起動しているコンテナを確認します。
$ docker-compose ps
Name Command State Ports
---------------------------------------------------------------------------------------------------------------
broker /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
cli /bin/sh Up 9092/tcp
ksql-cli /bin/sh Up
ksql-server /etc/confluent/docker/run Up (healthy) 0.0.0.0:8088->8088/tcp
zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 0.0.0.0:32181->32181/tcp, 3888/tcp
抽出データをストリーミングするためのTopicの作成
broker に接続します。
$ docker exec -it broker /bin/bash
root@broker:/#
抽出データをストリーミングするための topic 「topic-11」を作成します。
root@broker:/# kafka-topics --bootstrap-server broker:9092 --create --topic topic-11 --partitions 3 replication-factor 1
Created topic topic-11.
作成された topic を確認します。
root@broker:/# kafka-topics --bootstrap-server broker:9092 --describe --topic topic-11
Topic: topic-11 PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: topic-11 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-11 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: topic-11 Partition: 2 Leader: 1 Replicas: 1 Isr: 1
KSQLでのストリーミングの作成
ksql-cli に接続します。
$ docker exec -it ksql-cli /bin/bash
root@35620515e9f1:/#
ksql-cli から ksql-server に接続します。
root@35620515e9f1:/# ksql http://ksql-server:8088
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017-2019 Confluent Inc.
CLI v5.4.3, Server v5.4.3 located at http://ksql-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
Producer から送られてくるデータ(topic-01)のストリーム「topic01_stream1」を作成します。
ksql> CREATE STREAM topic01_stream1 (id INT, time VARCHAR, proc VARCHAR, section VARCHAR, iot_num VARCHAR, iot_state VARCHAR, vol_1 DOUBLE, vol_2 DOUBLE) WITH (KAFKA_TOPIC = 'topic-01', VALUE_FORMAT='JSON', KEY='section');
Message
----------------
Stream created
----------------
作成された「topic01_stream1」の情報を確認します。
ksql> describe extended topic01_stream1;
Name : TOPIC01_STREAM1
Type : STREAM
Key field : SECTION
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : topic-01 (partitions: 3, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ID | INTEGER
TIME | VARCHAR(STRING)
PROC | VARCHAR(STRING)
SECTION | VARCHAR(STRING)
IOT_NUM | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VOL_1 | DOUBLE
VOL_2 | DOUBLE
---------------------------------------
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic-01)
「topic01_stream1」のストリーミングデータを以下の条件で抽出し、その結果を topic-11 に送信するストリーム「topic01_stream2」を作成します。
抽出条件: section='E' OR section='C' OR section='W'
ksql> CREATE STREAM topic01_stream2 WITH (KAFKA_TOPIC = 'topic-11', VALUE_FORMAT='JSON') AS SELECT t01s1.section as section, t01s1.time as time, t01s1.proc as proc, t01s1.iot_num as iot_num, t01s1.iot_state as iot_state, t01s1.vol_1 as vol_1, t01s1.vol_2 as vol_2 FROM topic01_stream1 t01s1 WHERE section='E' OR section='C' OR section='W';
Message
----------------------------------------------------------------------------------------------------
Stream TOPIC01_STREAM2 created and running. Created by query with query ID: CSAS_TOPIC01_STREAM2_1
----------------------------------------------------------------------------------------------------
作成された「topic01_stream2」の情報を確認します。
ksql> describe extended topic01_stream2;
Name : TOPIC01_STREAM2
Type : STREAM
Key field : SECTION
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : topic-11 (partitions: 3, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
SECTION | VARCHAR(STRING)
TIME | VARCHAR(STRING)
PROC | VARCHAR(STRING)
IOT_NUM | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VOL_1 | DOUBLE
VOL_2 | DOUBLE
---------------------------------------
Queries that write from this STREAM
-----------------------------------
CSAS_TOPIC01_STREAM2_1 : CREATE STREAM TOPIC01_STREAM2 WITH (KAFKA_TOPIC='topic-11', PARTITIONS=3, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
T01S1.SECTION "SECTION",
T01S1.TIME "TIME",
T01S1.PROC "PROC",
T01S1.IOT_NUM "IOT_NUM",
T01S1.IOT_STATE "IOT_STATE",
T01S1.VOL_1 "VOL_1",
T01S1.VOL_2 "VOL_2"
FROM TOPIC01_STREAM1 T01S1
WHERE (((T01S1.SECTION = 'E') OR (T01S1.SECTION = 'C')) OR (T01S1.SECTION = 'W'))
EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0 total-messages: 2 last-message: 2021-02-04T09:12:35.209Z
(Statistics of the local KSQL server interaction with the Kafka topic topic-11)
作成したストリームとトピックの紐付け情報を確認します。
ksql> show streams;
Stream Name | Kafka Topic | Format
----------------------------------------
TOPIC01_STREAM1 | topic-01 | JSON
TOPIC01_STREAM2 | topic-11 | JSON
----------------------------------------
KSQLでのストリーミングの確認
「topic01_stream2」でストリーミングされる情報を確認するための設定を行います。
ksql> select * from topic01_stream2 emit changes;
Press CTRL-C to interrupt
↑ Producerからの抽出データの受け取りを待っています。
データを送信するために別ターミナルを立ち上げ、Producer に接続し、プログラムのあるディレクトリに移動します。
$ docker exec -it iotsampledata_iot_1 /bin/bash
root@4e4f79c219e1:/app#
root@4e4f79c219e1:/app# cd opt
root@4e4f79c219e1:/app/opt#
IoTSampleData-v2.py を実行し、生成データを送信します(データ件数:30件)。
root@4e4f79c219e1:/app/opt# python IoTSampleData-v2.py --mode kf --count 30
Kafka 出力
<kafka.producer.future.FutureRecordMetadata object at 0x7f43c5e36850>
データ作成件数:30
データ作成時間:0.12617969512939453 [sec]
kqsl のプロンプトにProducerから送信された抽出データが表示されます。
ksql> select * from topic01_stream2 emit changes;
+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|ROWTIME |ROWKEY |SECTION |TIME |PROC |IOT_NUM |IOT_STATE |VOL_1 |VOL_2 |
+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|161243152299|2021/02/04 |E |2021-02-04T1|111 |734-3238 |大分県 |139.02429108|51.183520722|
|2 | | |8:38:42.8751| | | |703143 |012844 |
| | | |73 | | | | | |
|161243152299|2021/02/04 |W |2021-02-04T1|111 |409-8822 |高知県 |140.98584152|58.169693933|
|2 | | |8:38:42.8753| | | |262225 |88201 |
| | | |73 | | | | | |
|161243152299|2021/02/04 |C |2021-02-04T1|111 |169-2154 |長崎県 |119.11701506|69.181332688|
|3 | | |8:38:42.8756| | | |472588 |56029 |
| | | |75 | | | | | |
Press CTRL-C to interrupt
これで、Producer上のPythonプログラムで生成したデータが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことを確認できました。
次回について
次回(第5回)は KSQL でストリーミング抽出処理されたデータを Consumer で受け取れることを確認します。
第1回:Kafka基本コンポーネントをローカルのDocker環境で稼働させる
第2回:Kafkaの Producerから送信されたメッセージが Broker を経由して Consumer で受け取れることの確認
第3回:Producer上のPythonプログラムで生成したデータを Broker を経由して Consumer で受け取れることの確認
第4回:Producer上での生成データが topic-01 を経由して KSQL(topic01_stream1 → topic01_stream2) でストリーミング抽出処理されたことの確認
第5回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
第6回:Producer上での生成データが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムによってS3に書き込めることの確認
第7回:2つのProducerコンテナ上で生成したそれぞれのデータが topic-01 → Ksql → topic-11 を経由して Consumer 上のPythonプログラムで受け取れることの確認
参考情報
以下の情報を参考にさせていただきました。感謝申し上げます。
Kafka の Docker のチュートリアル
Kafka からKSQLまで Dockerで簡単環境構築