概要
Confluent for Kubernetes (CFK)は、プライベートクラウド環境(今回は Azure Kubernetes Service(AKS))に Confluent をデプロイして管理するためのクラウドネイティブのコントロールプレーンです。宣言型 API で Confluent Platform をカスタマイズ、デプロイ、管理するための標準的で簡素なインターフェイスが備わっています。
CFK を使用して AKS上にデプロイされている Confluent Platform に Source Connector を作成するためのワークフローの概要は以下のとおりです。
- Kubernetes 環境を準備します(事前準備で完了)
- Confluent for Kubernetes をデプロイします(事前準備で完了)
- Confluent Platform をデプロイ(必要な Connector Plugin 含む)します(事前準備で完了)
- Confluent Platform の追加構成をします
- Confluent Platform の追加デプロイ(Sourec Connector作成)をします
ローカル環境
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
事前準備
- この記事 を実行して、AKSクラスタ環境が構築されていること
- この記事 を実行して、AKS 上に Confluent Platform をデプロイされ、Sink Connector も作成されていること
- この記事 を編集実行して、Azure上にSource用に新たにDB等追加定義したCosmosDBが構築されていること
Sink CosmosDB 設定項目 | 値 |
---|---|
Endpoint | https://iturucosmosdb01.documents.azure.com:443/ |
CosmosDB アカウント名 | iturucosmosdb01 |
データベース名 | CPDemoDB002 |
コンテナ名 | container002 |
パーティション | section |
Source CosmosDB 設定項目 | 値 |
---|---|
Endpoint | https://iturucosmosdb01.documents.azure.com:443/ |
CosmosDB アカウント名 | iturucosmosdb01 |
データベース名 | CPDemoDB001 |
コンテナ名 | container001 |
パーティション | section |
Confluent Platform の追加構成
CosmosDB のアクセスキーの取得
Terraform を用いて CosmosDB を構築した作業ディレクトリ上にて、以下を実行して CosmosDB のアクセスキーを取得します
$ terraform output cosmosdb_account_key
"NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
Confluent Platform の構成変更
現在、Basic authentication secret を利用した Connect への構成変更は、うまく機能していない(私の力量不足かも)ようなので認証なしの構成で実施いたします。
Source Connector作成用 の Custom Resource の定義
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
name: cosmosdb-source-connector001
namespace: akscfk231
spec:
class: "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector"
taskMax: 3
connectClusterRef:
name: connect
configs:
topics: "topic001"
key.converter: "io.confluent.connect.avro.AvroConverter"
key.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
value.converter: "io.confluent.connect.avro.AvroConverter"
value.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
connect.cosmos.connection.endpoint: "https://iturucosmosdb01.documents.azure.com:443/"
connect.cosmos.master.key: "NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
connect.cosmos.databasename: "CPDemoDB001"
connect.cosmos.containers.topicmap: "topic001#container001"
restartPolicy:
type: OnFailure
maxRetry: 10
Confluent Platform への追加デプロイ
Confluent Platform への Connector の作成
Custom Resource から Cosmosdb Source Connector の作成
$ kubectl apply -f kafka_source_connector.yaml
デプロイされた Confluent Platform リソースの確認
$ kubectl get connect
NAME REPLICAS READY STATUS AGE
connect 1 1 RUNNING 22m
$ kubectl get connector
NAME STATUS CONNECTORSTATUS TASKS-READY AGE
cosmosdb-sink-connector002 CREATED RUNNING 3/3 8m24s
cosmosdb-source-connector001 CREATED RUNNING 3/3 33s
## connector の詳細確認
$ kubectl describe connector cosmosdb-source-connector001
Name: cosmosdb-source-connector001
Namespace: akscfk231
Labels: <none>
Annotations: <none>
API Version: platform.confluent.io/v1beta1
Kind: Connector
Metadata:
Creation Timestamp: 2022-07-07T06:12:10Z
Finalizers:
connector.finalizers.platform.confluent.io
Generation: 1
Managed Fields:
API Version: platform.confluent.io/v1beta1
Fields Type: FieldsV1
fieldsV1:
f:metadata:
f:annotations:
.:
f:kubectl.kubernetes.io/last-applied-configuration:
f:spec:
.:
f:class:
f:configs:
.:
f:connect.cosmos.connection.endpoint:
f:connect.cosmos.containers.topicmap:
f:connect.cosmos.databasename:
f:connect.cosmos.master.key:
f:key.converter:
f:key.converter.schema.registry.url:
f:topics:
f:value.converter:
f:value.converter.schema.registry.url:
f:connectClusterRef:
.:
f:name:
f:restartPolicy:
.:
f:maxRetry:
f:type:
f:taskMax:
Manager: kubectl-client-side-apply
Operation: Update
Time: 2022-07-07T06:12:10Z
API Version: platform.confluent.io/v1beta1
Fields Type: FieldsV1
fieldsV1:
f:metadata:
f:finalizers:
.:
v:"connector.finalizers.platform.confluent.io":
f:ownerReferences:
.:
k:{"uid":"e188f675-166f-4254-8041-25e1b3d540c9"}:
Manager: manager
Operation: Update
Time: 2022-07-07T06:12:10Z
API Version: platform.confluent.io/v1beta1
Fields Type: FieldsV1
fieldsV1:
f:status:
f:connectorState:
f:tasksReady:
f:workerID:
Manager: manager
Operation: Update
Subresource: status
Time: 2022-07-07T06:12:40Z
Owner References:
API Version: platform.confluent.io/v1beta1
Block Owner Deletion: true
Controller: true
Kind: Connect
Name: connect
UID: e188f675-166f-4254-8041-25e1b3d540c9
Resource Version: 11227
UID: 6ef6bd16-d713-4e39-9b60-a7cdeac08502
Spec:
Class: com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector
Configs:
connect.cosmos.connection.endpoint: https://iturucosmosdb01.documents.azure.com:443/
connect.cosmos.containers.topicmap: topic001#container001
connect.cosmos.databasename: CPDemoDB001
connect.cosmos.master.key: 3dL10prVb08owex6MPFwhpccCIc8HpQUofwdvTw6GSQSm772AEZYmPnFd1gosedA45XsrdxnT7IEikBxahJKOA==
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://schemaregistry.akscfk231.svc.cluster.local:8081
Topics: topic001
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://schemaregistry.akscfk231.svc.cluster.local:8081
Connect Cluster Ref:
Name: connect
Restart Policy:
Max Retry: 10
Type: OnFailure
Task Max: 3
Status:
Connect Rest Endpoint: http://connect.akscfk231.svc.cluster.local:8083
Connector State: RUNNING
Kafka Cluster ID: brE6ta3QQ36Annph22sjfw
Restart Policy:
Max Retry: 10
Type: OnFailure
State: CREATED
Tasks Ready: 3/3
Worker ID: connect-0.connect.akscfk231.svc.cluster.local:8083
Events: <none>
Connector 構成の確認
Connector の確認
確認するために Connect をローカルホストにリダイレクトします
$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
※ CTRL+C で終了できます
別のターミナルを立ち上げ、Connector のステータスを確認する
$ curl localhost:8083/connectors
["cosmosdb-source-connector001","cosmosdb-sink-connector002"]
$ curl localhost:8083/connectors/cosmosdb-source-connector001/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 438 100 438 0 0 2327 0 --:--:-- --:--:-- --:--:-- 2329
{
"name": "cosmosdb-source-connector001",
"connector": {
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
}
],
"type": "source"
}
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | cosmosdb-sink-connector002 | RUNNING | RUNNING | RUNNING | RUNNING | com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
source | cosmosdb-source-connector001 | RUNNING | RUNNING | RUNNING | RUNNING | com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector
Confluent Control Center 上での確認
$ kubectl confluent dashboard controlcenter
http://localhost:9021
ブラウザが自動的に立ち上がり、Confluent Contorol Center にログインし、「Connect」を確認します
後処理
Pod / secret / namespace のアンインストール方法
## Pod : confluent-operator
$ helm delete confluent-operator
## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
$ kubectl delete -f kafka_sink_connector.yaml
$ kubectl delete -f kafka_source_connector.yaml
## namespace の削除方法(namespace配下のPodは全て削除される)
$ kubectl delete namespace akscfk231
AKSクラスターの停止・起動
$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01
Check for any error messages in events
kubectl get events
まとめ
Confluent Platform に 必要な Connector「CosmosDB Source Connector」が構成できることを確認しました。ただし、データがストリーミングされてきたときに、改めてデータコンバート方法等の調整が必要となります。
参考情報
以下の情報を参考にさせていただきました
Confluent for Kubernetes
Manage Connectors
confluent-kubernetes-examples/connector/datagen-source-connector/
Connector
Connect
Azure Cosmos DB 用 Kafka Connect - シンク コネクタ
Azure Cosmos DB 用 Kafka Connect - ソース コネクタ
Basic authentication