0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

#2 Azure Kubernetes Service (AKS) で Confluent Platform の Helm Chart を使用してアプリケーションを動かしてみました

Last updated at Posted at 2021-08-30

概要

この記事 を参考にさせていただき、ちょっとだけ理解したつもりの Helm v3 を使って、Confuent Platform の Helm Chart と ACRにあるコンテナイメージ(rabbitmq)から AKS でコンテナアプリケーションを稼働させる手順をまとめました。

ベースとなるDockerコンテナアプリケーションは、この記事達 となり、これをAKS上で稼働させます。
image.png

以下の2つのステップで上記内容を順次説明します。今回は STEP-2 について説明します。
STEP-1.AKS上での Confluent Platform 環境構築
STEP-2.AKS上でのコンテナアプリケーションの実行

ローカル環境

macOS Big Sur 11.3
python 3.8.3
helm 3.6.3

事前準備

  1. この記事 を参考にして、AKS と ACR を構築し、AKSクラスター上で動く Node の確認まで完了していること。
  2. この記事 を参考にして、ACRへコンテナイメージをプッシュしておきます。
  3. この記事 にある、「Helm 準備」が完了していること。

Pod への設定

Pod の確認

$ kubectl get pod -n akscp01

NAME                                       READY   STATUS    RESTARTS   AGE
cp600-cp-control-center-5b55c5676f-tn5w9   1/1     Running   3          3m46s
cp600-cp-kafka-0                           2/2     Running   1          3m46s
cp600-cp-kafka-1                           2/2     Running   0          3m8s
cp600-cp-kafka-2                           2/2     Running   0          2m46s
cp600-cp-kafka-connect-764c9bd6cd-n84qw    2/2     Running   3          3m46s
cp600-cp-ksql-server-5948c75b8b-dnfcm      2/2     Running   4          3m46s
cp600-cp-schema-registry-5d79b8c57-r4r5p   2/2     Running   3          3m46s
cp600-cp-zookeeper-0                       2/2     Running   0          3m46s
cp600-cp-zookeeper-1                       2/2     Running   0          3m8s
cp600-cp-zookeeper-2                       2/2     Running   0          2m32s
cp600db-influxdb-5ff9b5cfbc-zkvjb          1/1     Running   0          33m
cp600gf-grafana-59cff8f44b-rwlwx           1/1     Running   0          13m
cp600mq-rabbitmq-0                         1/1     Running   0          6m22s

Kafka topic の作成

## Kafka-client への接続
$ kubectl exec -it kafka-client -n akscp01 -- /bin/bash

## 環境変数の定義
$ export RELEASE_NAME=cp600
$ export ZOOKEEPERS=${RELEASE_NAME}-cp-zookeeper:2181

## topic_201 / topic_202 の作成
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_201 --partitions 3 --replication-factor 1
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_202 --partitions 3 --replication-factor 1

## topic の確認
$ kafka-topics --zookeeper $ZOOKEEPERS --list

KSQL の設定

## Ksql-client への接続
$ kubectl exec -it ksql-client -n akscp01 -- /bin/bash

## 環境変数の定義
$ export RELEASE_NAME=cp600
$ export KSQLDB=${RELEASE_NAME}-cp-ksql-server:8088

## KsqlDB への接続
$ ksql http://$KSQLDB

## topic の確認
ksql> list topics;

 Kafka Topic                   | Partitions | Partition Replicas 
-----------------------------------------------------------------
 cp600-cp-kafka-connect-config | 1          | 3                  
 cp600-cp-kafka-connect-offset | 25         | 3                  
 cp600-cp-kafka-connect-status | 5          | 3                  
 topic_201                     | 3          | 1                  
 topic_202                     | 3          | 1                  
-----------------------------------------------------------------

## stream_201 / stream_202 の作成
ksql> CREATE STREAM stream_201 (id BIGINT, time VARCHAR, proc VARCHAR, section VARCHAR, iot_num VARCHAR, iot_state VARCHAR, vol_1 DOUBLE, vol_2 DOUBLE) WITH (KAFKA_TOPIC = 'topic_201', VALUE_FORMAT='JSON'); 

ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W'; 

## stream の確認
ksql> show streams;

 Stream Name | Kafka Topic | Format 
------------------------------------
 STREAM_201  | topic_201   | JSON   
 STREAM_202  | topic_202   | JSON   
------------------------------------

Connector の設定

ACRのコンテナイメージを使用しているので、rabbitmq / infkuxdb の Connector が Plug-in されているのかを確認する

$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600-cp-kafka-connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ CTRL+C で終了できます

別のターミナルを立ち上げ、RabbitMQSourceConnector と InfluxDBSinkConnector の Plug-in が存在することを確認する

$ curl http://localhost:8083/connector-plugins | jq 

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   793  100   793    0     0   6007      0 --:--:-- --:--:-- --:--:--  5962
[
  {
    "class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "type": "source",
    "version": "0.0.0.0"
  },
  {
    "class": "io.confluent.influxdb.InfluxDBSinkConnector",
    "type": "sink",
    "version": "unknown"
  },
    :
   中略
    :
]

RabbitMQSourceConnector の作成

RabbitMQSourceConnector の定義ファイルは以下となります

RabbitMQSourceConnector.json
{
    "name" : "RabbitMQSourceConnector_1",
    "config" : {
      "connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
      "tasks.max" : "1",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "rabbitmq.host": "cp600mq-rabbitmq",
      "rabbitmq.username": "guest",
      "rabbitmq.password": "guest",
      "rabbitmq.port": "5672",
      "kafka.topic": "topic_201",
      "rabbitmq.queue": "IoTHub"
    }
}

RabbitMQSourceConnector を新規作成します

$ curl -s -X POST -H 'Content-Type: application/json' --data @RabbitMQSourceConnector.json http://localhost:8083/connectors

{"name":"RabbitMQSourceConnector_1","config":{"connector.class":"io.confluent.connect.rabbitmq.RabbitMQSourceConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter","rabbitmq.host":"cp600mq-rabbitmq","rabbitmq.username":"guest","rabbitmq.password":"guest","rabbitmq.port":"5672","kafka.topic":"topic_201","rabbitmq.queue":"IoTHub","name":"RabbitMQSourceConnector_1"},"tasks":[],"type":"source"}

InfluxDBSinkConnector の作成

InfluxDBSinkConnector の定義ファイルは以下となります

InfluxDBSinkConnector.json
{
    "name": "InfluxDBSinkConnector_1",
    "config" : {
        "value.converter.schemas.enable": "false",
        "connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
        "tasks.max": "1",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "topics": "topic_201, topic_202",
        "influxdb.url": "http://cp600db-influxdb:8086",
        "influxdb.db": "IoTSample",
        "measurement.name.format": "${topic}"
    }
}

InfluxDBSinkConnector を新規作成します

$ curl -s -X POST -H 'Content-Type: application/json' --data @InfluxDBSinkConnector.json http://localhost:8083/connectors

{"name":"InfluxDBSinkConnector_1","config":{"value.converter.schemas.enable":"false","connector.class":"io.confluent.influxdb.InfluxDBSinkConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.json.JsonConverter","topics":"topic_201, topic_202","influxdb.url":"http://influxdb:8086","influxdb.db":"IoTSample","measurement.name.format":"${topic}","name":"InfluxDBSinkConnector_1"},"tasks":[],"type":"sink"}

Connector の確認

$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
          jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
          column -s : -t| sed 's/\"//g'| sort

sink    |  InfluxDBSinkConnector_1    |  RUNNING  |  RUNNING  |  io.confluent.influxdb.InfluxDBSinkConnector
source  |  RabbitMQSourceConnector_1  |  RUNNING  |  RUNNING  |  io.confluent.connect.rabbitmq.RabbitMQSourceConnector

influxdb の設定

## influxdb への接続
$ kubectl exec -it svc/cp600db-influxdb -n akscp01 -- influx                                       

Connected to http://localhost:8086 version 1.8.5
InfluxDB shell version: 1.8.5
>

## データベース「IoTSample」の作成と確認
> create database IoTSample
> show databases
name: databases
name
----
_internal
IoTSample

grafana の設定

grafana の設定をGUIで行なうために、grafana GUI へのポートフォワーディングの設定をおこないます

$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
Forwarding from 127.0.0.1:3000 -> 3000
Forwarding from [::1]:3000 -> 3000

※ CTRL+C で終了できます

この記事 にある「Grafana と influxdb の接続設定」と「Grafana のダッシュボードの作成」の設定を行います
ただし、URLの設定値は http://influxdb:8086 ではなく http://cp600db-influxdb:8086 とします

rabbitmq の設定

ACRのコンテナイメージを使用しているので、設定箇所はございません


アプリケーションの動作確認

ポートフォワーディングの設定

コンテナアプリケーションは外部公開を行わないので、外部とのインターフェースとなるコンテナアプリケーションの rabbitmq と grafana にポートフォワーディングの設定を行います

$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000 &
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672 &

アプリケーションの実行

この記事 にある「Grafana ダッシュボードでのリアルタイム可視化」と同様にローカルターミナルから IoTデータ生成プログラムを実行します。1秒間隔で100件のデータを生成させました。

$ python IoTSampleData-v5.py --mode mq --count 100 --wait 1

結果も同様になりました。
image.png


後処理

ポートフォワーディングの停止

## 該当のプロセスを検索します
$ ps -axl | grep kubectl                                                                      
  501 35260  8527     4006   0  31  5  5052880  34332 -      SN                  0 ttys000    0:00.23 kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
  501 35261  8527     4006   0  31  5  5053160  33008 -      SN                  0 ttys000    0:00.20 kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672
  501 35276 18433     4006   0  31  0  4399480    828 -      S+                  0 ttys002    0:00.00 grep kubectl

## 該当のプロセスを kill します
$ kill 35260                                                                                  
$ kill 35261

Pod のアンインストール

$ helm delete cp600gf -n akscp01
$ helm delete cp600db -n akscp01
$ helm delete cp600mq -n akscp01
$ helm delete cp600 -n akscp01

namespace の削除

$ kubectl delete namespace akscp01

まとめ

かなりダサイ方法で強引にコンテナアプリケーションを実装しましたが、時間を見つけて ConfigMap, 各種設定ファイル読込等を駆使しながらオシャレな実装を目指したいと思います


本課題のステップ情報

STEP-1.AKS上での Confluent Platform 環境構築
STEP-2.AKS上でのコンテナアプリケーションの実行

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。
Helm v3のすゝめ
事実上の標準ツールとなっているKubernetes向けデプロイツール「Helm」入門

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?