#8 Configuring Confluent Platform on AKS with Confluent for Kubernetes - Creating KSQL Streams

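Overview

Confluent for Kubernetes (CFK) is a cloud-native control plane for deploying and managing Confluent in a private cloud environment (here, Azure Kubernetes Service (AKS)). It provides a standard, simple interface for customizing, deploying, and managing Confluent Platform through a declarative API.

The workflow for creating a Stream in KSQL on a Confluent Platform that was deployed on AKS with CFK is as follows.

  1. Prepare the Kubernetes environment (done in the prerequisites)
  2. Deploy Confluent for Kubernetes (done in the prerequisites)
  3. Deploy Confluent Platform, including the required Connector Plugins (done in the prerequisites)
  4. Apply an additional deployment to Confluent Platform to create the topics (done in the prerequisites)
  5. Apply an additional deployment to Confluent Platform to create the Source/Sink Connectors (done in the prerequisites)
  6. Add the configuration that Confluent Platform needs for Streams
  7. Apply the additional deployment for Streams to Confluent Platform

The figure below shows the environment after deployment.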
image.png


Local environment

  • macOS Monterey 12.3.1
  • python 3.8.12
  • Azure CLI 2.34.1
  • helm v3.6.3
  • kubectl v1.21.3

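Prerequisites

  1. An AKS cluster environment has been built by following this article
  2. CosmosDB has been built on Azure by adapting and running this article, with the same database name and container name as this one
  3. Confluent Platform has been deployed on AKS in the configuration shown in the figure below, with reference to this article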
    image.png

Additional Confluent Platform configuration

Changing the Confluent Platform configuration

The "confluent_platform_ccc.yaml" file has defined the Confluent Platform settings so far. To use the KSQL REST API (ksqldb-cli), add a "spec: - configOverrides: - server:" section to "kind: KsqlDB" in that file.

confluent_platform_ccc.yaml
apiVersion: platform.confluent.io/v1beta1
kind: KsqlDB
metadata:
  name: ksqldb
  namespace: akscfk231
spec:
  replicas: 1
  image:
    application: confluentinc/cp-ksqldb-server:7.1.0
    init: confluentinc/confluent-init-container:2.3.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081
      - listeners=http://ksqldb.akscfk231.svc.cluster.local:8088
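
Each entry under "configOverrides.server" is a plain "key=value" line that ends up in ksqldb.properties. The following is a minimal Python sketch, not part of CFK itself, that parses such entries so they can be inspected before applying the file. The function name parse_overrides is hypothetical; the two entries are the ones from this article.

```python
from typing import Dict, List


def parse_overrides(entries: List[str]) -> Dict[str, str]:
    """Turn 'key=value' override entries into a dict, splitting on the first '=' only."""
    props: Dict[str, str] = {}
    for entry in entries:
        key, sep, value = entry.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"not a key=value entry: {entry!r}")
        props[key.strip()] = value.strip()
    return props


# The two entries added to configOverrides.server in this article.
server_overrides = [
    "ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081",
    "listeners=http://ksqldb.akscfk231.svc.cluster.local:8088",
]

if __name__ == "__main__":
    for key, value in parse_overrides(server_overrides).items():
        print(f"{key} -> {value}")
```

Splitting on the first "=" only keeps values that themselves contain "=" intact, which is the usual convention for properties-style lines.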

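Additional deployment to Confluent Platform

Applying the additional settings to ksqldb in Confluent Platform

Apply "confluent_platform_ccc.yaml" again with the additions above. Only ksqldb is reported as configured, meaning only its settings were updated.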

$ kubectl apply -f confluent_platform_ccc.yaml
zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
connect.platform.confluent.io/connect unchanged
ksqldb.platform.confluent.io/ksqldb configured       # <--- only this one is configured
controlcenter.platform.confluent.io/controlcenter unchanged
schemaregistry.platform.confluent.io/schemaregistry unchanged
kafkarestproxy.platform.confluent.io/kafkarestproxy unchanged
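
When the file defines many resources, it can help to pick out the ones that actually changed. The sketch below is an illustration only: it parses text shaped like the `kubectl apply` output above and returns every resource whose status is not "unchanged". The function names are hypothetical, and the sample is a shortened copy of the output shown in this article.

```python
from typing import List, Tuple


def parse_apply_output(output: str) -> List[Tuple[str, str]]:
    """Return (resource, status) pairs from `kubectl apply` output lines."""
    results = []
    for line in output.splitlines():
        parts = line.split()
        # Each result line looks like "<resource> <status>"; skip anything else.
        if len(parts) >= 2 and "/" in parts[0]:
            results.append((parts[0], parts[1]))
    return results


def changed_resources(output: str) -> List[str]:
    """Resources whose status is anything other than 'unchanged'."""
    return [name for name, status in parse_apply_output(output) if status != "unchanged"]


# Shortened copy of the output shown in this article.
sample = """\
zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
ksqldb.platform.confluent.io/ksqldb configured
schemaregistry.platform.confluent.io/schemaregistry unchanged
"""

if __name__ == "__main__":
    print(changed_resources(sample))
```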

Checking the deployed Confluent Platform resources

ksqldb ConfigMap information

## List the ConfigMaps
$ kubectl get configmap                 
NAME                           DATA   AGE
connect-init-config            3      26m
connect-shared-config          5      26m
controlcenter-init-config      3      23m
controlcenter-shared-config    6      23m
kafka-init-config              3      24m
kafka-shared-config            5      24m
kafkarestproxy-init-config     3      23m
kafkarestproxy-shared-config   4      23m
ksqldb-init-config             3      23m
ksqldb-shared-config           5      23m
kube-root-ca.crt               1      29m
schemaregistry-init-config     3      23m
schemaregistry-shared-config   4      23m
zookeeper-init-config          3      26m
zookeeper-shared-config        6      26m

## ksqldb ConfigMap details
$ kubectl describe configmap ksqldb-shared-config
Name:         ksqldb-shared-config
Namespace:    akscfk231
Labels:       app.kubernetes.io/managed-by=confluent-operator
              confluent-platform=true
              cr-name=ksqldb
              type=ksqldb
Annotations:  platform.confluent.io/cr-name: ksqldb
              platform.confluent.io/last-applied:
                eyJkYXRhIjp7ImRpc2stdXNhZ2UtYWdlbnQucHJvcGVydGllcyI6ImRpc2suZGF0YT0vbW50L2RhdGEvZGF0YVxuc2VydmljZS5uYW1lPWtzcWxkYlxuIiwiam14LWV4cG9ydGVyLn...
              platform.confluent.io/namespace: akscfk231
              platform.confluent.io/type: ksqldb

Data
====
jvm.config:
----
-Dcom.sun.management.jmxremote=true
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.port=7203
-Dcom.sun.management.jmxremote.rmi.port=7203
-Dcom.sun.management.jmxremote.ssl=false
-Djava.awt.headless=true
-Djdk.tls.ephemeralDHKeySize=2048
-Djdk.tls.server.enableSessionTicketExtension=false
-XX:+ExplicitGCInvokesConcurrent
-XX:+PrintFlagsFinal
-XX:+UnlockDiagnosticVMOptions
-XX:+UseG1GC
-XX:ConcGCThreads=1
-XX:G1HeapRegionSize=16
-XX:InitiatingHeapOccupancyPercent=35
-XX:MaxGCPauseMillis=20
-XX:MaxMetaspaceFreeRatio=80
-XX:MetaspaceSize=96m
-XX:MinMetaspaceFreeRatio=50
-XX:ParallelGCThreads=1
-server

ksqldb.properties:
----
authentication.skip.paths=/chc/live,/chc/ready
bootstrap.servers=kafka.akscfk231.svc.cluster.local:9071
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
confluent.support.metrics.enable=true
ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081   # <--- added setting
ksql.service.id=akscfk231.ksqldb_
ksql.sink.replicas=3
ksql.streams.num.standby.replicas=1
ksql.streams.producer.confluent.batch.expiry.ms=9223372036854775807
ksql.streams.producer.max.block.ms=9223372036854775807
ksql.streams.producer.request.timeout.ms=300000
ksql.streams.producer.retries=2147483647
ksql.streams.replication.factor=3
ksql.streams.state.dir=/mnt/data/data/ksql-state
listeners=http://ksqldb.akscfk231.svc.cluster.local:8088                          # <--- added setting
request.timeout.ms=20000
retry.backoff.ms=500
security.protocol=PLAINTEXT

log4j.properties:
----
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.EnhancedPatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%p] %d [%t] %c %M - %m%n
log4j.rootLogger=INFO, stdout

disk-usage-agent.properties:
----
disk.data=/mnt/data/data
service.name=ksqldb

jmx-exporter.yaml:
----
lowercaseOutputLabelNames: false
lowercaseOutputName: true
ssl: false

Events:  <none>

ksqldb details

$ kubectl describe ksqldb ksqldb
Name:         ksqldb
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         KsqlDB
Metadata:
  Creation Timestamp:  2022-08-18T05:56:26Z
  Finalizers:
    ksqldb.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:configOverrides:
          .:
          f:server:
        f:dataVolumeCapacity:
        f:image:
          .:
          f:application:
          f:init:
        f:replicas:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-08-18T05:56:26Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"ksqldb.finalizers.platform.confluent.io":
    Manager:      manager
    Operation:    Update
    Time:         2022-08-18T05:56:26Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:clusterName:
        f:clusterNamespace:
        f:conditions:
        f:currentReplicas:
        f:internalTopicNames:
        f:kafka:
          .:
          f:bootstrapEndpoint:
        f:operatorVersion:
        f:phase:
        f:readyReplicas:
        f:replicas:
        f:restConfig:
          .:
          f:internalEndpoint:
        f:selector:
        f:serviceID:
    Manager:         manager
    Operation:       Update
    Subresource:     status
    Time:            2022-08-18T06:04:03Z
  Resource Version:  5691
  UID:               3cc5b96a-245c-4982-a21d-28a178c1a1e5
Spec:
  Config Overrides:
    Server:
      ksql.schema.registry.url=http://schemaregistry.akscfk231.svc.cluster.local:8081   # <--- added setting
      listeners=http://ksqldb.akscfk231.svc.cluster.local:8088                          # <--- added setting
  Data Volume Capacity:  10Gi
  Image:
    Application:  confluentinc/cp-ksqldb-server:7.1.0
    Init:         confluentinc/confluent-init-container:2.3.0
  Replicas:       1
Status:
  Cluster Name:       ksqldb
  Cluster Namespace:  akscfk231
  Conditions:
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:04:03Z
    Message:               Deployment has minimum availability.
    Reason:                MinimumReplicasAvailable
    Status:                True
    Type:                  platform.confluent.io/statefulset-available
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:04:03Z
    Message:               Kubernetes resources ready.
    Reason:                KubernetesResourcesReady
    Status:                True
    Type:                  platform.confluent.io/resources-ready
    Last Probe Time:       2022-08-18T06:01:01Z
    Last Transition Time:  2022-08-18T06:01:01Z
    Message:               Cluster is not being garbage collected
    Reason:                Garbage Collection not triggered
    Status:                False
    Type:                  platform.confluent.io/garbage-collecting
  Current Replicas:        1
  Internal Topic Names:
    _confluent-ksql-akscfk231.ksqldb__command_topic
    _confluent-ksql-akscfk231.ksqldb__configs
  Kafka:
    Bootstrap Endpoint:  kafka.akscfk231.svc.cluster.local:9071
  Operator Version:      v0.435.23
  Phase:                 RUNNING
  Ready Replicas:        1
  Replicas:              1
  Rest Config:
    Internal Endpoint:  http://ksqldb.akscfk231.svc.cluster.local:8088
  Selector:             app=ksqldb,clusterId=akscfk231,confluent-platform=true,type=ksqldb
  Service ID:           akscfk231.ksqldb_
Events:
  Type     Reason            Age                 From    Message
  ----     ------            ----                ----    -------
  Warning  Warning           39m (x19 over 44m)  ksqlDB  waiting for at-least one kafka pod availability
  Normal   SuccessfulCreate  39m (x2 over 39m)   ksqlDB  resource type *v1.Service successfully created
  Normal   SuccessfulCreate  39m (x2 over 39m)   ksqlDB  resource type *v1.ConfigMap successfully created
  Normal   SuccessfulCreate  39m                 ksqlDB  resource type *v1.PersistentVolumeClaim successfully created
  Normal   SuccessfulCreate  39m                 ksqlDB  resource type *v1.StatefulSet successfully created
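
The Status section above carries the fields needed to decide whether ksqldb is ready for the next step. As a rough sketch, the check can be written against the JSON form of the same resource (for example from `kubectl get ksqldb ksqldb -o json`). The field names phase, replicas, and readyReplicas come from the output above; the function name is_ready and the trimmed sample document are assumptions made for illustration.

```python
import json
from typing import Any, Dict


def is_ready(resource: Dict[str, Any]) -> bool:
    """True when the phase is RUNNING and every requested replica is ready."""
    status = resource.get("status", {})
    replicas = status.get("replicas", 0)
    return (
        status.get("phase") == "RUNNING"
        and replicas > 0
        and status.get("readyReplicas", 0) == replicas
    )


# Trimmed, hand-written sample shaped like the resource described above.
sample = json.loads("""
{"kind": "KsqlDB",
 "status": {"phase": "RUNNING", "replicas": 1, "readyReplicas": 1}}
""")

if __name__ == "__main__":
    print("ready" if is_ready(sample) else "not ready")
```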

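Adding the Stream configuration

  • Define the following two queries in a ConfigMap
    • Define "queries001.sql" as the query for "topic001" → "stream001"
      • Only the required columns are passed on to stream001
    • Define "queries002.sql" as the query for "stream002" → "topic002", based on "stream001"
      • Only rows whose section column is 'C', 'E', 'F', or 'W' are extracted and forwarded to topic002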
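
To make the second query's selection rule concrete, here is a small Python sketch of the same filter applied to in-memory records. It only mimics the logic; the actual filtering is done by ksqlDB. The sample records are hypothetical and are not data from topic001.

```python
from typing import Dict, Iterable, Iterator

# Sections kept by the stream002 query.
KEEP_SECTIONS = {"C", "E", "F", "W"}


def filter_sections(records: Iterable[Dict]) -> Iterator[Dict]:
    """Yield only records whose 'section' is one of the kept values."""
    for record in records:
        if record.get("section") in KEEP_SECTIONS:
            yield record


# Hypothetical sample records; the real values come from topic001.
sample = [
    {"id": "a1", "section": "C", "val_1": 1.0},
    {"id": "a2", "section": "A", "val_1": 2.0},
    {"id": "a3", "section": "W", "val_1": 3.0},
]

if __name__ == "__main__":
    for record in filter_sections(sample):
        print(record["id"], record["section"])
```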

Configuring the working Pod

Create a Pod manifest in which the two queries are defined through a ConfigMap.

ksql-client2.yaml
---
apiVersion: v1
kind: Pod
metadata:
  name: ksql-client2
  namespace: akscfk231
spec:
  containers:
  - name: ksql-client2
    image: confluentinc/ksqldb-cli:latest
    env:
      - name: KSQL_BOOTSTRAP_SERVERS
        value: PLAINTEXT://kafka.akscfk231.svc.cluster.local:9092
      - name: KSQL_KSQL_SCHEMA_REGISTRY_URL
        value: http://schemaregistry.akscfk231.svc.cluster.local:8081
    volumeMounts:
      - mountPath: /ituru
        name: ksql-queries
  volumes:
  - name: ksql-queries
    configMap:
      name: demo-ksql-client-queries-configmap
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: demo-ksql-client-queries-configmap
  namespace: akscfk231
data:                         # <--- define the two queries below
  queries001.sql: |-
    CREATE STREAM stream001 (
      id VARCHAR,
      ctid BIGINT,
      section VARCHAR,
      iot_state VARCHAR,
      val_1 DOUBLE,
      val_2 DOUBLE,
      created_at VARCHAR
    ) WITH (kafka_topic = 'topic001', value_format = 'avro');
  queries002.sql: |-
    CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS 
    SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
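
In a ConfigMap volume, each key under "data" appears as one file under the mount path. The sketch below lists the paths the Pod should see for the manifest above. The helper name mounted_files is hypothetical, and the SQL bodies are shortened because only the keys matter here.

```python
import posixpath
from typing import Dict, List


def mounted_files(configmap_data: Dict[str, str], mount_path: str) -> List[str]:
    """List the file paths a ConfigMap volume exposes: one file per data key."""
    return sorted(posixpath.join(mount_path, key) for key in configmap_data)


# Keys mirror the `data` section of demo-ksql-client-queries-configmap;
# the SQL bodies are shortened here.
data = {
    "queries001.sql": "CREATE STREAM stream001 (...) WITH (...);",
    "queries002.sql": "CREATE STREAM stream002 WITH (...) AS SELECT ...;",
}

if __name__ == "__main__":
    for path in mounted_files(data, "/ituru"):
        print(path)
```

These are the paths later passed to `ksql --file` inside the Pod.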

Starting the working Pod

## Create the ksqldb-cli Pod
$ kubectl apply -f ksql-client2.yaml 
pod/ksql-client2 created
configmap/demo-ksql-client-queries-configmap created

Checking the working Pod

$ kubectl get pods
NAME                                  READY   STATUS    RESTARTS      AGE
confluent-operator-76d7677b8c-ll6rf   1/1     Running   0             17m
connect-0                             1/1     Running   1 (11m ago)   16m
controlcenter-0                       1/1     Running   0             12m
kafka-0                               1/1     Running   0             13m
kafka-1                               1/1     Running   0             13m
kafka-2                               1/1     Running   0             13m
kafkarestproxy-0                      1/1     Running   0             12m
ksql-client2                          1/1     Running   0             54s
ksqldb-0                              1/1     Running   1 (10m ago)   12m
schemaregistry-0                      1/1     Running   0             12m
zookeeper-0                           1/1     Running   0             16m
zookeeper-1                           1/1     Running   0             16m
zookeeper-2                           1/1     Running   0             16m

Checking the ConfigMap

## Check the ConfigMap for ksqldb-cli
$ kubectl get configmap demo-ksql-client-queries-configmap
NAME                                 DATA   AGE
demo-ksql-client-queries-configmap   2      112s


## Check the ConfigMap details for ksqldb-cli
$ kubectl describe configmap demo-ksql-client-queries-configmap
Name:         demo-ksql-client-queries-configmap
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>

Data
====
queries001.sql:
----
CREATE STREAM stream001 (
  id VARCHAR,
  ctid BIGINT,
  section VARCHAR,
  iot_state VARCHAR,
  val_1 DOUBLE,
  val_2 DOUBLE,
  created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');
queries002.sql:
----
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS 
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';
Events:  <none>

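Setting up the Streams

Creating the Streams with the ksqldb cli

Connect to the working Pod created above and use the ksqldb cli to configure the Streams.

## Connect to the working Pod
$ kubectl exec -it ksql-client2 -- /bin/bash


## Check the query file (stream001)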
[appuser@ksql-client2 ~]$ cat /ituru/queries001.sql
CREATE STREAM stream001 (
  id VARCHAR,
  ctid BIGINT,
  section VARCHAR,
  iot_state VARCHAR,
  val_1 DOUBLE,
  val_2 DOUBLE,
  created_at VARCHAR
) WITH (kafka_topic = 'topic001', value_format = 'avro');

## Check the query file (stream002)
[appuser@ksql-client2 ituru]$ cat /ituru/queries002.sql
CREATE STREAM stream002 WITH (kafka_topic = 'topic002', value_format = 'JSON_SR') AS 
SELECT * FROM stream001 WHERE section='C' OR section='E' OR section='F' OR section='W';


## List the current topics
[appuser@ksql-client2 ~]$ ksql --execute "SHOW TOPICS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Kafka Topic                       | Partitions | Partition Replicas 
---------------------------------------------------------------------
 _schemas_schemaregistry_akscfk231 | 1          | 3                  
 akscfk231.connect-configs         | 1          | 3                  
 akscfk231.connect-offsets         | 25         | 3                  
 akscfk231.connect-status          | 5          | 3                  
 topic001                          | 1          | 3                  
 topic002                          | 1          | 3                  
---------------------------------------------------------------------


## Create stream001
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Message        
----------------
 Stream created 
----------------

## Check stream001
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream001 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM001
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
Kafka topic          : topic001 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM001 (ID STRING, CTID BIGINT, SECTION STRING, IOT_STATE STRING, VAL_1 DOUBLE, VAL_2 DOUBLE, CREATED_AT STRING) WITH (KAFKA_TOPIC='topic001', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic topic001)


## Create stream002
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries002.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Message                                
----------------------------------------
 Created query with ID CSAS_STREAM002_1 
----------------------------------------

## Check stream002
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream002 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM002
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON_SR
Kafka topic          : topic002 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT *
FROM STREAM001 STREAM001
WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W'))
EMIT CHANGES;

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------

(Statistics of the local KSQL server interaction with the Kafka topic topic002)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-akscfk231.ksqldb_query_CSAS_STREAM002_1
<no offsets committed by this group yet>


## List the streams
[appuser@ksql-client2 ~]$ ksql --execute "SHOW STREAMS;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

 Stream Name | Kafka Topic | Key Format | Value Format | Windowed 
------------------------------------------------------------------
 STREAM001   | topic001    | KAFKA      | AVRO         | false    
 STREAM002   | topic002    | KAFKA      | JSON_SR      | false    
------------------------------------------------------------------


## Note: trying to create an existing stream (stream001) again results in an error
[appuser@ksql-client2 ~]$ ksql --file /ituru/queries001.sql -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Cannot add stream 'STREAM001': A stream with the same name already exists
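
Because re-running a CREATE STREAM file against an existing stream fails, a script that replays the query files may want to skip statements for streams that are already listed by SHOW STREAMS. The following is a minimal sketch under stated assumptions: the helper names are hypothetical, the regular expression handles only unquoted stream names, and it relies on the upper-casing of unquoted names seen in the DESCRIBE output above. ksqlDB's CREATE STREAM grammar also has an IF NOT EXISTS form, which may be the simpler option where the server version supports it.

```python
import re
from typing import Iterable, Optional

# Matches the stream name in CREATE [OR REPLACE] STREAM [IF NOT EXISTS] <name> ...
_CREATE_STREAM = re.compile(
    r"^\s*CREATE\s+(?:OR\s+REPLACE\s+)?STREAM\s+(?:IF\s+NOT\s+EXISTS\s+)?"
    r"([A-Za-z_][A-Za-z0-9_]*)",
    re.IGNORECASE,
)


def stream_name(statement: str) -> Optional[str]:
    """Extract the upper-cased stream name from a CREATE STREAM statement."""
    match = _CREATE_STREAM.match(statement)
    return match.group(1).upper() if match else None


def should_run(statement: str, existing: Iterable[str]) -> bool:
    """False when the statement would create a stream that already exists."""
    name = stream_name(statement)
    return name is None or name not in {s.upper() for s in existing}


if __name__ == "__main__":
    existing = ["STREAM001", "STREAM002"]  # names as listed by SHOW STREAMS
    statement = "CREATE STREAM stream001 (id VARCHAR) WITH (kafka_topic = 'topic001', value_format = 'avro');"
    print("run" if should_run(statement, existing) else "skip")
```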

Checking in Confluent Control Center

$ kubectl confluent dashboard controlcenter
http://localhost:9021

The browser opens automatically. Log in to Confluent Control Center and check "ksqldb".
image.png
image.png


Cleanup

How to uninstall the Pods / secret / namespace

## Pod : confluent-operator
$ helm delete confluent-operator             

## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml

## Pod : working Pod
$ kubectl delete -f ksql-client2.yaml

## Delete the namespace (all Pods under the namespace are deleted)
$ kubectl delete namespace akscfk231

Stopping and starting the AKS cluster

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01

Check for any error messages

$ kubectl get events
$ kubectl logs ksql-client2

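Summary

I confirmed that Streams can be configured on Confluent Platform using the ksql cli. However, once data is actually streamed in, the data conversion method and related settings will need further adjustment. Next time I would like to actually stream data through it.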

