
#1 Visualizing IoT Data after Stream Processing with "Confluent + InfluxDB + Grafana"

Posted at 2021-07-19

STEP-1: Additional Confluent Platform Setup in a Docker Container Environment

Overview

Building on Confluent Platform's "cp-all-in-one", we set up a local Docker container environment and confirm that data sent from an IoT data-generating Python program can be stream-processed by Confluent and visualized in real time with InfluxDB + Grafana. The corresponding Sink Connector is used for this.

We start from the environment shown in the diagram of the previous article:

(figure: base environment from the previous article)

and verify the setup by adding the visualization components on the Consumer side:

(figure: base environment with Consumer-side visualization added)

The above is explained in the following three steps; this article covers STEP-1.
STEP-1. Additional Confluent Platform setup in a Docker container environment
STEP-2. Confirming that InfluxDB receives data from the Broker
STEP-3. Confirming real-time data visualization in Grafana via InfluxDB

Local environment

macOS Big Sur 11.3
python 3.8.3
Docker version 20.10.7, build f0df350 (CPUs:8, Memory:10GB, Swap:1GB)

Moving to the working directory

  1. We use the environment from here (the previous article) as-is.
  2. Move to the working directory nested under the directory where confluentinc/cp-all-in-one was cloned (the repository contains a subdirectory of the same name, hence the two cd commands).
$ cd cp-all-in-one
$ cd cp-all-in-one

Installing the InfluxDB Sink Connector

  1. Edit the Dockerfile so that the "InfluxDB Sink Connector" is installed into Confluent's Connect image.
FROM confluentinc/cp-server-connect-base:6.0.0
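# Source connector (from the existing setup): pulls IoT data from RabbitMQ into Kafka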
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-rabbitmq:latest
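# Sink connector (added in this article): writes topic data out to InfluxDB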
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-influxdb:latest

2. Build the Connector Docker image.

$ docker build -t cp-connect-base .
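
Optionally, you can check that both connectors were baked into the image: confluent-hub installs plugins under /usr/share/confluent-hub-components, the same path referenced by CONNECT_PLUGIN_PATH in the docker-compose.yml below. A quick way to look inside the image (--entrypoint overrides the image's default startup command):

$ docker run --rm --entrypoint ls cp-connect-base /usr/share/confluent-hub-components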

Redefining docker-compose.yml

  1. Edit the "docker-compose.yml" in the working directory as follows.

| Service name | Change |
| --- | --- |
| zookeeper | unchanged |
| broker | unchanged |
| schema-registry | unchanged |
| connect | image changed (locally built cp-connect-base) |
| control-center | unchanged |
| ksqldb-server | unchanged |
| ksqldb-cli | unchanged |
| rabbitmq | unchanged |
| influxdb | newly added |
| grafana | newly added |
docker-compose.yml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,OPTIONS'

  connect:
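    # Changed: locally built image bundling the RabbitMQ source and InfluxDB sink connectors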
    image: cp-connect-base:latest
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.0.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.apache.kafka.connect.runtime.rest=WARN,reflections=ERROR
      CONNECT_HOST: connect
      
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.0.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.0.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
      KSQL_AUTO_OFFSET_RESET: "latest"

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.0.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  rabbitmq:
    image: rabbitmq:3.8.17-management
    restart: always
    ports:
      - '5672:5672'
      - '15672:15672'
    hostname: rabbitmq
    container_name: rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest

  influxdb:
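    # Newly added: time-series database that the InfluxDB Sink Connector will write to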
    image: influxdb:1.8.6
    ports:
      - 8086:8086
    hostname: influxdb
    container_name: influxdb
  
  grafana:
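    # Newly added: visualization frontend that reads its data from InfluxDB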
    image: grafana/grafana
    ports:
      - 3000:3000
    hostname: grafana
    container_name: grafana
    environment:
      - GF_SERVER_ROOT_URL=http://grafana:3000
      - GF_INSTALL_PLUGINS=grafana-polystat-panel,bessler-pictureit-panel,marcuscalidus-svg-panel
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - influxdb

Starting Confluent Platform

  1. Start Confluent Platform with the -d option so that it runs in detached mode.
$ docker-compose up -d

2. Confirm that the services are up and running.

$ docker-compose ps

     Name                    Command               State                               Ports                            
------------------------------------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp,:::9092->9092/tcp,                    
                                                           0.0.0.0:9101->9101/tcp,:::9101->9101/tcp                     
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 9092/tcp           
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp,:::9021->9021/tcp                     
grafana           /run.sh                          Up      0.0.0.0:3000->3000/tcp,:::3000->3000/tcp                     
influxdb          /entrypoint.sh influxd           Up      0.0.0.0:8086->8086/tcp,:::8086->8086/tcp                     
ksqldb-cli        /bin/sh                          Up                                                                   
ksqldb-server     /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp,:::8088->8088/tcp                     
rabbitmq          docker-entrypoint.sh rabbi ...   Up      15671/tcp, 0.0.0.0:15672->15672/tcp,:::15672->15672/tcp,     
                                                           15691/tcp, 15692/tcp, 25672/tcp, 4369/tcp, 5671/tcp,         
                                                           0.0.0.0:5672->5672/tcp,:::5672->5672/tcp                     
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp,:::8081->8081/tcp                     
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp,:::2181->2181/tcp, 2888/tcp, 3888/tcp 

3. Confirm that the connectors are also available.

$ curl http://localhost:8083/connector-plugins | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   793  100   793    0     0  14962      0 --:--:-- --:--:-- --:--:-- 14962
[
  {
    "class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "type": "source",
    "version": "0.0.0.0"
  },
  {
    "class": "io.confluent.influxdb.InfluxDBSinkConnector",
    "type": "sink",
    "version": "unknown"
  },
  {
    "class": "io.confluent.influxdb.source.InfluxdbSourceConnector",
    "type": "source",
    "version": "1.2.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "6.0.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "6.0.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
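
As an extra sanity check, InfluxDB 1.x exposes a /ping endpoint that returns HTTP 204 when the server is healthy; since port 8086 is mapped to the host in docker-compose.yml, it can be called directly:

$ curl -i http://localhost:8086/ping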

This confirms that Confluent Platform runs properly in the Docker container environment.
In the next step, we will send data from the IoT data-generating program and confirm that InfluxDB receives it via a Broker topic.
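
The actual connector registration is covered in STEP-2; as a preview, a sink connector is created by POSTing a JSON definition to the Connect REST API. The following is only a minimal sketch: the name, topic, and database values are placeholders, and the property names follow the Confluent InfluxDB Sink Connector documentation.

$ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
    "name": "influxdb-sink-sample",
    "config": {
      "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
      "tasks.max": "1",
      "topics": "topic-iot-sample",
      "influxdb.url": "http://influxdb:8086",
      "influxdb.db": "iot-sample-db"
    }
  }'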

Step information for this series

STEP-1. Additional Confluent Platform setup in a Docker container environment
STEP-2. Confirming that InfluxDB receives data from the Broker
STEP-3. Confirming real-time data visualization in Grafana via InfluxDB
