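Overview
Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in a private cloud environment (in this case, Azure Kubernetes Service (AKS)). It provides a standard, simple interface for customizing, deploying, and managing Confluent Platform through a declarative API.
The workflow for creating a Stream in ksqlDB on Confluent Platform deployed on AKS with CFK is outlined below.
- Prepare the Kubernetes environment (done in the prerequisites)
- Deploy Confluent for Kubernetes (done in the prerequisites)
- Deploy Confluent Platform, including the required Connector plugins (done in the prerequisites)
- Perform an additional Confluent Platform deployment: create the topics (done in the prerequisites)
- Perform an additional Confluent Platform deployment: create the Source/Sink Connectors (done in the prerequisites)
- Apply additional Confluent Platform configuration for the Stream
- Perform an additional Confluent Platform deployment for the Stream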
Local environment
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
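Prerequisites
- The AKS cluster environment has been built by following this article
- CosmosDB has been built on Azure by adapting and running this article, with the same database and container names as in this example
- Confluent Platform has been deployed on AKS in the configuration shown in the figure below, by following this article
Additional Confluent Platform configuration
Changing the Confluent Platform configuration
To use the ksqlDB REST API (via ksqldb-cli), add a spec.configOverrides.server section to the kind: KsqlDB resource in "confluent_platform_ccc.yaml", the file that has defined Confluent Platform so far. The first entry tells ksqlDB where Schema Registry is, and the second sets the listener on which the ksqlDB server serves its REST API.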
apiVersion: platform.confluent.io/v1beta1
kind: KsqlDB
metadata:
  name: ksqldb
  namespace: akscfk231
spec:
  replicas: 1
  image:
    application: confluentinc/cp-ksqldb-server:7.1.0
    init: confluentinc/confluent-init-container:2.3.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081
      - listeners=http://ksqldb.akscfk231.svc.cluster.local:8088
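CFK writes each entry under configOverrides.server into the ksqlDB server's ksqldb.properties (the ConfigMap check further down shows both lines there). The Python sketch below models that merge under the simplifying assumption that an override either adds a new key or replaces an existing one; apply_overrides and to_properties are hypothetical helpers for illustration, not part of CFK.

```python
from typing import Dict, List


def apply_overrides(base: Dict[str, str], overrides: List[str]) -> Dict[str, str]:
    """Merge 'key=value' strings (as listed under configOverrides.server) into base properties."""
    merged = dict(base)  # leave the caller's dict untouched
    for entry in overrides:
        key, sep, value = entry.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"override is not key=value: {entry!r}")
        merged[key.strip()] = value.strip()  # add a new key or replace an existing one
    return merged


def to_properties(props: Dict[str, str]) -> str:
    """Render properties as sorted key=value lines, similar to ksqldb.properties in the ConfigMap."""
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items()))


if __name__ == "__main__":
    base = {
        "bootstrap.servers": "kafka.akscfk231.svc.cluster.local:9071",
        "ksql.service.id": "akscfk231.ksqldb_",
    }
    overrides = [
        "ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081",
        "listeners=http://ksqldb.akscfk231.svc.cluster.local:8088",
    ]
    print(to_properties(apply_overrides(base, overrides)))
```

Splitting on the first "=" only keeps values such as URLs with query strings intact.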
Additional deployment to Confluent Platform
Applying the additional settings to ksqldb
Re-apply the updated "confluent_platform_ccc.yaml". Only ksqldb is reported as configured; every other resource is unchanged.
$ kubectl apply -f confluent_platform_ccc.yaml
zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
connect.platform.confluent.io/connect unchanged
ksqldb.platform.confluent.io/ksqldb configured # <--- only this one is configured
controlcenter.platform.confluent.io/controlcenter unchanged
schemaregistry.platform.confluent.io/schemaregistry unchanged
kafkarestproxy.platform.confluent.io/kafkarestproxy unchanged
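When a manifest holds many resources, it can help to pick out only the lines that are not unchanged. A small Python sketch, assuming output in the "resource action" form shown above; changed_resources is a hypothetical helper.

```python
from typing import List, Tuple


def changed_resources(apply_output: str) -> List[Tuple[str, str]]:
    """Return (resource, action) pairs from `kubectl apply` output, skipping 'unchanged' ones."""
    changed = []
    for raw in apply_output.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop annotations such as '# <--- ...' used in this article
        if not line or line.startswith("$"):  # skip blank lines and the command prompt line
            continue
        resource, _, action = line.rpartition(" ")
        if action != "unchanged":
            changed.append((resource, action))
    return changed


if __name__ == "__main__":
    sample = """\
zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
ksqldb.platform.confluent.io/ksqldb configured
controlcenter.platform.confluent.io/controlcenter unchanged"""
    print(changed_resources(sample))
```

If a preview is preferred, kubectl diff -f confluent_platform_ccc.yaml shows the field-level change before anything is applied.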
Checking the deployed Confluent Platform resources
ksqldb ConfigMap information
## List the ConfigMaps
$ kubectl get configmap
NAME                           DATA   AGE
connect-init-config            3      26m
connect-shared-config          5      26m
controlcenter-init-config      3      23m
controlcenter-shared-config    6      23m
kafka-init-config              3      24m
kafka-shared-config            5      24m
kafkarestproxy-init-config     3      23m
kafkarestproxy-shared-config   4      23m
ksqldb-init-config             3      23m
ksqldb-shared-config           5      23m
kube-root-ca.crt               1      29m
schemaregistry-init-config     3      23m
schemaregistry-shared-config   4      23m
zookeeper-init-config          3      26m
zookeeper-shared-config        6      26m
## ksqldb ConfigMap details
$ kubectl describe configmap ksqldb-shared-config
Name:         ksqldb-shared-config
Namespace:    akscfk231
Labels:       app.kubernetes.io/managed-by=confluent-operator
              confluent-platform=true
              cr-name=ksqldb
              type=ksqldb
Annotations:  platform.confluent.io/cr-name: ksqldb
              platform.confluent.io/last-applied:
                eyJkYXRhIjp7ImRpc2stdXNhZ2UtYWdlbnQucHJvcGVydGllcyI6ImRpc2suZGF0YT0vbW50L2RhdGEvZGF0YVxuc2VydmljZS5uYW1lPWtzcWxkYlxuIiwiam14LWV4cG9ydGVyLn...
              platform.confluent.io/namespace: akscfk231
              platform.confluent.io/type: ksqldb
Data
====
jvm.config:
----
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=7203
-Dcom.sun.management.jmxremote.rmi.port=7203
-Dcom.sun.management.jmxremote.ssl=false
-Djava.awt.headless=true
-Djdk.tls.ephemeralDHKeySize=2048
-Djdk.tls.server.enableSessionTicketExtension=false
-XX:+ExplicitGCInvokesConcurrent
-XX:+PrintFlagsFinal
-XX:+UnlockDiagnosticVMOptions
-XX:+UseG1GC
-XX:ConcGCThreads=1
-XX:G1HeapRegionSize=16
-XX:InitiatingHeapOccupancyPercent=35
-XX:MaxGCPauseMillis=20
-XX:MaxMetaspaceFreeRatio=80
-XX:MetaspaceSize=96m
-XX:MinMetaspaceFreeRatio=50
-XX:ParallelGCThreads=1
-server
ksqldb.properties:
----
authentication.skip.paths=/chc/live,/chc/ready
bootstrap.servers=kafka.akscfk231.svc.cluster.local:9071
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
confluent.support.metrics.enable=true
ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081 # <--- added setting
ksql.service.id=akscfk231.ksqldb_
ksql.sink.replicas=3
ksql.streams.num.standby.replicas=1
ksql.streams.producer.confluent.batch.expiry.ms=9223372036854775807
ksql.streams.producer.max.block.ms=9223372036854775807
ksql.streams.producer.request.timeout.ms=300000
ksql.streams.producer.retries=2147483647
ksql.streams.replication.factor=3
ksql.streams.state.dir=/mnt/data/data/ksql-state
listeners=http://ksqldb.akscfk231.svc.cluster.local:8088 # <--- added setting
request.timeout.ms=20000
retry.backoff.ms=500
security.protocol=PLAINTEXT
log4j.properties:
----
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%p] %d [%t] %c %M - %m%n
log4j.rootLogger=INFO, stdout
disk-usage-agent.properties:
----
disk.data=/mnt/data/data
service.name=ksqldb
jmx-exporter.yaml:
----
lowercaseOutputLabelNames: false
lowercaseOutputName: true
ssl: false
Events: <none>
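The two added lines can also be checked by a script instead of reading the describe output by eye. kubectl get configmap ksqldb-shared-config -o json prints the ConfigMap as JSON, and the sketch below looks for each override in its ksqldb.properties entry. The properties parsing is deliberately simplified (key=value lines only, no ':' separators or line continuations), and parse_properties and missing_overrides are hypothetical helpers.

```python
import json
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Simplified .properties parser: key=value lines only; '#' and '!' lines are comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_overrides(configmap_json: str, overrides: List[str]) -> List[str]:
    """Return the override entries whose key/value is not present in data['ksqldb.properties']."""
    configmap = json.loads(configmap_json)
    props = parse_properties(configmap["data"]["ksqldb.properties"])
    missing = []
    for entry in overrides:
        key, _, value = entry.partition("=")
        if props.get(key.strip()) != value.strip():
            missing.append(entry)
    return missing


if __name__ == "__main__":
    # Trimmed sample shaped like `kubectl get configmap ksqldb-shared-config -o json`
    properties = "\n".join([
        "bootstrap.servers=kafka.akscfk231.svc.cluster.local:9071",
        "ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081",
        "listeners=http://ksqldb.akscfk231.svc.cluster.local:8088",
    ])
    sample = json.dumps({"data": {"ksqldb.properties": properties}})
    wanted = [
        "ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081",
        "listeners=http://ksqldb.akscfk231.svc.cluster.local:8088",
    ]
    print(missing_overrides(sample, wanted))  # [] means both overrides reached ksqldb.properties
```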
ksqldb details
$ kubectl describe ksqldb ksqldb
Name:         ksqldb
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         KsqlDB
Metadata:
  Creation Timestamp:  2022-08-18T05:56:26Z
  Finalizers:
    ksqldb.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:configOverrides:
          .:
          f:server:
        f:dataVolumeCapacity:
        f:image:
          .:
          f:application:
          f:init:
        f:replicas:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-08-18T05:56:26Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"ksqldb.finalizers.platform.confluent.io":
    Manager:      manager
    Operation:    Update
    Time:         2022-08-18T05:56:26Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:clusterName:
        f:clusterNamespace:
        f:conditions:
        f:currentReplicas:
        f:internalTopicNames:
        f:kafka:
          .:
          f:bootstrapEndpoint:
        f:operatorVersion:
        f:phase:
        f:readyReplicas:
        f:replicas:
        f:restConfig:
          .:
          f:internalEndpoint:
        f:selector:
        f:serviceID:
    Manager:         manager
    Operation:       Update
    Subresource:     status
    Time:            2022-08-18T06:04:03Z
  Resource Version:  5691
  UID:               3cc5b96a-245c-4982-a21d-28a178c1a1e5
Spec:
  Config Overrides:
    Server:
      ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081 # <--- added setting
      listeners=http://ksqldb.akscfk231.svc.cluster.local:8088 # <--- added setting
  Data Volume Capacity:  10Gi
  Image:
    Application:  confluentinc/cp-ksqldb-server:7.1.0
    Init:         confluentinc/confluent-init-container:2.3.0
  Replicas:       1
Status:
  Cluster Name:       ksqldb
  Cluster Namespace:  akscfk231
  Conditions:
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:04:03Z
    Message:               Deployment has minimum availability.
    Reason:                MinimumReplicasAvailable
    Status:                True
    Type:                  platform.confluent.io/statefulset-available
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:04:03Z
    Message:               Kubernetes resources ready.
    Reason:                KubernetesResourcesReady
    Status:                True
    Type:                  platform.confluent.io/resources-ready
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:01:01Z
    Message:               Cluster is not being garbage collected
    Reason:                Garbage Collection not triggered
    Status:                False
    Type:                  platform.confluent.io/garbage-collecting
  Current Replicas:  1
  Internal Topic Names:
    _confluent-ksql-akscfk231.ksqldb__command_topic
    _confluent-ksql-akscfk231.ksqldb__configs
  Kafka:
    Bootstrap Endpoint:  kafka.akscfk231.svc.cluster.local:9071
  Operator Version:      v0.435.23
  Phase:                 RUNNING
  Ready Replicas:        1
  Replicas:              1
  Rest Config:
    Internal Endpoint:  http://ksqldb.akscfk231.svc.cluster.local:8088
  Selector:             app=ksqldb,clusterId=akscfk231,confluent-platform=true,type=ksqldb
  Service ID:           akscfk231.ksqldb_
Events:
  Type     Reason            Age                 From    Message
  ----     ------            ----                ----    -------
  Warning  Warning           39m (x19 over 44m)  ksqlDB  waiting for at-least one kafka pod availability
  Normal   SuccessfulCreate  39m (x2 over 39m)   ksqlDB  resource type *v1.Service successfully created
  Normal   SuccessfulCreate  39m (x2 over 39m)   ksqlDB  resource type *v1.ConfigMap successfully created
  Normal   SuccessfulCreate  39m                 ksqlDB  resource type *v1.PersistentVolumeClaim successfully created
  Normal   SuccessfulCreate  39m                 ksqlDB  resource type *v1.StatefulSet successfully created
Adding the Stream configuration
- Define the following two queries in a ConfigMap
  - "queries001.sql": the query for "topic001" → "stream001"
    - stream001 exposes only the required columns of topic001
  - "queries002.sql": the query for "stream002" → "topic002", built on "stream001"
    - Extracts only the rows whose section column is 'C', 'E', 'F', or 'W' and writes them to topic002
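To make the intended data flow concrete, here is a plain-Python model of what the two queries do to each record: stream001 exposes only its seven declared columns (assuming fields not declared in stream001 are simply not visible through it), and stream002 keeps the rows whose section is 'C', 'E', 'F', or 'W'. This only mirrors the semantics for illustration; ksqlDB evaluates the queries itself, and the sample records are made up.

```python
from typing import Any, Dict, Iterable, Iterator

# Columns declared in queries001.sql for stream001
STREAM001_COLUMNS = ("id", "ctid", "section", "iot_state", "val_1", "val_2", "created_at")
# Values kept by the WHERE clause in queries002.sql
TARGET_SECTIONS = frozenset({"C", "E", "F", "W"})


def to_stream001(record: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the declared columns; a column missing from the record becomes None (SQL NULL)."""
    return {column: record.get(column) for column in STREAM001_COLUMNS}


def to_stream002(records: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield stream001 rows whose section is C, E, F or W.

    A NULL section never matches, just as section='C' is not true for NULL in SQL.
    """
    for record in records:
        row = to_stream001(record)
        if row["section"] in TARGET_SECTIONS:
            yield row


if __name__ == "__main__":
    samples = [
        {"id": "a1", "ctid": 1, "section": "C", "iot_state": "ok", "val_1": 1.0, "val_2": 2.0,
         "created_at": "2022-08-18T06:00:00", "extra": "not declared in stream001"},
        {"id": "a2", "ctid": 2, "section": "A"},
        {"id": "a3", "ctid": 3},
    ]
    for row in to_stream002(samples):
        print(row)
```

The chain of ORs in queries002.sql is equivalent to a set-membership test, which is why a frozenset is used here.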
Work Pod configuration
Create the manifest for the work Pod (ksql-client2.yaml), which mounts the two queries from a ConfigMap
---
apiVersion: v1
kind: Pod
metadata:
  name: ksql-client2
  namespace: akscfk231
spec:
  containers:
    - name: ksql-client2
      image: confluentinc/ksqldb-cli:latest
      env:
        - name: KSQL_BOOTSTRAP_SERVERS
          value: PLAINTEXT://kafka.akscfk231.svc.cluster.local:9092
        - name: KSQL_KSQL_SCHEMA_REGISTRY_URL
          value: http://schemaregistry.akscfk231.svc.cluster.local:8081
      volumeMounts:
        - mountPath: /ituru
          name: ksql-queries
  volumes:
    - name: ksql-queries
      configMap:
        name: demo-ksql-client-queries-configmap
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: demo-ksql-client-queries-configmap
  namespace: akscfk231
data: # <--- the two queries are defined below
  queries001.sql: |-
    CREATE STREAM stream001 (
    id VARCHAR,
    ctid BIGINT,
    section VARCHAR,
    iot_state VARCHAR,
    val_1 DOUBLE,
    val_2 DOUBLE,
    created_at VARCHAR
    ) WITH (kafka_topic = 'topic001', value_format = 'avro');
  queries002.sql: |-
    CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS
    SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
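Keeping the SQL in separate .sql files and generating the ConfigMap from them can make the queries easier to edit. The sketch below builds the same ConfigMap as a Python dict and prints it as JSON, which kubectl also accepts as a manifest; the function names and the "queries" directory are hypothetical. Each data key becomes a file name under the Pod's mountPath, which is how /ituru/queries001.sql appears in the Pod.

```python
import json
from pathlib import Path
from typing import Dict


def load_sql_files(directory: str) -> Dict[str, str]:
    """Map each *.sql file name in the directory to its contents (e.g. queries001.sql -> SQL text)."""
    return {path.name: path.read_text(encoding="utf-8") for path in sorted(Path(directory).glob("*.sql"))}


def build_queries_configmap(name: str, namespace: str, queries: Dict[str, str]) -> Dict[str, object]:
    """Return a ConfigMap manifest whose data keys become file names under the Pod's mountPath."""
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name, "namespace": namespace},
        "data": dict(queries),
    }


if __name__ == "__main__":
    queries = {
        "queries002.sql": "CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS\n"
        "SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';",
    }
    # With real files instead: queries = load_sql_files("queries")  # hypothetical directory
    manifest = build_queries_configmap("demo-ksql-client-queries-configmap", "akscfk231", queries)
    print(json.dumps(manifest, indent=2))  # can be piped into: kubectl apply -f -
```

Without any script, kubectl can produce an equivalent manifest with kubectl create configmap demo-ksql-client-queries-configmap --from-file=queries001.sql --from-file=queries002.sql -n akscfk231 --dry-run=client -o yaml.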
Starting the work Pod
## Create the ksqldb-cli Pod
$ kubectl apply -f ksql-client2.yaml
pod/ksql-client2 created
configmap/demo-ksql-client-queries-configmap created
Checking the work Pod
$ kubectl get pods
NAME                                  READY   STATUS    RESTARTS      AGE
confluent-operator-76d7677b8c-ll6rf   1/1     Running   0             17m
connect-0                             1/1     Running   1 (11m ago)   16m
controlcenter-0                       1/1     Running   0             12m
kafka-0                               1/1     Running   0             13m
kafka-1                               1/1     Running   0             13m
kafka-2                               1/1     Running   0             13m
kafkarestproxy-0                      1/1     Running   0             12m
ksql-client2                          1/1     Running   0             54s
ksqldb-0                              1/1     Running   1 (10m ago)   12m
schemaregistry-0                      1/1     Running   0             12m
zookeeper-0                           1/1     Running   0             16m
zookeeper-1                           1/1     Running   0             16m
zookeeper-2                           1/1     Running   0             16m
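Rather than scanning the list by eye, the READY and STATUS columns can be checked programmatically. A minimal sketch, assuming the default kubectl get pods column layout shown above with only the command's output passed in; not_ready_pods is a hypothetical helper.

```python
from typing import List


def not_ready_pods(get_pods_output: str) -> List[str]:
    """Return names of Pods that are not Running or whose READY count is below the desired count."""
    lines = [line for line in get_pods_output.splitlines() if line.strip()]
    problems = []
    for line in lines[1:]:  # first line is the NAME/READY/STATUS header
        columns = line.split()  # RESTARTS like '1 (11m ago)' spans several tokens; only the first three are used
        name, ready, status = columns[0], columns[1], columns[2]
        current, _, desired = ready.partition("/")
        if status != "Running" or current != desired:
            problems.append(name)
    return problems


if __name__ == "__main__":
    sample = """\
NAME           READY   STATUS              RESTARTS      AGE
connect-0      1/1     Running             1 (11m ago)   16m
ksql-client2   0/1     ContainerCreating   0             5s
ksqldb-0       1/1     Running             1 (10m ago)   12m"""
    print(not_ready_pods(sample))  # ['ksql-client2']
```

To simply block until the work Pod is ready, kubectl wait --for=condition=Ready pod/ksql-client2 --timeout=120s does the same job without a script.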
Checking the ConfigMap
## Check the ksqldb-cli ConfigMap
$ kubectl get configmap demo-ksql-client-queries-configmap
NAME                                 DATA   AGE
demo-ksql-client-queries-configmap   2      112s
## Show the ksqldb-cli ConfigMap details
$ kubectl describe configmap demo-ksql-client-queries-configmap
Name: demo-ksql-client-queries-configmap
Namespace: akscfk231
Labels: <none>
Annotations: <none>
Data
====
queries001.sql:
----
CREATE STREAM stream001 (
id VARCHAR,
ctid BIGINT,
section VARCHAR,
iot_state VARCHAR,
val_1 DOUBLE,
val_2 DOUBLE,
created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');
queries002.sql:
----
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
Events: <none>
Configuring the Streams
Creating the Streams with the ksqlDB CLI
Connect to the work Pod created above and configure the Streams with the ksqlDB CLI.
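The ksql CLI talks to the ksqlDB server over its REST API, on the listener configured earlier (http://ksqldb.akscfk231.svc.cluster.local:8088). As an alternative sketch, the same statements could be sent from any Pod in the cluster with Python's standard library by POSTing to the /ksql endpoint. The request shape (a JSON body with ksql and streamsProperties, and the application/vnd.ksql.v1+json media type) follows the ksqlDB REST API documentation; the helper names are hypothetical, and only request construction runs here because the server URL resolves only inside the cluster.

```python
import json
import urllib.request
from typing import Any

KSQLDB_URL = "http://ksqldb.akscfk231.svc.cluster.local:8088"  # resolvable only inside the AKS cluster


def build_ksql_request(base_url: str, statement: str) -> urllib.request.Request:
    """Build a POST /ksql request for one or more ksqlDB statements (e.g. 'SHOW STREAMS;')."""
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/ksql",
        data=body,
        method="POST",
        headers={
            "Accept": "application/vnd.ksql.v1+json",
            "Content-Type": "application/vnd.ksql.v1+json; charset=utf-8",
        },
    )


def run_statement(base_url: str, statement: str, timeout: float = 30.0) -> Any:
    """Send the statement and return the decoded JSON response."""
    request = build_ksql_request(base_url, statement)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Only builds the request; call run_statement(KSQLDB_URL, "SHOW STREAMS;") from inside the cluster.
    req = build_ksql_request(KSQLDB_URL, "SHOW STREAMS;")
    print(req.get_method(), req.full_url)
```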
## Connect to the work Pod
$ kubectl exec -it ksql-client2 -- /bin/bash
## Check the query file (stream001)
[appuser@ksql-client2 ~]$ cat /ituru/queries001.sql
CREATE STREAM stream001 (
id VARCHAR,
ctid BIGINT,
section VARCHAR,
iot_state VARCHAR,
val_1 DOUBLE,
val_2 DOUBLE,
created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');
## Check the query file (stream002)
[appuser@ksql-client2 ituru]$ cat /ituru/queries002.sql
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
## List the current topics
[appuser@ksql-client2 ~]$ ksql --execute "SHOW TOPICS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
 Kafka Topic                       | Partitions | Partition Replicas
---------------------------------------------------------------------
 _schemas_schemaregistry_akscfk231 | 1          | 3
 akscfk231.connect-configs         | 1          | 3
 akscfk231.connect-offsets         | 25         | 3
 akscfk231.connect-status          | 5          | 3
 topic001                          | 1          | 3
 topic002                          | 1          | 3
---------------------------------------------------------------------
## Create stream001
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Message
----------------
Stream created
----------------
## Check stream001
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream001 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Name : STREAM001
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : AVRO
Kafka topic : topic001 (partitions: 1, replication: 3)
Statement : CREATE STREAM STREAM001 (ID STRING, CTID BIGINT, SECTION STRING, IOT_STATE STRING, VAL_1 DOUBLE, VAL_2 DOUBLE, CREATED_AT STRING) WITH (KAFKA_TOPIC='topic001', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');
 Field      | Type
------------------------------
 ID         | VARCHAR(STRING)
 CTID       | BIGINT
 SECTION    | VARCHAR(STRING)
 IOT_STATE  | VARCHAR(STRING)
 VAL_1      | DOUBLE
 VAL_2      | DOUBLE
 CREATED_AT | VARCHAR(STRING)
------------------------------
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic001)
## Create stream002
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries002.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Message
----------------------------------------
Created query with ID CSAS_STREAM002_1
----------------------------------------
## Check stream002
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream002 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Name : STREAM002
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON_SR
Kafka topic : topic002 (partitions: 1, replication: 3)
Statement : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT *
FROM STREAM001 STREAM001
WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W'))
EMIT CHANGES;
 Field      | Type
------------------------------
 ID         | VARCHAR(STRING)
 CTID       | BIGINT
 SECTION    | VARCHAR(STRING)
 IOT_STATE  | VARCHAR(STRING)
 VAL_1      | DOUBLE
 VAL_2      | DOUBLE
 CREATED_AT | VARCHAR(STRING)
------------------------------
Queries that write from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic002)
Consumer Groups summary:
Consumer Group : _confluent-ksql-akscfk231.ksqldb_query_CSAS_STREAM002_1
<no offsets committed by this group yet>
## List the streams
[appuser@ksql-client2 ~]$ ksql --execute "SHOW STREAMS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
 Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------
 STREAM001   | topic001    | KAFKA      | AVRO         | false
 STREAM002   | topic002    | KAFKA      | JSON_SR      | false
------------------------------------------------------------------
## Note: trying to create an existing stream (stream001) again fails with an error
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Cannot add stream 'STREAM001': A stream with the same name already exists
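Because CREATE STREAM fails for a name that already exists, a re-runnable setup needs to skip statements whose stream is already registered. One way, sketched below, is to read the SHOW STREAMS output and filter the query files before running ksql --file on each remaining one; the parsing assumes the table layout shown above, and both helpers are hypothetical. Recent ksqlDB versions also accept CREATE STREAM IF NOT EXISTS, which may be simpler; check the CREATE STREAM reference for the version in use.

```python
import re
from typing import Dict, Set

# Captures the stream name in 'CREATE STREAM <name> ...' (unquoted identifiers only)
CREATE_STREAM = re.compile(r"^\s*CREATE\s+STREAM\s+(\w+)", re.IGNORECASE)


def existing_streams(show_streams_output: str) -> Set[str]:
    """Collect stream names from the table printed by SHOW STREAMS."""
    names = set()
    for line in show_streams_output.splitlines():
        cells = [cell.strip() for cell in line.split("|")]
        if len(cells) < 2 or cells[0] in ("", "Stream Name"):
            continue  # separators, JVM warnings, and the header row carry no stream name
        names.add(cells[0].upper())
    return names


def statements_to_run(sql_files: Dict[str, str], existing: Set[str]) -> Dict[str, str]:
    """Drop files whose CREATE STREAM target already exists; ksqlDB upper-cases unquoted names."""
    pending = {}
    for file_name, sql in sql_files.items():
        match = CREATE_STREAM.match(sql)
        if match and match.group(1).upper() in existing:
            continue
        pending[file_name] = sql
    return pending


if __name__ == "__main__":
    shown = """\
 Stream Name | Kafka Topic | Key Format | Value Format | Windowed
------------------------------------------------------------------
 STREAM001   | topic001    | KAFKA      | AVRO         | false
------------------------------------------------------------------"""
    files = {
        "queries001.sql": "CREATE STREAM stream001 (id VARCHAR) WITH (kafka_topic = 'topic001', value_format = 'avro');",
        "queries002.sql": "CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS SELECT * FROM stream001;",
    }
    print(sorted(statements_to_run(files, existing_streams(shown))))  # ['queries002.sql']
```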
Checking in Confluent Control Center
$ kubectl confluent dashboard controlcenter
http://localhost:9021
A browser opens automatically; log in to Confluent Control Center and check "ksqldb".
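Cleanup
Uninstalling the Pods / secrets / namespace
Delete the Confluent Platform resources before removing the operator: the custom resources carry finalizers (for example ksqldb.finalizers.platform.confluent.io in the kubectl describe ksqldb output above) that the operator clears, so deleting them after the operator is gone can hang.
## Pod : work Pod
$ kubectl delete -f ksql-client2.yaml
## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
## Pod : confluent-operator
$ helm delete confluent-operator
## Delete the namespace (all Pods in the namespace are deleted along with it)
$ kubectl delete namespace akscfk231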
Stopping and starting the AKS cluster
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
Check for any error messages
$ kubectl get events
$ kubectl logs ksql-client2
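Summary
This confirmed that Streams can be configured on Confluent Platform with the ksql CLI. However, once data actually starts streaming in, the data conversion settings and similar configuration will need further adjustment. Next time, I plan to actually stream data through the pipeline.
References
I referred to the following resources: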
Advanced Configuration Options for Confluent Platform
Example KSQL app deployment for Kubernetes
Configure ksqlDB CLI
ksqlDB
ksqlDB Documentation
CREATE STREAM
KSQL Examples
Configuring Key and Value Converters
Using Kafka Connect with Schema Registry