概要
この記事 を参考にさせていただき、ちょっとだけ理解したつもりの Helm v3 を使って、Confuent Platform の Helm Chart と ACRにあるコンテナイメージ(rabbitmq)から AKS でコンテナアプリケーションを稼働させる手順をまとめました。
ベースとなるDockerコンテナアプリケーションは、この記事達 となり、これをAKS上で稼働させます。
以下の2つのステップで上記内容を順次説明します。今回は STEP-2 について説明します。
STEP-1.AKS上での Confluent Platform 環境構築
STEP-2.AKS上でのコンテナアプリケーションの実行
ローカル環境
macOS Big Sur 11.3
python 3.8.3
helm 3.6.3
事前準備
- この記事 を参考にして、AKS と ACR を構築し、AKSクラスター上で動く Node の確認まで完了していること。
- この記事 を参考にして、ACRへコンテナイメージをプッシュしておきます。
- この記事 にある、「Helm 準備」が完了していること。
Pod への設定
Pod の確認
$ kubectl get pod -n akscp01
NAME READY STATUS RESTARTS AGE
cp600-cp-control-center-5b55c5676f-tn5w9 1/1 Running 3 3m46s
cp600-cp-kafka-0 2/2 Running 1 3m46s
cp600-cp-kafka-1 2/2 Running 0 3m8s
cp600-cp-kafka-2 2/2 Running 0 2m46s
cp600-cp-kafka-connect-764c9bd6cd-n84qw 2/2 Running 3 3m46s
cp600-cp-ksql-server-5948c75b8b-dnfcm 2/2 Running 4 3m46s
cp600-cp-schema-registry-5d79b8c57-r4r5p 2/2 Running 3 3m46s
cp600-cp-zookeeper-0 2/2 Running 0 3m46s
cp600-cp-zookeeper-1 2/2 Running 0 3m8s
cp600-cp-zookeeper-2 2/2 Running 0 2m32s
cp600db-influxdb-5ff9b5cfbc-zkvjb 1/1 Running 0 33m
cp600gf-grafana-59cff8f44b-rwlwx 1/1 Running 0 13m
cp600mq-rabbitmq-0 1/1 Running 0 6m22s
Kafka topic の作成
## Kafka-client への接続
$ kubectl exec -it kafka-client -n akscp01 -- /bin/bash
## 環境変数の定義
$ export RELEASE_NAME=cp600
$ export ZOOKEEPERS=${RELEASE_NAME}-cp-zookeeper:2181
## topic_201 / topic_202 の作成
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_201 --partitions 3 --replication-factor 1
$ kafka-topics --zookeeper $ZOOKEEPERS --create --topic topic_202 --partitions 3 --replication-factor 1
## topic の確認
$ kafka-topics --zookeeper $ZOOKEEPERS --list
KSQL の設定
## Ksql-client への接続
$ kubectl exec -it ksql-client -n akscp01 -- /bin/bash
## 環境変数の定義
$ export RELEASE_NAME=cp600
$ export KSQLDB=${RELEASE_NAME}-cp-ksql-server:8088
## KsqlDB への接続
$ ksql http://$KSQLDB
## topic の確認
ksql> list topics;
Kafka Topic | Partitions | Partition Replicas
-----------------------------------------------------------------
cp600-cp-kafka-connect-config | 1 | 3
cp600-cp-kafka-connect-offset | 25 | 3
cp600-cp-kafka-connect-status | 5 | 3
topic_201 | 3 | 1
topic_202 | 3 | 1
-----------------------------------------------------------------
## stream_201 / stream_202 の作成
ksql> CREATE STREAM stream_201 (id BIGINT, time VARCHAR, proc VARCHAR, section VARCHAR, iot_num VARCHAR, iot_state VARCHAR, vol_1 DOUBLE, vol_2 DOUBLE) WITH (KAFKA_TOPIC = 'topic_201', VALUE_FORMAT='JSON');
ksql> CREATE STREAM stream_202 WITH (KAFKA_TOPIC = 'topic_202', VALUE_FORMAT='JSON') AS SELECT s201.section as section, s201.time as zztime, s201.proc as proc, s201.iot_num as iot_num, s201.iot_state as iot_state, s201.vol_1 as vol_1, s201.vol_2 as vol_2 FROM stream_201 s201 WHERE section='E' OR section='C' OR section='W';
## stream の確認
ksql> show streams;
Stream Name | Kafka Topic | Format
------------------------------------
STREAM_201 | topic_201 | JSON
STREAM_202 | topic_202 | JSON
------------------------------------
Connector の設定
ACRのコンテナイメージを使用しているので、rabbitmq / infkuxdb の Connector が Plug-in されているのかを確認する
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600-cp-kafka-connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
※ CTRL+C で終了できます
別のターミナルを立ち上げ、RabbitMQSourceConnector と InfluxDBSinkConnector の Plug-in が存在することを確認する
$ curl http://localhost:8083/connector-plugins | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 793 100 793 0 0 6007 0 --:--:-- --:--:-- --:--:-- 5962
[
{
"class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"type": "source",
"version": "0.0.0.0"
},
{
"class": "io.confluent.influxdb.InfluxDBSinkConnector",
"type": "sink",
"version": "unknown"
},
:
中略
:
]
RabbitMQSourceConnector の作成
RabbitMQSourceConnector の定義ファイルは以下となります
{
"name" : "RabbitMQSourceConnector_1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"rabbitmq.host": "cp600mq-rabbitmq",
"rabbitmq.username": "guest",
"rabbitmq.password": "guest",
"rabbitmq.port": "5672",
"kafka.topic": "topic_201",
"rabbitmq.queue": "IoTHub"
}
}
RabbitMQSourceConnector を新規作成します
$ curl -s -X POST -H 'Content-Type: application/json' --data @RabbitMQSourceConnector.json http://localhost:8083/connectors
{"name":"RabbitMQSourceConnector_1","config":{"connector.class":"io.confluent.connect.rabbitmq.RabbitMQSourceConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter","rabbitmq.host":"cp600mq-rabbitmq","rabbitmq.username":"guest","rabbitmq.password":"guest","rabbitmq.port":"5672","kafka.topic":"topic_201","rabbitmq.queue":"IoTHub","name":"RabbitMQSourceConnector_1"},"tasks":[],"type":"source"}
InfluxDBSinkConnector の作成
InfluxDBSinkConnector の定義ファイルは以下となります
{
"name": "InfluxDBSinkConnector_1",
"config" : {
"value.converter.schemas.enable": "false",
"connector.class": "io.confluent.influxdb.InfluxDBSinkConnector",
"tasks.max": "1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics": "topic_201, topic_202",
"influxdb.url": "http://cp600db-influxdb:8086",
"influxdb.db": "IoTSample",
"measurement.name.format": "${topic}"
}
}
InfluxDBSinkConnector を新規作成します
$ curl -s -X POST -H 'Content-Type: application/json' --data @InfluxDBSinkConnector.json http://localhost:8083/connectors
{"name":"InfluxDBSinkConnector_1","config":{"value.converter.schemas.enable":"false","connector.class":"io.confluent.influxdb.InfluxDBSinkConnector","tasks.max":"1","value.converter":"org.apache.kafka.connect.json.JsonConverter","topics":"topic_201, topic_202","influxdb.url":"http://influxdb:8086","influxdb.db":"IoTSample","measurement.name.format":"${topic}","name":"InfluxDBSinkConnector_1"},"tasks":[],"type":"sink"}
Connector の確認
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | InfluxDBSinkConnector_1 | RUNNING | RUNNING | io.confluent.influxdb.InfluxDBSinkConnector
source | RabbitMQSourceConnector_1 | RUNNING | RUNNING | io.confluent.connect.rabbitmq.RabbitMQSourceConnector
influxdb の設定
## influxdb への接続
$ kubectl exec -it svc/cp600db-influxdb -n akscp01 -- influx
Connected to http://localhost:8086 version 1.8.5
InfluxDB shell version: 1.8.5
>
## データベース「IoTSample」の作成と確認
> create database IoTSample
> show databases
name: databases
name
----
_internal
IoTSample
grafana の設定
grafana の設定をGUIで行なうために、grafana GUI へのポートフォワーディングの設定をおこないます
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
Forwarding from 127.0.0.1:3000 -> 3000
Forwarding from [::1]:3000 -> 3000
※ CTRL+C で終了できます
この記事 にある「Grafana と influxdb の接続設定」と「Grafana のダッシュボードの作成」の設定を行います
ただし、URLの設定値は http://influxdb:8086 ではなく http://cp600db-influxdb:8086 とします
rabbitmq の設定
ACRのコンテナイメージを使用しているので、設定箇所はございません
アプリケーションの動作確認
ポートフォワーディングの設定
コンテナアプリケーションは外部公開を行わないので、外部とのインターフェースとなるコンテナアプリケーションの rabbitmq と grafana にポートフォワーディングの設定を行います
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000 &
$ kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672 &
アプリケーションの実行
この記事 にある「Grafana ダッシュボードでのリアルタイム可視化」と同様にローカルターミナルから IoTデータ生成プログラムを実行します。1秒間隔で100件のデータを生成させました。
$ python IoTSampleData-v5.py --mode mq --count 100 --wait 1
後処理
ポートフォワーディングの停止
## 該当のプロセスを検索します
$ ps -axl | grep kubectl
501 35260 8527 4006 0 31 5 5052880 34332 - SN 0 ttys000 0:00.23 kubectl port-forward --address localhost --namespace akscp01 svc/cp600gf-grafana 3000:3000
501 35261 8527 4006 0 31 5 5053160 33008 - SN 0 ttys000 0:00.20 kubectl port-forward --address localhost --namespace akscp01 svc/cp600mq-rabbitmq 5672:5672
501 35276 18433 4006 0 31 0 4399480 828 - S+ 0 ttys002 0:00.00 grep kubectl
## 該当のプロセスを kill します
$ kill 35260
$ kill 35261
Pod のアンインストール
$ helm delete cp600gf -n akscp01
$ helm delete cp600db -n akscp01
$ helm delete cp600mq -n akscp01
$ helm delete cp600 -n akscp01
namespace の削除
$ kubectl delete namespace akscp01
まとめ
かなりダサイ方法で強引にコンテナアプリケーションを実装しましたが、時間を見つけて ConfigMap, 各種設定ファイル読込等を駆使しながらオシャレな実装を目指したいと思います
本課題のステップ情報
STEP-1.AKS上での Confluent Platform 環境構築
STEP-2.AKS上でのコンテナアプリケーションの実行
参考情報
以下の情報を参考にさせていただきました。感謝申し上げます。
Helm v3のすゝめ
事実上の標準ツールとなっているKubernetes向けデプロイツール「Helm」入門