6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

著者: 伊藤 雅博, 株式会社日立製作所

はじめに

OpenShift上にRed Hat Streams for Apache Kafkaをデプロイする手順を紹介します。

記事一覧:

  1. AWSにおけるOpenShiftプライベートクラスタの構築
  2. OpenShiftにStreams for Apache Kafka (Strimzi) をデプロイする(本稿)
  3. Streams for Apache Kafka Console (Kafkaコンソール) をデプロイする
  4. Streams for Apache Kafka (Strimzi) の認証・認可

Streams for Apache Kafka (Strimzi) の概要

Streams for Apache Kafka on OpenShiftは、Kubernetes上でApache Kafkaを運用・管理するためのオープンソースプロジェクトStrimziを基盤とした製品です。本稿で紹介する手順の多くはStrimziでも同様に利用できます。

Streams for Apache Kafkaでは、Kafkaクラスタの状態更新はカスタムリソース(CR)で実施し、状態参照はKafkaコンソール(Web UI)で実施します。Kafka CLIツールは動作確認やデバッグ、特殊な運用で使用します。

主なカスタムリソースを以下に示します。

  • Kafkaクラスタの作成・設定変更・削除: KafkaKafkaNodePoolリソース
  • Topicの作成・設定変更・削除: KafkaTopicリソース
  • ユーザとACLの作成・設定変更・削除: KafkaUserリソース
  • Kafkaクラスタのリバランス: KafkaRebalanceリソース

本稿ではこのうちKafkaクラスタとTopicを作成する例を示します。ユーザとACLの作成については第4回の投稿で説明します。

Kafkaコンソールではノード、Topic、Consumer Groupの情報を参照できます。Kafkaコンソールについては第3回の投稿で説明します。

前提環境

AWSにおけるOpenShiftプライベートクラスタの構築で構築済みの下記OpenShiftクラスタにKafkaをデプロイします。

  • プラットフォーム: AWS
  • OpenShiftバージョン: 4.20
  • ネットワーク構成:
    • インターネットへの接続は許可、インターネットからの接続は不許可
    • プライベートドメインを使用
    • 3AZで冗長化

ocp-vpc-cluster.png

OpenShiftノード構成:

ノード種別 台数 OS インスタンスタイプ ボリュームタイプ
Controller 3 RHCOS m6i.xlarge (4 vCPU, 16 GiBメモリ) gp3, 100 GB, 2000 IOPS
Compute 3 RHCOS m6i.xlarge (4 vCPU, 16 GiBメモリ) gp3, 200 GB, 2000 IOPS
Manager 1 Amazon Linux 2023 m6i.large (2 vCPU, 8 GiBメモリ) gp3, 100 GB, 1000 IOPS

構築するKafka環境

バージョン

  • Cluster Operatorバージョン: 3.0 (Apache Kafka 4.0ベース)

公式ドキュメント: Streams for Apache Kafka 3.0

プロジェクト構成

Streams for Apache KafkaのリソースをデプロイするOpenShiftプロジェクト(ネームスペース)を以下に示します。

リソース プロジェクト
Cluster Operator openshift-operators
Kafkaクラスタ kafka-01

Cluster Operatorは全プロジェクトから利用できるように、既存のopenshift-operatorsに1つだけインストールします。Cluster Operatorはプロジェクトで分離できないクラスタスコープのリソースを持つため、複数プロジェクトへの重複デプロイはできません。

Kafkaクラスタについては、クラスタごとにプロジェクトを作成してそこにデプロイします。

以下にプロジェクトとPodの配置例を示します。

ocp-kafka-cluster.png

Kafkaクラスタ構成

今回は動作確認を目的としているため、リソースの割り当てを最小限にしてデプロイします。

ノード種別 Pod数 vCPU Memory Disk
Controller 3 0.5-1 vCPU 1-2 GiB gp3, 50 GB
Broker 3 0.5-1 vCPU 1-2 GiB gp3, 100 GB

Kafkaサーバのリスナ定義

Kafkaのエンドポイントとなるリスナの定義について説明します。

以下の内部管理用のリスナは自動作成されます。

種別 用途 ポート Service
Controllerリスナ Controllerアクセス用 9090 ClusterIP
Brokerリスナ Broker間複製用 9091 ClusterIP

今回は以下のクライアント用リスナを作成します。認証方式はリスナごとに指定し、公開先はOpenShiftクラスタ内部または外部を選択できます。

今回は無認証(内部)とTLSサーバ認証(内部/外部)のリスナを定義します。ユーザ認証(クライアント認証)については第4回の投稿で紹介します。

種別 用途 ポート 認証方式 公開先 (Service)
Brokerリスナ 内部アクセス用 9092 なし 内部 (ClusterIP)
Brokerリスナ 内部アクセス用 9093 TLSサーバ認証 内部 (ClusterIP)
Brokerリスナ 外部アクセス用 9094 (443) ※ TLSサーバ認証 外部 (Route)

※: 外部公開のRouteに対しては、クライアントは443番ポートで接続します(内部のServiceのターゲットポートは9094)。

Cluster Operatorのインストール

OpenShift WebコンソールのOperatorHubを使用して、Streams for Apache Kafka Operatorをインストールします。

ソフトウェアカタログからStreams for Apache Kafkaを検索します。

kafka_operator.png

このCluster Operatorをデフォルトのプロジェクト openshift-operatorsにインストールすると、全プロジェクトから利用可能になります。

  • Operator名: Streams for Apache Kafka
  • Channel: stable
  • バージョン: 3.0.1-3
  • インストールモード: クラスターのすべての namespace (デフォルト)
    • namespace: openshift-operators
  • 更新の承認: 手動

Kafkaクラスタのデプロイ

Cluster Operatorを利用してKafkaクラスタをデプロイします。

参考:

Kafkaクラスタ用のプロジェクトを作成

Kafkaクラスタのデプロイ先となるプロジェクトkafka-01を作成します。

# プロジェクト`kafka-01`を作成
oc new-project kafka-01
## Now using project "kafka-01" on server "https://api.ocp420.ossc.hitachi.com:6443".
## ・・・

# 現在のプロジェクトを確認
oc project
## Using project "kafka-01" on server "https://api.ocp420.ossc.hitachi.com:6443".

デプロイ方法の選択

デプロイ方法は以下の2種類がありますが、今回は設定を管理しやすい1.の方法で実施します。

  1. ローカルでリソースファイル(YAML)を作成し、oc applyコマンドでデプロイ(推奨)
  2. OpenShift Webコンソールからリソースファイルを作成してデプロイ

リソースファイルの作成

以下のカスタムリソースを作成します。

  • KafkaNodePoolリソース
    • Kafkaクラスタ内の異なるKafkaノードのグループを表す
    • ControllerとBrokerのグループを定義し、それぞれの台数、リソース(vCPU、メモリ)、ストレージ設定などを記載
  • Kafkaリソース
    • Kafkaクラスタを定義
    • KafkaNodePoolで定義した設定が優先される

作成するリソースファイルを以下に示します。

my-cluster-01.yaml
---
# Controllerプール
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller-pool
  # OpenShiftプロジェクト名
  namespace: kafka-01
  labels:
    # Kafkaクラスタ名
    strimzi.io/cluster: my-cluster-01
spec:
  roles:
    - controller
  # ノード台数
  replicas: 3

  # ストレージ設定
  storage:
    type: jbod
    volumes:
      - id: 0
        # 永続ボリューム(PV)を指定
        type: persistent-claim
        # ボリュームの容量
        size: 50Gi
        # このボリュームにKRaft metadata logを配置(1ボリュームのみ指定)
        kraftMetadata: shared
        # Pod削除時にボリュームを削除しない
        deleteClaim: false

  # CPU・メモリ設定
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1000m
      memory: 2Gi

---
# Brokerプール
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker-pool
  # OpenShiftプロジェクト名
  namespace: kafka-01
  labels:
    # Kafkaクラスタ名
    strimzi.io/cluster: my-cluster-01
spec:
  roles:
    - broker
  # ノード台数
  replicas: 3

  # ストレージ設定
  storage:
    # JBODストレージ: 1台以上のボリュームを指定
    type: jbod
    volumes:
      - id: 0
        # 永続ボリューム(PV)を指定
        type: persistent-claim
        # ボリュームの容量
        size: 100Gi
        # このボリュームにKRaft metadata logを配置(1ボリュームのみ指定)
        kraftMetadata: shared
        # Pod削除時にボリュームを削除しない
        deleteClaim: false

  # CPU・メモリ設定
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1000m
      memory: 2Gi

---
# Kafkaクラスタ
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  # Kafkaクラスタ名
  name: my-cluster-01
  # OCPプロジェクト名
  namespace: kafka-01
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 4.0.0
    metadataVersion: 4.0-IV3

    # Kafka設定
    config:
      # Partitionの複製数
      default.replication.factor: 3
      # Partitionの最小複製数(ISR数)
      min.insync.replicas: 2
      # Consumer Offset用TopicのPartitionの複製数
      offsets.topic.replication.factor: 3
      # Producerのトランザクション用Topicの設定のPartitionの複製数
      transaction.state.log.replication.factor: 3
      # Producerのトランザクション用Topicの設定のPartitionの最小複製数
      transaction.state.log.min.isr: 2
      # Topicの自動作成を無効化(KafkaTopicリソースで管理するため)
      auto.create.topics.enable: false

    # リスナ設定:ポート番号は9092以上(ただし9404、9999 以外)
    listeners:
      # 無認証の内部公開リスナ
      # 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9092
      - name: plain
        port: 9092
        type: internal
        tls: false # TLS認証なし

      # TLS認証ありの内部公開リスナ
      # 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9093
      - name: tls1
        port: 9093
        type: internal
        tls: true

      # TLS認証ありの外部公開リスナ:外部公開リスナ(route)はTLS有効化必須
      # 外部エンドポイント: https://my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com
      - name: tls2
        port: 9094
        type: route
        tls: true

  # Entity Operatorの設定
  entityOperator:
    # Kafka Topicを管理するTopic Operatorの設定
    topicOperator:
      resources:
        requests:
          cpu: 500m
          memory: 512Mi
        limits:
          cpu: 1000m
          memory: 512Mi

    # Kafkaユーザを管理するUser Operatorの設定
    userOperator:
      resources:
        requests:
          cpu: 500m
          memory: 512Mi
        limits:
          cpu: 1000m
          memory: 512Mi

リソースファイルの記述方法は以下の公式ドキュメントを参照してください。

デプロイの実行

プロジェクトkafka-01にリソースファイルを適用します。

# リソースファイルを適用
oc apply -f my-cluster-01.yaml -n kafka-01
## kafkanodepool.kafka.strimzi.io/controller-pool created
## kafkanodepool.kafka.strimzi.io/broker-pool created
## kafka.kafka.strimzi.io/my-cluster-01 created

デプロイの確認

しばらく待機してから、デプロイされたPodを確認します。
Broker、Controller、Entity OperatorのPodがデプロイされ、RunningになっていればOKです。

# Podの確認
oc get pod -n kafka-01
## NAME                                             READY   STATUS    RESTARTS   AGE
## my-cluster-01-broker-pool-0                      1/1     Running   0          2m48s
## my-cluster-01-broker-pool-1                      1/1     Running   0          2m48s
## my-cluster-01-broker-pool-2                      1/1     Running   0          2m48s
## my-cluster-01-controller-pool-3                  1/1     Running   0          2m48s
## my-cluster-01-controller-pool-4                  1/1     Running   0          2m48s
## my-cluster-01-controller-pool-5                  1/1     Running   0          2m48s
## my-cluster-01-entity-operator-7469ddfc6f-fhzr7   2/2     Running   0          2m14s

Topicの作成

KafkaTopicリソースでTopicを作成します。Topicの作成・設定変更・削除はKafkaのCLIツールでも可能ですが、管理しやすいようにKafkaTopicリソースで実施することを推奨します。

my-topic-01.yaml
---
# Topic
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  # Topic名
  name: my-topic-01
  # OpenShiftプロジェクト名
  namespace: kafka-01
  labels:
    # Kafkaクラスタ名
    strimzi.io/cluster: my-cluster-01
spec:
  # Partition数
  partitions: 6
  # Partition Replica数(複製数)
  replicas: 3

  # Topic設定
  config:
    # 保持期間(ミリ秒)
    retention.ms: 604800000
# リソースファイルを適用
oc apply -f my-topic-01.yaml -n kafka-01
## kafkatopic.kafka.strimzi.io/my-topic-01 created

Kafkaクライアントの接続先リスナを確認

Kafkaクライアントの接続先となるBrokerリスナのアドレスを確認します。

リスナ名 用途 ポート 認証方式 公開先 (Service)
plain 内部アクセス用 9092 なし 内部 (ClusterIP)
tls1 内部アクセス用 9093 TLSサーバ認証 内部 (ClusterIP)
tls2 外部アクセス用 9094 (443) TLSサーバ認証 外部 (Route)

OpenShiftクラスタ内部から接続する場合

OpenShiftクラスタ内部で公開されているServiceを確認します。Kafkaクライアントの接続先は*-bootstrapのServiceです。

oc get service -n kafka-01
## NAME                                 TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                                        AGE
## my-cluster-01-broker-pool-tls2-0     ClusterIP   172.24.174.73    <none>        9094/TCP                                       6m56s
## my-cluster-01-broker-pool-tls2-1     ClusterIP   172.24.233.40    <none>        9094/TCP                                       6m56s
## my-cluster-01-broker-pool-tls2-2     ClusterIP   172.24.139.255   <none>        9094/TCP                                       6m56s
## my-cluster-01-kafka-bootstrap        ClusterIP   172.24.20.12     <none>        9091/TCP,9092/TCP,9093/TCP                     6m56s
## my-cluster-01-kafka-brokers          ClusterIP   None             <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   6m56s
## my-cluster-01-kafka-tls2-bootstrap   ClusterIP   172.24.254.94    <none>        9094/TCP                                       6m56s

内部公開のリスナのポート番号は90929093なので、該当するServiceはmy-cluster-01-kafka-bootstrapです。
Kafkaクライアントの接続先ホスト名(FQDN)は以下のいずれかを使用できます。

  • my-cluster-01-kafka-bootstrap.kafka-01
  • my-cluster-01-kafka-bootstrap.kafka-01.svc.cluster.local
  • my-cluster-01-kafka-bootstrap(同一プロジェクトからアクセスする場合のみ)

よって、Kafkaクライアントの接続先アドレスは以下になります。

  • 無認証: my-cluster-01-kafka-bootstrap.kafka-01:9092
  • TLS認証あり: my-cluster-01-kafka-bootstrap.kafka-01:9093

OpenShiftクラスタ外部から接続する場合

OpenShiftクラスタ外部に公開されているRouteを確認します。Kafkaクライアントの接続先は*-bootstrapのRouteです。

oc get route -n kafka-01
## NAME                                 HOST/PORT                                                          PATH   SERVICES                             PORT   TERMINATION   WILDCARD
## my-cluster-01-broker-pool-tls2-0     my-cluster-01-broker-pool-tls2-0-kafka-01.apps.ocp420.ossc.hitachi.com            my-cluster-01-broker-pool-tls2-0     9094   passthrough   None
## my-cluster-01-broker-pool-tls2-1     my-cluster-01-broker-pool-tls2-1-kafka-01.apps.ocp420.ossc.hitachi.com            my-cluster-01-broker-pool-tls2-1     9094   passthrough   None
## my-cluster-01-broker-pool-tls2-2     my-cluster-01-broker-pool-tls2-2-kafka-01.apps.ocp420.ossc.hitachi.com            my-cluster-01-broker-pool-tls2-2     9094   passthrough   None
## my-cluster-01-kafka-tls2-bootstrap   my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com          my-cluster-01-kafka-tls2-bootstrap   9094   passthrough   None

Kafkaクライアントの接続先はHOST/PORT列のアドレスで、TLSで443番ポートに接続するため以下となります。

  • TLS認証あり: my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com:443

Kafkaクラスタの動作確認

KafkaクライアントのPodをデプロイして、Kafka CLIツールを使用します。

コンテナ起動時のセキュリティ警告 Warning: would violate PodSecurity ... は今回は無視します。本番運用時は適切なセキュリティ設定を実施したPodを作成してください。

なお、今回のKafkaクライアントは無認証の内部リスナに接続します。認証ありのリスナへの接続方法については、第4回の投稿で紹介します。

クラスタの状態確認

今回はCLIでKafkaクラスタの状態とTopicの情報を確認します。KafkaコンソールによるKafkaクラスタの状態参照については第3回の投稿で説明します。

なお、Kafkaクラスタの更新操作(Topicの作成・設定変更・削除など)はカスタムリソースで管理すべきなので、CLIツールでは実施しないでください。

# KafkaクライアントのPodをデプロイしてログイン
oc run kafka-temp-client -it -n kafka-01 \
--rm=true \
--restart=Never \
--image=registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.1 \
--command -- /bin/bash

# Controller/Brokerのメタデータ複製状態を確認
./bin/kafka-metadata-quorum.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
describe --replication
## NodeId  DirectoryId             LogEndOffset    Lag     LastFetchTimestamp      LastCaughtUpTimestampStatus  
## 5       AAAAAAAAAAAAAAAAAAAAAA  5026            0       1764739261073           1764739261073        Leader  
## 3       AAAAAAAAAAAAAAAAAAAAAA  5026            0       1764739260585           1764739260585        Follower
## 4       AAAAAAAAAAAAAAAAAAAAAA  5026            0       1764739260586           1764739260586        Follower
## 1       6UKcy7wh9MuAZ4fuHv4YRA  5026            0       1764739260584           1764739260584        Observer
## 0       mVEioYYhh04VuUCAuCcuig  5026            0       1764739260584           1764739260584        Observer
## 2       lu6ifVn2hALq6GMGRBqpkQ  5026            0       1764739260584           1764739260584        Observer

# クラスタとそのメンバーに関する情報を表示
./bin/kafka-metadata-quorum.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
describe --status
## ClusterId:              _bzBziYWQlOPJAHyWJ2I7w
## LeaderId:               5
## LeaderEpoch:            2
## HighWatermark:          5060
## MaxFollowerLag:         0
## MaxFollowerLagTimeMs:   367
## CurrentVoters:          [{"id": 3, "directoryId": null, "endpoints": ["CONTROLPLANE-9090://my-cluster-01-controller-pool-3.my-cluster-01-kafka-brokers.kafka-01.svc.cluster.local:9090"]}, {"id": 4, "directoryId": null, "endpoints": ["CONTROLPLANE-9090://my-cluster-01-controller-pool-4.my-cluster-01-kafka-brokers.kafka-01.svc.cluster.local:9090"]}, {"id": 5, "directoryId": null, "endpoints": ["CONTROLPLANE-9090://my-cluster-01-controller-pool-5.my-cluster-01-kafka-brokers.kafka-01.svc.cluster.local:9090"]}]
## CurrentObservers:       [{"id": 1, "directoryId": "6UKcy7wh9MuAZ4fuHv4YRA"}, {"id": 0, "directoryId": "mVEioYYhh04VuUCAuCcuig"}, {"id": 2, "directoryId": "lu6ifVn2hALq6GMGRBqpkQ"}]

# Topicの一覧を表示
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--list
## my-topic-01

# Topicの設定を表示
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--describe \
--topic my-topic-01
## Topic: my-topic-01      TopicId: FzFUZwPNSsWc4Mxlf6zaiQ PartitionCount: 6       ReplicationFactor: 3Configs: min.insync.replicas=2,retention.ms=7200000
##         Topic: my-topic-01      Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1      Elr: LastKnownElr: 
##         Topic: my-topic-01      Partition: 1    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2      Elr: LastKnownElr: 
##         Topic: my-topic-01      Partition: 2    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0      Elr: LastKnownElr: 
##         Topic: my-topic-01      Partition: 3    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2      Elr: LastKnownElr: 
##         Topic: my-topic-01      Partition: 4    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1      Elr: LastKnownElr: 
##         Topic: my-topic-01      Partition: 5    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0      Elr: LastKnownElr: 

# ログアウト
exit

ログアウトするとPodは削除されます。

Topicの読み書き

ProducerのPodを起動してメッセージを書き込みます。

# Kafka ProducerをデプロイしてTopicにメッセージを書き込み
oc run kafka-temp-producer -it -n kafka-01 \
--image=registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.1 \
--rm=true \
--restart=Never \
-- bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--topic my-topic-01

> aaa
> bbb
> ccc

別のターミナルでConsumerのPodを起動してメッセージを取得します。

# Kafka ConsumerをデプロイしてTopicからメッセージを取得
oc run kafka-temp-consumer -it -n kafka-01 \
--image=registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.1 \
--rm=true \
--restart=Never \
-- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--topic my-topic-01 \
--from-beginning

aaa
bbb
ccc

Producerでさらにメッセージを送信すると、それをConsumerが受信します。
Ctrl+Cでターミナルを終了するとPodは削除されます。

おわりに

本稿ではOpenShift上にStreams for Apache Kafkaをデプロイする手順を紹介しました。次回はKafkaコンソールのデプロイ方法と参照可能な情報を紹介します。

6
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?