0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

#6 Confluent for Kubernetes を使用して AKS 上に Confluent Platform を構成してみました - Source Connector作成編

Last updated at Posted at 2022-07-07

概要

Confluent for Kubernetes (CFK)は、プライベートクラウド環境(今回は Azure Kubernetes Service(AKS))に Confluent をデプロイして管理するためのクラウドネイティブのコントロールプレーンです。宣言型 API で Confluent Platform をカスタマイズ、デプロイ、管理するための標準的で簡素なインターフェイスが備わっています。

CFK を使用して AKS上にデプロイされている Confluent Platform に Source Connector を作成するためのワークフローの概要は以下のとおりです。

  1. Kubernetes 環境を準備します(事前準備で完了)
  2. Confluent for Kubernetes をデプロイします(事前準備で完了)
  3. Confluent Platform をデプロイ(必要な Connector Plugin 含む)します(事前準備で完了)
  4. Confluent Platform の追加構成をします
  5. Confluent Platform の追加デプロイ(Sourec Connector作成)をします

デプロイ後のイメージは以下となります。
image.png


ローカル環境

  • macOS Monterey 12.3.1
  • python 3.8.12
  • Azure CLI 2.34.1
  • helm v3.6.3
  • kubectl v1.21.3

事前準備

  1. この記事 を実行して、AKSクラスタ環境が構築されていること
  2. この記事 を実行して、AKS 上に Confluent Platform をデプロイされ、Sink Connector も作成されていること
  3. この記事 を編集実行して、Azure上にSource用に新たにDB等追加定義したCosmosDBが構築されていること
Sink CosmosDB 設定項目
Endpoint https://iturucosmosdb01.documents.azure.com:443/
CosmosDB アカウント名 iturucosmosdb01
データベース名 CPDemoDB002
コンテナ名 container002
パーティション section
Source CosmosDB 設定項目
Endpoint https://iturucosmosdb01.documents.azure.com:443/
CosmosDB アカウント名 iturucosmosdb01
データベース名 CPDemoDB001
コンテナ名 container001
パーティション section

Confluent Platform の追加構成

CosmosDB のアクセスキーの取得

Terraform を用いて CosmosDB を構築した作業ディレクトリ上にて、以下を実行して CosmosDB のアクセスキーを取得します

$ terraform output cosmosdb_account_key
"NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="

Confluent Platform の構成変更

現在、Basic authentication secret を利用した Connect への構成変更は、うまく機能していない(私の力量不足かも)ようなので認証なしの構成で実施いたします。

Source Connector作成用 の Custom Resource の定義

kafka_source_connector.yaml
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: cosmosdb-source-connector001
  namespace: akscfk231
spec:
  class: "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector"
  taskMax: 3
  connectClusterRef:
    name: connect
  configs:
    topics: "topic001"
    key.converter: "io.confluent.connect.avro.AvroConverter"
    key.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.url: "http://schemaregistry.akscfk231.svc.cluster.local:8081"
    connect.cosmos.connection.endpoint: "https://iturucosmosdb01.documents.azure.com:443/"
    connect.cosmos.master.key: "NYeu0hLfOMJscPUDUs7ql7U9BJ14Gd1DiwmDwbUVrVC3tOUsluwMNIrm3uCa5nMINqPISAkirjd12qt1efDqjg=="
    connect.cosmos.databasename: "CPDemoDB001"
    connect.cosmos.containers.topicmap: "topic001#container001"
  restartPolicy:
    type: OnFailure
    maxRetry: 10

Confluent Platform への追加デプロイ

Confluent Platform への Connector の作成

Custom Resource から Cosmosdb Source Connector の作成

$ kubectl apply -f kafka_source_connector.yaml

デプロイされた Confluent Platform リソースの確認

$ kubectl get connect  
NAME      REPLICAS   READY   STATUS    AGE
connect   1          1       RUNNING   22m

$ kubectl get connector
NAME                           STATUS    CONNECTORSTATUS   TASKS-READY   AGE
cosmosdb-sink-connector002     CREATED   RUNNING           3/3           8m24s
cosmosdb-source-connector001   CREATED   RUNNING           3/3           33s


## connector の詳細確認
$ kubectl describe connector cosmosdb-source-connector001
Name:         cosmosdb-source-connector001
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>
API Version:  platform.confluent.io/v1beta1
Kind:         Connector
Metadata:
  Creation Timestamp:  2022-07-07T06:12:10Z
  Finalizers:
    connector.finalizers.platform.confluent.io
  Generation:  1
  Managed Fields:
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:class:
        f:configs:
          .:
          f:connect.cosmos.connection.endpoint:
          f:connect.cosmos.containers.topicmap:
          f:connect.cosmos.databasename:
          f:connect.cosmos.master.key:
          f:key.converter:
          f:key.converter.schema.registry.url:
          f:topics:
          f:value.converter:
          f:value.converter.schema.registry.url:
        f:connectClusterRef:
          .:
          f:name:
        f:restartPolicy:
          .:
          f:maxRetry:
          f:type:
        f:taskMax:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2022-07-07T06:12:10Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:finalizers:
          .:
          v:"connector.finalizers.platform.confluent.io":
        f:ownerReferences:
          .:
          k:{"uid":"e188f675-166f-4254-8041-25e1b3d540c9"}:
    Manager:      manager
    Operation:    Update
    Time:         2022-07-07T06:12:10Z
    API Version:  platform.confluent.io/v1beta1
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        f:connectorState:
        f:tasksReady:
        f:workerID:
    Manager:      manager
    Operation:    Update
    Subresource:  status
    Time:         2022-07-07T06:12:40Z
  Owner References:
    API Version:           platform.confluent.io/v1beta1
    Block Owner Deletion:  true
    Controller:            true
    Kind:                  Connect
    Name:                  connect
    UID:                   e188f675-166f-4254-8041-25e1b3d540c9
  Resource Version:        11227
  UID:                     6ef6bd16-d713-4e39-9b60-a7cdeac08502
Spec:
  Class:  com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector
  Configs:
    connect.cosmos.connection.endpoint:   https://iturucosmosdb01.documents.azure.com:443/
    connect.cosmos.containers.topicmap:   topic001#container001
    connect.cosmos.databasename:          CPDemoDB001
    connect.cosmos.master.key:            3dL10prVb08owex6MPFwhpccCIc8HpQUofwdvTw6GSQSm772AEZYmPnFd1gosedA45XsrdxnT7IEikBxahJKOA==
    key.converter:                        io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url:    http://schemaregistry.akscfk231.svc.cluster.local:8081
    Topics:                               topic001
    value.converter:                      io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url:  http://schemaregistry.akscfk231.svc.cluster.local:8081
  Connect Cluster Ref:
    Name:  connect
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  Task Max:     3
Status:
  Connect Rest Endpoint:  http://connect.akscfk231.svc.cluster.local:8083
  Connector State:        RUNNING
  Kafka Cluster ID:       brE6ta3QQ36Annph22sjfw
  Restart Policy:
    Max Retry:  10
    Type:       OnFailure
  State:        CREATED
  Tasks Ready:  3/3
  Worker ID:    connect-0.connect.akscfk231.svc.cluster.local:8083
Events:         <none>

Connector 構成の確認

Connector の確認

確認するために Connect をローカルホストにリダイレクトします

$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ CTRL+C で終了できます

別のターミナルを立ち上げ、Connector のステータスを確認する

$ curl localhost:8083/connectors                                 
["cosmosdb-source-connector001","cosmosdb-sink-connector002"]


$ curl localhost:8083/connectors/cosmosdb-source-connector001/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   438  100   438    0     0   2327      0 --:--:-- --:--:-- --:--:--  2329
{
  "name": "cosmosdb-source-connector001",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    },
    {
      "id": 2,
      "state": "RUNNING",
      "worker_id": "connect-0.connect.akscfk231.svc.cluster.local:8083"
    }
  ],
  "type": "source"
}


$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
          jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
          column -s : -t| sed 's/\"//g'| sort
sink    |  cosmosdb-sink-connector002    |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector
source  |  cosmosdb-source-connector001  |  RUNNING  |  RUNNING  |  RUNNING  |  RUNNING  |  com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector

Confluent Control Center 上での確認

$ kubectl confluent dashboard controlcenter
http://localhost:9021

ブラウザが自動的に立ち上がり、Confluent Contorol Center にログインし、「Connect」を確認します
スクリーンショット 2022-07-07 15.41.58.png
スクリーンショット 2022-07-07 15.42.24.png


後処理

Pod / secret / namespace のアンインストール方法

## Pod : confluent-operator
$ helm delete confluent-operator             

## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml
$ kubectl delete -f kafka_sink_connector.yaml
$ kubectl delete -f kafka_source_connector.yaml

## namespace の削除方法(namespace配下のPodは全て削除される)
$ kubectl delete namespace akscfk231

AKSクラスターの停止・起動

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01

Check for any error messages in events

kubectl get events

まとめ

Confluent Platform に 必要な Connector「CosmosDB Source Connector」が構成できることを確認しました。ただし、データがストリーミングされてきたときに、改めてデータコンバート方法等の調整が必要となります。

参考情報

以下の情報を参考にさせていただきました

Confluent for Kubernetes
Manage Connectors
confluent-kubernetes-examples/connector/datagen-source-connector/
Connector
Connect
Azure Cosmos DB 用 Kafka Connect - シンク コネクタ
Azure Cosmos DB 用 Kafka Connect - ソース コネクタ
Basic authentication

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?