STEP-2 : InfluxDBでのBrokerからデータ受信確認
概要
Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータを Confluent でストリーミング処理し、InfluxDB+Grafanaでリアルタイム可視化できることを確認しました。該当する Sink Connector を使用しています。
以下の絵(の記事) の環境をベースに、
以下の3つのステップで上記内容を順次説明します。今回は STEP-2 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認
ローカル環境
macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)
influxdb の設定
- STEP-1 で作成したローカルDocker環境に対して、ローカルターミナルから「influxdb」に接続し情報を確認します。
$ docker exec -it influxdb /bin/bash
root@influxdb:/#
root@influxdb:/# uname -srn
Linux influxdb 5.10.25-linuxkit
root@influxdb:/#
root@influxdb:/# influxd version
InfluxDB v1.8.6 (git: 1.8 v1.8.6)
root@influxdb:/#
root@influxdb:/# influx -version
InfluxDB shell version: 1.8.6
root@influxdb:/#
root@influxdb:/# exit
exit
2. 「influxdb」にデータベース「IoTSample」を作成します。
$ docker exec -it influxdb influx
Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
>
> create database IoTSample
>
> show databases
name: databases
name
----
_internal
IoTSample
>
コネクタの作成
-
Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。
-
続いて、画面左側から「Connect」を選択し、新たに表示される「All Connect Clusters」画面から「connect-default」を選択し、その後に表示される「Connectors」画面右側から「+ Add connector」ボタンを押します。
-
その後に表示される「Add Connector」画面の「topics」項目で「topic_201」と「topic_202」を選択し、続く「name」項目に「InfluxDBSinkConnector_1」というコネクタ名を入力します。
-
コネクタ名を入力すると、入力項目が増えますので、以下の内容を入力し、最後に「Continu」ボタンを押します。
カテゴリ | 入力項目 | 値 |
---|---|---|
Which topics do you want to get data from? | topics | topic_201, topic_202 |
How should we connect to your data? | name | InfluxDBSinkConnector_1 |
Common | tasks.max | 1 |
Common | Value converter class | org.apache.kafka.connect.json.JsonConverter |
InfluxDB | InfluxDB API URL* | http://influxdb:8086 |
Write | InfluxDB Database | IoTSample |
Write | Measurement Name Format | ${topic} |
Additional Properties | value.converter.schemas.enable | false |
6. 設定内容に問題なければ、「Launch」ボタンを押します。
7. Connecotrs画面で作成したConnector「InfluxDBSinkConnector_1」が正常に起動していることを確認します。
InfluxDBでの確認
- ローカルターミナルから「influxdb」に接続し、データベース「IoTSample」にメジャーメント(テーブル)「topic_201」と「topic_202」が自動作成されていることを確認します。
$ docker exec -it influxdb influx
Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
>
> use IoTSample
Using database IoTSample
>
> show measurements
name: measurements
name
----
topic_201
topic_202
>
2. ローカルコンピュータ上で、ここにある IoTデータ生成プログラムを実行します。50件のデータを生成します。
$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1
3. 次に各メジャーメント(テーブル)にデータが Write されているか確認します。
>
> select * from topic_201
name: topic_201
time id iot_num iot_state proc section vol_1 vol_2
---- -- ------- --------- ---- ------- ----- -----
1626241925549000000 0 226-2461 島根県 111 M 198.1845397767455 57.268514136837176
1626241926554000000 1 145-2621 宮崎県 111 A 121.70584421224085 75.71752774135611
1626241927559000000 2 852-7187 熊本県 111 A 126.41080104674175 72.15478589353621
:
:
:
1626408886875000000 47 440-1826 兵庫県 111 J 116.80492255443481 58.16002504591993
1626408887881000000 48 238-2420 長野県 111 Q 168.35017898970486 68.57868248992546
1626408888886000000 49 175-8107 高知県 111 S 132.90782936265464 86.22280607722584
>
>
> select * from topic_202
name: topic_202
time IOT_NUM IOT_STATE PROC SECTION VOL_1 VOL_2 ZZTIME
---- ------- --------- ---- ------- ----- ----- ------
1626408840741000000 327-9411 埼玉県 111 C 127.4471407715012 71.60478903611397 2021-07-16T13:13:59.693142
1626408848776000000 910-2198 兵庫県 111 E 194.46476675784402 70.33439224608537 2021-07-16T13:13:59.693268
1626408863809000000 134-0246 北海道 111 E 123.3766530207224 61.78121028564689 2021-07-16T13:13:59.693468
1626408876870000000 214-7209 埼玉県 111 C 101.40696510178657 88.97291678787539 2021-07-16T13:13:59.693639
1626408878878000000 103-2715 福島県 111 C 160.6706712489061 53.167263544338155 2021-07-16T13:13:59.693665
>
これで正常にBrokerのトピック経由でInfluxDBでデータ受信できることを確認できました。
次のステップでは、このメジャーメント(テーブル)のデータを Grafana でリアルタイム可視化できることを確認してみます。
本課題のステップ情報
STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認