0
0

More than 1 year has passed since last update.

#3 Confluent for Kubernetes を使用して AKS 上に Confluent Platform を構成してみました - Connector Plugin編

Last updated at Posted at 2022-06-29

概要

Confluent for Kubernetes (CFK)は、プライベートクラウド環境(今回は Azure Kubernetes Service(AKS))に Confluent をデプロイして管理するためのクラウドネイティブのコントロールプレーンです。宣言型 API で Confluent Platform をカスタマイズ、デプロイ、管理するための標準的で簡素なインターフェイスが備わっています。

CFK を使用して AKS上にデプロイされている Confluent Platform に Connector を構成するためのワークフローの概要は以下のとおりです。

  1. Kubernetes 環境を準備します(事前準備で完了)
  2. Confluent for Kubernetes をデプロイします(事前準備で完了)
  3. Confluent Platform をデプロイします(事前準備で完了)
  4. Confluent Platform を Connector Plugin のコンテナで再デプロイします
  5. Connector Plugin の確認

デプロイ後のイメージは以下となります。
image.png


ローカル環境

  • macOS Monterey 12.3.1
  • python 3.8.12
  • Azure CLI 2.34.1
  • helm v3.6.3
  • kubectl v1.21.3

事前準備

  1. この記事 を実行して、AKSクラスタ環境が構築されていること
  2. この記事 を実行して、AKS 上に Confluent Platform をデプロイされていること
  3. この記事 を参照して、新たに定義した Connector Plugin がはいっている Connect コンテナイメージが DockerHub に公開されていること

Confluent Platform を Connector Plugin のコンテナで再デプロイ

Confluent Platform の構成変更

上記の事前準備 「2.」 の中の 「confluent_platform_ccc.yaml」 を CosmosDB の Connector Plugin が入っている Docker イメージを利用するように変更します

confluent_platform_ccc.yaml

##以下の部分の変更
apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect
  namespace: akscfk231
spec:
  replicas: 1
  image:
    application: keisz/confluent_connector:1.0         # <--- ここを変更
    init: confluentinc/confluent-init-container:2.3.0
  dependencies:
    kafka:
      bootstrapEndpoint: kafka:9071

Confluent Platform の再デプロイ

$ kubectl apply -f confluent_platform_ccc.yaml

zookeeper.platform.confluent.io/zookeeper unchanged
kafka.platform.confluent.io/kafka unchanged
connect.platform.confluent.io/connect configured
ksqldb.platform.confluent.io/ksqldb unchanged
controlcenter.platform.confluent.io/controlcenter unchanged
schemaregistry.platform.confluent.io/schemaregistry unchanged
kafkarestproxy.platform.confluent.io/kafkarestproxy unchanged

Connector Plugin の確認

Pod/svc の変更

$ kubectl get pods

NAME                                  READY   STATUS    RESTARTS   AGE
confluent-operator-76d7677b8c-pdp8s   1/1     Running   0          13m
connect-0                             1/1     Running   0          111s
controlcenter-0                       1/1     Running   0          8m20s
kafka-0                               1/1     Running   0          9m51s
kafka-1                               1/1     Running   0          9m51s
kafka-2                               1/1     Running   0          9m51s
kafkarestproxy-0                      1/1     Running   0          8m21s
ksqldb-0                              1/1     Running   0          8m20s
schemaregistry-0                      1/1     Running   0          8m20s
zookeeper-0                           1/1     Running   0          12m
zookeeper-1                           1/1     Running   0          12m
zookeeper-2                           1/1     Running   0          12m


$ kubectl get svc

NAME                        TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                          AGE
confluent-operator          ClusterIP   10.1.0.231   <none>        7778/TCP                                                         15m
connect                     ClusterIP   None         <none>        8083/TCP,7203/TCP,7777/TCP,7778/TCP                              14m
connect-0-internal          ClusterIP   10.1.0.232   <none>        8083/TCP,7203/TCP,7777/TCP,7778/TCP                              14m
controlcenter               ClusterIP   None         <none>        9021/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
controlcenter-0-internal    ClusterIP   10.1.0.95    <none>        9021/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
kafka                       ClusterIP   None         <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafka-0-internal            ClusterIP   10.1.0.181   <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafka-1-internal            ClusterIP   10.1.0.103   <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafka-2-internal            ClusterIP   10.1.0.27    <none>        9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   11m
kafkarestproxy              ClusterIP   None         <none>        8082/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
kafkarestproxy-0-internal   ClusterIP   10.1.0.107   <none>        8082/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
ksqldb                      ClusterIP   None         <none>        8088/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
ksqldb-0-internal           ClusterIP   10.1.0.22    <none>        8088/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
schemaregistry              ClusterIP   None         <none>        8081/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
schemaregistry-0-internal   ClusterIP   10.1.0.227   <none>        8081/TCP,7203/TCP,7777/TCP,7778/TCP                              10m
zookeeper                   ClusterIP   None         <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m
zookeeper-0-internal        ClusterIP   10.1.0.66    <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m
zookeeper-1-internal        ClusterIP   10.1.0.21    <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m
zookeeper-2-internal        ClusterIP   10.1.0.118   <none>        2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP            14m

Connector の確認

確認するために Connect をローカルホストにリダイレクトします

$ kubectl port-forward --address localhost svc/connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ CTRL+C で終了できます

別のターミナルを立ち上げ、Connector の Plug-in が存在することを確認する(CosmosDB のPluginの確認)

$ curl http://localhost:8083/connector-plugins | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   504  100   504    0     0   2030      0 --:--:-- --:--:-- --:--:--  2032
[
  {
    "class": "com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector",
    "type": "sink",
    "version": "null"
  },
  {
    "class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "type": "source",
    "version": "null"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

Confluent Control Center 上での確認

$ kubectl confluent dashboard controlcenter
http://localhost:9021

ブラウザが自動的に立ち上がり、Confluent Contorol Center にログインし、「Connect」を確認します
スクリーンショット 2022-06-29 14.51.19.png


後処理

Pod / secret / namespace のアンインストール方法

## Pod : confluent-operator
$ helm delete confluent-operator

## Pod : confluent-platform
$ kubectl delete -f confluent_platform_ccc.yaml

## namespace の削除方法(namespace配下のPodは全て削除される)
$ kubectl delete namespace akscfk231

AKSクラスターの停止・起動

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01

Check for any error messages in events

kubectl get events

まとめ

Connector Plugin がインストールされている(自作構成) Dockerイメージで、Confluent Platform が構成できることを確認しました。

参考情報

以下の情報を参考にさせていただきました

Confluent for Kubernetes

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0