STEP-3 : ストリーミング処理後のデータ受信確認
概要
Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータをRabbitMQで受信し、該当する Source Connector を使用し、Confluent でストリーミング処理をできることを確認しました。
以下の3つのステップで上記内容を順次説明します。今回は STEP-3 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認
ローカル環境
macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)
ストリーミング処理後のデータ受信トピックの作成
- Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。
- 画面左側から「Topics」を選択し、新たに表示される「All topics」画面の右側にある「+ Add a topic」ボタンを押します。その後に表示される「New topic」画面の「Topic name」に「topic_202」を入力し、「Create with defaults」ボタンを押します。
Producer側の KsqlDB Stream の作成
-
続いて、画面左側から「ksqlDB」を選択し、新たに表示される「ksqlDB」画面から「ksqldb1」を選択し、その後に表示される「ksqldb1」画面の上部タグから「Streams」を選択します。切り替わった画面の左側から「Add Stream」ボタンを押します。
-
新たに表示される「Create a ksqlDB Stream」画面のリストから「topic_201」を選択します。その後に拡張表示される画面の「STREAM name」に「stream_201」を、「Value format」に「JSON」を選択します。それ以外の項目はデフォルト表示のまま、「Save STREAM」ボタンを押します。
Producer側 Stream でのデータ受信
- ローカルコンピュータ上で、STEP-2のIoTデータ生成プログラムを実行します。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1
2. 表示されている「ksqldb1」画面の上部タグから「Flow」を選択し、先程作成した「STREAM_201」を選択します。そうすると、画面右側の「STREAM_201」に、上記で生成したデータが表示されることが確認できます。I
Consumer側の KsqlDB Stream の作成
- KsqlDBのクエリー機能を使用してデータを抽出するには、Confluent Platform の Control-Center では作成できない(?)ので、「ksqldb-cli」経由で「ksqldb-server」を操作します。まずは、「ksqldb-cli」に接続します。
$ docker exec -it ksqldb-cli /bin/bash
[appuser@56b432e8a452 ~]$
2. 「ksqldb-server」に接続します。
[appuser@56b432e8a452 ~]$ ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2020 Confluent Inc.
CLI v6.0.0, Server v6.0.0 located at http://ksqldb-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
3. ストリーミング処理用クエリを含んだストリームを作成します。 作成後の確認も行います。
- 「stream_201」のストリーミングデータを以下の条件で抽出し、その結果を「topic-202」に送信するストリーム「stream_202」を作成
- 抽出条件: section='E' OR section='C' OR section='W'
ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W';
Message
-----------------------------------------
Created query with ID CSAS_STREAM_202_0
-----------------------------------------
ksql>
ksql> describe extended stream_202;
Name : STREAM_202
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON
Kafka topic : topic_202 (partitions: 1, replication: 1)
Statement : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
S201.SECTION SECTION,
S201.TIME ZZTIME,
S201.PROC PROC,
S201.IOT_NUM IOT_NUM,
S201.IOT_STATE IOT_STATE,
S201.VOL_1 VOL_1,
S201.VOL_2 VOL_2
FROM STREAM_201 S201
WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W'))
EMIT CHANGES;
Field | Type
-----------------------------
SECTION | VARCHAR(STRING)
ZZTIME | VARCHAR(STRING)
PROC | VARCHAR(STRING)
IOT_NUM | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VOL_1 | DOUBLE
VOL_2 | DOUBLE
-----------------------------
Queries that write from this STREAM
-----------------------------------
CSAS_STREAM_202_0 (RUNNING) : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT S201.SECTION SECTION, S201.TIME ZZTIME, S201.PROC PROC, S201.IOT_NUM IOT_NUM, S201.IOT_STATE IOT_STATE, S201.VOL_1 VOL_1, S201.VOL_2 VOL_2 FROM STREAM_201 S201 WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W')) EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic_202)
ksql>
4. 作成した2つのストリームを確認します。
ksql> show streams;
Stream Name | Kafka Topic | Format
------------------------------------------------------------
KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON
STREAM_201 | topic_201 | JSON
STREAM_202 | topic_202 | JSON
------------------------------------------------------------
ksql>
5. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスし、「ksqldb」の「Flow」を確認します。上記で作成したフローが表示されています。
Consmer側 Stream でのデータ受信
- ローカルコンピュータ上で、STEP-2のIoTデータ生成プログラムを実行します。抽出条件を定義しているので、50件のデータを生成します。
$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1
2. 表示されている画面の「STREAM_202」を選択します。そうすると、画面右側の「STREAM_202」に、抽出されたデータのみが表示されることが確認できます。I
3. 「topic_202」でも、抽出されたデータのみが表示されることが確認できます。I
これで、「topic_201」のデータをストリーミング処理(クエリ処理)し、データ抽出結果を「topic_202」で確認できました。
最後に
3つのステップを経て、IoTデータ生成プログラムからデータを送信し、RabbitMQ経由でBrokerの「topic_201」でデータ受信し、ストリーミング処理でデータを抽出できることを確認できました。
本課題のステップ情報
STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認