0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

#3 「Confluent + RabbitMQ」 で IoTデータを ストリーミング処理してみました

Last updated at Posted at 2021-07-16

STEP-3 : ストリーミング処理後のデータ受信確認

概要

Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータをRabbitMQで受信し、該当する Source Connector を使用し、Confluent でストリーミング処理をできることを確認しました。
image.png

以下の3つのステップで上記内容を順次説明します。今回は STEP-3 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認

ローカル環境

macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)

ストリーミング処理後のデータ受信トピックの作成

  1. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。
  2. 画面左側から「Topics」を選択し、新たに表示される「All topics」画面の右側にある「+ Add a topic」ボタンを押します。その後に表示される「New topic」画面の「Topic name」に「topic_202」を入力し、「Create with defaults」ボタンを押します。
    image.png

Producer側の KsqlDB Stream の作成

  1. 続いて、画面左側から「ksqlDB」を選択し、新たに表示される「ksqlDB」画面から「ksqldb1」を選択し、その後に表示される「ksqldb1」画面の上部タグから「Streams」を選択します。切り替わった画面の左側から「Add Stream」ボタンを押します。
    image.png

  2. 新たに表示される「Create a ksqlDB Stream」画面のリストから「topic_201」を選択します。その後に拡張表示される画面の「STREAM name」に「stream_201」を、「Value format」に「JSON」を選択します。それ以外の項目はデフォルト表示のまま、「Save STREAM」ボタンを押します。
    image.png

Producer側 Stream でのデータ受信

  1. ローカルコンピュータ上で、STEP-2のIoTデータ生成プログラムを実行します。
$ python IoTSampleData-v5.py --mode mq --count 5 --wait 1

2. 表示されている「ksqldb1」画面の上部タグから「Flow」を選択し、先程作成した「STREAM_201」を選択します。そうすると、画面右側の「STREAM_201」に、上記で生成したデータが表示されることが確認できます。I
image.png

Consumer側の KsqlDB Stream の作成

  1. KsqlDBのクエリー機能を使用してデータを抽出するには、Confluent Platform の Control-Center では作成できない(?)ので、「ksqldb-cli」経由で「ksqldb-server」を操作します。まずは、「ksqldb-cli」に接続します。
$ docker exec -it ksqldb-cli /bin/bash
[appuser@56b432e8a452 ~]$

2. 「ksqldb-server」に接続します。

[appuser@56b432e8a452 ~]$ ksql http://ksqldb-server:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
                  
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v6.0.0, Server v6.0.0 located at http://ksqldb-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

3. ストリーミング処理用クエリを含んだストリームを作成します。 作成後の確認も行います。

  • 「stream_201」のストリーミングデータを以下の条件で抽出し、その結果を「topic-202」に送信するストリーム「stream_202」を作成
  • 抽出条件: section='E' OR section='C' OR section='W'
ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W';

 Message                                 
-----------------------------------------
 Created query with ID CSAS_STREAM_202_0 
-----------------------------------------
ksql> 
ksql> describe extended stream_202;

Name                 : STREAM_202
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : topic_202 (partitions: 1, replication: 1)
Statement            : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT
  S201.SECTION SECTION,
  S201.TIME ZZTIME,
  S201.PROC PROC,
  S201.IOT_NUM IOT_NUM,
  S201.IOT_STATE IOT_STATE,
  S201.VOL_1 VOL_1,
  S201.VOL_2 VOL_2
FROM STREAM_201 S201
WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W'))
EMIT CHANGES;

 Field     | Type            
-----------------------------
 SECTION   | VARCHAR(STRING) 
 ZZTIME    | VARCHAR(STRING) 
 PROC      | VARCHAR(STRING) 
 IOT_NUM   | VARCHAR(STRING) 
 IOT_STATE | VARCHAR(STRING) 
 VOL_1     | DOUBLE          
 VOL_2     | DOUBLE          
-----------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_STREAM_202_0 (RUNNING) : CREATE STREAM STREAM_202 WITH (KAFKA_TOPIC='topic_202', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON') AS SELECT   S201.SECTION SECTION,   S201.TIME ZZTIME,   S201.PROC PROC,   S201.IOT_NUM IOT_NUM,   S201.IOT_STATE IOT_STATE,   S201.VOL_1 VOL_1,   S201.VOL_2 VOL_2 FROM STREAM_201 S201 WHERE (((S201.SECTION = 'E') OR (S201.SECTION = 'C')) OR (S201.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------


(Statistics of the local KSQL server interaction with the Kafka topic topic_202)
ksql> 

4. 作成した2つのストリームを確認します。

ksql> show streams;

 Stream Name         | Kafka Topic                 | Format 
------------------------------------------------------------
 KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON   
 STREAM_201          | topic_201                   | JSON   
 STREAM_202          | topic_202                   | JSON   
------------------------------------------------------------
ksql> 

5. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスし、「ksqldb」の「Flow」を確認します。上記で作成したフローが表示されています。
image.png

Consmer側 Stream でのデータ受信

  1. ローカルコンピュータ上で、STEP-2のIoTデータ生成プログラムを実行します。抽出条件を定義しているので、50件のデータを生成します。
$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1

2. 表示されている画面の「STREAM_202」を選択します。そうすると、画面右側の「STREAM_202」に、抽出されたデータのみが表示されることが確認できます。I
image.png

3. 「topic_202」でも、抽出されたデータのみが表示されることが確認できます。I
image.png

これで、「topic_201」のデータをストリーミング処理(クエリ処理)し、データ抽出結果を「topic_202」で確認できました。

最後に

3つのステップを経て、IoTデータ生成プログラムからデータを送信し、RabbitMQ経由でBrokerの「topic_201」でデータ受信し、ストリーミング処理でデータを抽出できることを確認できました。

本課題のステップ情報

STEP-1.Dockerコンテナ環境での Confluent Platform の構築
STEP-2.RabbitMQ経由のBrokerでのデータ受信確認
STEP-3.ストリーミング処理後のデータ受信確認

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?