0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

#2 「Confluent + InfluxDB + Grafana 」 で IoTデータをストリーミング処理後に可視化してみました

Last updated at Posted at 2021-07-19

STEP-2 : InfluxDBでのBrokerからデータ受信確認

概要

Confluent Platform の「cp-all-in-one」 をベースにローカルのDockerコンテナ環境を構築し、IoTデータ生成Pythonプログラムから送信されるデータを Confluent でストリーミング処理し、InfluxDB+Grafanaでリアルタイム可視化できることを確認しました。該当する Sink Connector を使用しています。

以下の絵(の記事) の環境をベースに、
image.png

Consumer側の可視化の構成を追加する形で確認します。
image.png

以下の3つのステップで上記内容を順次説明します。今回は STEP-2 について説明します。
STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認

ローカル環境

macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)

influxdb の設定

  1. STEP-1 で作成したローカルDocker環境に対して、ローカルターミナルから「influxdb」に接続し情報を確認します。
$ docker exec -it influxdb /bin/bash

root@influxdb:/# 
root@influxdb:/# uname -srn
Linux influxdb 5.10.25-linuxkit
root@influxdb:/# 
root@influxdb:/# influxd version
InfluxDB v1.8.6 (git: 1.8 v1.8.6)
root@influxdb:/# 
root@influxdb:/# influx -version
InfluxDB shell version: 1.8.6
root@influxdb:/# 
root@influxdb:/# exit
exit

2. 「influxdb」にデータベース「IoTSample」を作成します。

$ docker exec -it influxdb influx

Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
> 
> create database IoTSample
> 
> show databases
name: databases
name
----
_internal
IoTSample
> 

コネクタの作成

  1. Confluent Platform の Control-Center へブラウザから http://localhost:9021 でアクセスします。

  2. 続いて、画面左側から「Connect」を選択し、新たに表示される「All Connect Clusters」画面から「connect-default」を選択し、その後に表示される「Connectors」画面右側から「+ Add connector」ボタンを押します。
    image.png

  3. 新たに表示される「Browse」画面から「InfluxDBSinkConnector」を選択します。
    image.png

  4. その後に表示される「Add Connector」画面の「topics」項目で「topic_201」と「topic_202」を選択し、続く「name」項目に「InfluxDBSinkConnector_1」というコネクタ名を入力します。
    image.png

  5. コネクタ名を入力すると、入力項目が増えますので、以下の内容を入力し、最後に「Continu」ボタンを押します。

カテゴリ 入力項目
Which topics do you want to get data from? topics topic_201, topic_202
How should we connect to your data? name InfluxDBSinkConnector_1
Common tasks.max 1
Common Value converter class org.apache.kafka.connect.json.JsonConverter
InfluxDB InfluxDB API URL* http://influxdb:8086
Write InfluxDB Database IoTSample
Write Measurement Name Format ${topic}
Additional Properties value.converter.schemas.enable false

6. 設定内容に問題なければ、「Launch」ボタンを押します。
image.png

7. Connecotrs画面で作成したConnector「InfluxDBSinkConnector_1」が正常に起動していることを確認します。
image.png

InfluxDBでの確認

  1. ローカルターミナルから「influxdb」に接続し、データベース「IoTSample」にメジャーメント(テーブル)「topic_201」と「topic_202」が自動作成されていることを確認します。
$ docker exec -it influxdb influx

Connected to http://localhost:8086 version 1.8.6
InfluxDB shell version: 1.8.6
> 
> use IoTSample
Using database IoTSample
> 
> show measurements
name: measurements
name
----
topic_201
topic_202
> 

2. ローカルコンピュータ上で、ここにある IoTデータ生成プログラムを実行します。50件のデータを生成します。

$ python IoTSampleData-v5.py --mode mq --count 50 --wait 1

3. 次に各メジャーメント(テーブル)にデータが Write されているか確認します。

> 
> select * from topic_201
name: topic_201
time                id iot_num  iot_state proc section vol_1              vol_2
----                -- -------  --------- ---- ------- -----              -----
1626241925549000000 0  226-2461 島根県       111  M       198.1845397767455  57.268514136837176
1626241926554000000 1  145-2621 宮崎県       111  A       121.70584421224085 75.71752774135611
1626241927559000000 2  852-7187 熊本県       111  A       126.41080104674175 72.15478589353621
    :
    :
    :
1626408886875000000 47 440-1826 兵庫県       111  J       116.80492255443481 58.16002504591993
1626408887881000000 48 238-2420 長野県       111  Q       168.35017898970486 68.57868248992546
1626408888886000000 49 175-8107 高知県       111  S       132.90782936265464 86.22280607722584
> 
> 
> select * from topic_202
name: topic_202
time                IOT_NUM  IOT_STATE PROC SECTION VOL_1              VOL_2              ZZTIME
----                -------  --------- ---- ------- -----              -----              ------
1626408840741000000 327-9411 埼玉県       111  C       127.4471407715012  71.60478903611397  2021-07-16T13:13:59.693142
1626408848776000000 910-2198 兵庫県       111  E       194.46476675784402 70.33439224608537  2021-07-16T13:13:59.693268
1626408863809000000 134-0246 北海道       111  E       123.3766530207224  61.78121028564689  2021-07-16T13:13:59.693468
1626408876870000000 214-7209 埼玉県       111  C       101.40696510178657 88.97291678787539  2021-07-16T13:13:59.693639
1626408878878000000 103-2715 福島県       111  C       160.6706712489061  53.167263544338155 2021-07-16T13:13:59.693665
>

これで正常にBrokerのトピック経由でInfluxDBでデータ受信できることを確認できました。
次のステップでは、このメジャーメント(テーブル)のデータを Grafana でリアルタイム可視化できることを確認してみます。

本課題のステップ情報

STEP-1.Dockerコンテナ環境での Confluent Platform の追加構築
STEP-2.InfluxDBでのBrokerからのデータ受信確認
STEP-3.InfluxDB経由のGrafanaでのリアルタイムデータ可視化確認

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?