著者: 伊藤 雅博, 株式会社日立製作所
はじめに
OpenShift上にRed Hat Streams for Apache Kafkaをデプロイする手順を紹介します。
記事一覧:
- AWSにおけるOpenShiftプライベートクラスタの構築
- OpenShiftにStreams for Apache Kafka (Strimzi) をデプロイする(本稿)
- Streams for Apache Kafka Console (Kafkaコンソール) をデプロイする
- Streams for Apache Kafka (Strimzi) の認証・認可
Streams for Apache Kafka (Strimzi) の概要
Streams for Apache Kafka on OpenShiftは、Kubernetes上でApache Kafkaを運用・管理するためのオープンソースプロジェクトStrimziを基盤とした製品です。本稿で紹介する手順の多くはStrimziでも同様に利用できます。
Streams for Apache Kafkaでは、Kafkaクラスタの状態更新はカスタムリソース(CR)で実施し、状態参照はKafkaコンソール(Web UI)で実施します。Kafka CLIツールは動作確認やデバッグ、特殊な運用で使用します。
主なカスタムリソースを以下に示します。
- Kafkaクラスタの作成・設定変更・削除:
Kafka、KafkaNodePoolリソース - Topicの作成・設定変更・削除:
KafkaTopicリソース - ユーザとACLの作成・設定変更・削除:
KafkaUserリソース - Kafkaクラスタのリバランス:
KafkaRebalanceリソース
本稿ではこのうちKafkaクラスタとTopicを作成する例を示します。ユーザとACLの作成については第4回の投稿で説明します。
Kafkaコンソールではノード、Topic、Consumer Groupの情報を参照できます。Kafkaコンソールについては第3回の投稿で説明します。
前提環境
AWSにおけるOpenShiftプライベートクラスタの構築で構築済みの下記OpenShiftクラスタにKafkaをデプロイします。
- プラットフォーム: AWS
- OpenShiftバージョン:
4.20 - ネットワーク構成:
- インターネットへの接続は許可、インターネットからの接続は不許可
- プライベートドメインを使用
- 3AZで冗長化
OpenShiftノード構成:
| ノード種別 | 台数 | OS | インスタンスタイプ | ボリュームタイプ |
|---|---|---|---|---|
| Controller | 3 | RHCOS | m6i.xlarge (4 vCPU, 16 GiBメモリ) | gp3, 100 GB, 2000 IOPS |
| Compute | 3 | RHCOS | m6i.xlarge (4 vCPU, 16 GiBメモリ) | gp3, 200 GB, 2000 IOPS |
| Manager | 1 | Amazon Linux 2023 | m6i.large (2 vCPU, 8 GiBメモリ) | gp3, 100 GB, 1000 IOPS |
構築するKafka環境
バージョン
- Cluster Operatorバージョン:
3.0(Apache Kafka 4.0ベース)
公式ドキュメント: Streams for Apache Kafka 3.0
プロジェクト構成
Streams for Apache KafkaのリソースをデプロイするOpenShiftプロジェクト(ネームスペース)を以下に示します。
| リソース | プロジェクト |
|---|---|
| Cluster Operator | openshift-operators |
| Kafkaクラスタ | kafka-01 |
Cluster Operatorは全プロジェクトから利用できるように、既存のopenshift-operatorsに1つだけインストールします。Cluster Operatorはプロジェクトで分離できないクラスタスコープのリソースを持つため、複数プロジェクトへの重複デプロイはできません。
Kafkaクラスタについては、クラスタごとにプロジェクトを作成してそこにデプロイします。
以下にプロジェクトとPodの配置例を示します。
Kafkaクラスタ構成
今回は動作確認を目的としているため、リソースの割り当てを最小限にしてデプロイします。
| ノード種別 | Pod数 | vCPU | Memory | Disk |
|---|---|---|---|---|
| Controller | 3 | 0.5-1 vCPU | 1-2 GiB | gp3, 50 GB |
| Broker | 3 | 0.5-1 vCPU | 1-2 GiB | gp3, 100 GB |
Kafkaサーバのリスナ定義
Kafkaのエンドポイントとなるリスナの定義について説明します。
以下の内部管理用のリスナは自動作成されます。
| 種別 | 用途 | ポート | Service |
|---|---|---|---|
| Controllerリスナ | Controllerアクセス用 | 9090 | ClusterIP |
| Brokerリスナ | Broker間複製用 | 9091 | ClusterIP |
今回は以下のクライアント用リスナを作成します。認証方式はリスナごとに指定し、公開先はOpenShiftクラスタ内部または外部を選択できます。
今回は無認証(内部)とTLSサーバ認証(内部/外部)のリスナを定義します。ユーザ認証(クライアント認証)については第4回の投稿で紹介します。
| 種別 | 用途 | ポート | 認証方式 | 公開先 (Service) |
|---|---|---|---|---|
| Brokerリスナ | 内部アクセス用 | 9092 | なし | 内部 (ClusterIP) |
| Brokerリスナ | 内部アクセス用 | 9093 | TLSサーバ認証 | 内部 (ClusterIP) |
| Brokerリスナ | 外部アクセス用 | 9094 (443) ※ | TLSサーバ認証 | 外部 (Route) |
※: 外部公開のRouteに対しては、クライアントは443番ポートで接続します(内部のServiceのターゲットポートは9094)。
Cluster Operatorのインストール
OpenShift WebコンソールのOperatorHubを使用して、Streams for Apache Kafka Operatorをインストールします。
ソフトウェアカタログからStreams for Apache Kafkaを検索します。
このCluster Operatorをデフォルトのプロジェクト openshift-operatorsにインストールすると、全プロジェクトから利用可能になります。
- Operator名:
Streams for Apache Kafka - Channel:
stable - バージョン:
3.0.1-3 - インストールモード: クラスターのすべての namespace (デフォルト)
- namespace:
openshift-operators
- namespace:
- 更新の承認:
手動
Kafkaクラスタのデプロイ
Cluster Operatorを利用してKafkaクラスタをデプロイします。
参考:
- 第3章 Streams for Apache Kafka Operator を使用した Kafka コンポーネントのデプロイ
- 7.2. Streams for Apache Kafka Operator を使用した Kafka コンポーネントのデプロイ
Kafkaクラスタ用のプロジェクトを作成
Kafkaクラスタのデプロイ先となるプロジェクトkafka-01を作成します。
# プロジェクト`kafka-01`を作成
oc new-project kafka-01
## Now using project "kafka-01" on server "https://api.ocp420.ossc.hitachi.com:6443".
## ・・・
# 現在のプロジェクトを確認
oc project
## Using project "kafka-01" on server "https://api.ocp420.ossc.hitachi.com:6443".
デプロイ方法の選択
デプロイ方法は以下の2種類がありますが、今回は設定を管理しやすい1.の方法で実施します。
- ローカルでリソースファイル(YAML)を作成し、
oc applyコマンドでデプロイ(推奨) - OpenShift Webコンソールからリソースファイルを作成してデプロイ
リソースファイルの作成
以下のカスタムリソースを作成します。
-
KafkaNodePoolリソース- Kafkaクラスタ内の異なるKafkaノードのグループを表す
- ControllerとBrokerのグループを定義し、それぞれの台数、リソース(vCPU、メモリ)、ストレージ設定などを記載
-
Kafkaリソース- Kafkaクラスタを定義
-
KafkaNodePoolで定義した設定が優先される
作成するリソースファイルを以下に示します。
---
# Controllerプール
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: controller-pool
# OpenShiftプロジェクト名
namespace: kafka-01
labels:
# Kafkaクラスタ名
strimzi.io/cluster: my-cluster-01
spec:
roles:
- controller
# ノード台数
replicas: 3
# ストレージ設定
storage:
type: jbod
volumes:
- id: 0
# 永続ボリューム(PV)を指定
type: persistent-claim
# ボリュームの容量
size: 50Gi
# このボリュームにKRaft metadata logを配置(1ボリュームのみ指定)
kraftMetadata: shared
# Pod削除時にボリュームを削除しない
deleteClaim: false
# CPU・メモリ設定
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1000m
memory: 2Gi
---
# Brokerプール
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: broker-pool
# OpenShiftプロジェクト名
namespace: kafka-01
labels:
# Kafkaクラスタ名
strimzi.io/cluster: my-cluster-01
spec:
roles:
- broker
# ノード台数
replicas: 3
# ストレージ設定
storage:
# JBODストレージ: 1台以上のボリュームを指定
type: jbod
volumes:
- id: 0
# 永続ボリューム(PV)を指定
type: persistent-claim
# ボリュームの容量
size: 100Gi
# このボリュームにKRaft metadata logを配置(1ボリュームのみ指定)
kraftMetadata: shared
# Pod削除時にボリュームを削除しない
deleteClaim: false
# CPU・メモリ設定
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1000m
memory: 2Gi
---
# Kafkaクラスタ
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
# Kafkaクラスタ名
name: my-cluster-01
# OCPプロジェクト名
namespace: kafka-01
annotations:
strimzi.io/kraft: enabled
strimzi.io/node-pools: enabled
spec:
kafka:
version: 4.0.0
metadataVersion: 4.0-IV3
# Kafka設定
config:
# Partitionの複製数
default.replication.factor: 3
# Partitionの最小複製数(ISR数)
min.insync.replicas: 2
# Consumer Offset用TopicのPartitionの複製数
offsets.topic.replication.factor: 3
# Producerのトランザクション用Topicの設定のPartitionの複製数
transaction.state.log.replication.factor: 3
# Producerのトランザクション用Topicの設定のPartitionの最小複製数
transaction.state.log.min.isr: 2
# Topicの自動作成を無効化(KafkaTopicリソースで管理するため)
auto.create.topics.enable: false
# リスナ設定:ポート番号は9092以上(ただし9404、9999 以外)
listeners:
# 無認証の内部公開リスナ
# 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9092
- name: plain
port: 9092
type: internal
tls: false # TLS認証なし
# TLS認証ありの内部公開リスナ
# 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9093
- name: tls1
port: 9093
type: internal
tls: true
# TLS認証ありの外部公開リスナ:外部公開リスナ(route)はTLS有効化必須
# 外部エンドポイント: https://my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com
- name: tls2
port: 9094
type: route
tls: true
# Entity Operatorの設定
entityOperator:
# Kafka Topicを管理するTopic Operatorの設定
topicOperator:
resources:
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 1000m
memory: 512Mi
# Kafkaユーザを管理するUser Operatorの設定
userOperator:
resources:
requests:
cpu: 500m
memory: 512Mi
limits:
cpu: 1000m
memory: 512Mi
リソースファイルの記述方法は以下の公式ドキュメントを参照してください。
デプロイの実行
プロジェクトkafka-01にリソースファイルを適用します。
# リソースファイルを適用
oc apply -f my-cluster-01.yaml -n kafka-01
## kafkanodepool.kafka.strimzi.io/controller-pool created
## kafkanodepool.kafka.strimzi.io/broker-pool created
## kafka.kafka.strimzi.io/my-cluster-01 created
デプロイの確認
しばらく待機してから、デプロイされたPodを確認します。
Broker、Controller、Entity OperatorのPodがデプロイされ、RunningになっていればOKです。
# Podの確認
oc get pod -n kafka-01
## NAME READY STATUS RESTARTS AGE
## my-cluster-01-broker-pool-0 1/1 Running 0 2m48s
## my-cluster-01-broker-pool-1 1/1 Running 0 2m48s
## my-cluster-01-broker-pool-2 1/1 Running 0 2m48s
## my-cluster-01-controller-pool-3 1/1 Running 0 2m48s
## my-cluster-01-controller-pool-4 1/1 Running 0 2m48s
## my-cluster-01-controller-pool-5 1/1 Running 0 2m48s
## my-cluster-01-entity-operator-7469ddfc6f-fhzr7 2/2 Running 0 2m14s
Topicの作成
KafkaTopicリソースでTopicを作成します。Topicの作成・設定変更・削除はKafkaのCLIツールでも可能ですが、管理しやすいようにKafkaTopicリソースで実施することを推奨します。
---
# Topic
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
# Topic名
name: my-topic-01
# OpenShiftプロジェクト名
namespace: kafka-01
labels:
# Kafkaクラスタ名
strimzi.io/cluster: my-cluster-01
spec:
# Partition数
partitions: 6
# Partition Replica数(複製数)
replicas: 3
# Topic設定
config:
# 保持期間(ミリ秒)
retention.ms: 604800000
# リソースファイルを適用
oc apply -f my-topic-01.yaml -n kafka-01
## kafkatopic.kafka.strimzi.io/my-topic-01 created
Kafkaクライアントの接続先リスナを確認
Kafkaクライアントの接続先となるBrokerリスナのアドレスを確認します。
| リスナ名 | 用途 | ポート | 認証方式 | 公開先 (Service) |
|---|---|---|---|---|
plain |
内部アクセス用 | 9092 | なし | 内部 (ClusterIP) |
tls1 |
内部アクセス用 | 9093 | TLSサーバ認証 | 内部 (ClusterIP) |
tls2 |
外部アクセス用 | 9094 (443) | TLSサーバ認証 | 外部 (Route) |
OpenShiftクラスタ内部から接続する場合
OpenShiftクラスタ内部で公開されているServiceを確認します。Kafkaクライアントの接続先は*-bootstrapのServiceです。
oc get service -n kafka-01
## NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
## my-cluster-01-broker-pool-tls2-0 ClusterIP 172.24.174.73 <none> 9094/TCP 6m56s
## my-cluster-01-broker-pool-tls2-1 ClusterIP 172.24.233.40 <none> 9094/TCP 6m56s
## my-cluster-01-broker-pool-tls2-2 ClusterIP 172.24.139.255 <none> 9094/TCP 6m56s
## my-cluster-01-kafka-bootstrap ClusterIP 172.24.20.12 <none> 9091/TCP,9092/TCP,9093/TCP 6m56s
## my-cluster-01-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP 6m56s
## my-cluster-01-kafka-tls2-bootstrap ClusterIP 172.24.254.94 <none> 9094/TCP 6m56s
内部公開のリスナのポート番号は9092と9093なので、該当するServiceはmy-cluster-01-kafka-bootstrapです。
Kafkaクライアントの接続先ホスト名(FQDN)は以下のいずれかを使用できます。
my-cluster-01-kafka-bootstrap.kafka-01my-cluster-01-kafka-bootstrap.kafka-01.svc.cluster.local-
my-cluster-01-kafka-bootstrap(同一プロジェクトからアクセスする場合のみ)
よって、Kafkaクライアントの接続先アドレスは以下になります。
- 無認証:
my-cluster-01-kafka-bootstrap.kafka-01:9092 - TLS認証あり:
my-cluster-01-kafka-bootstrap.kafka-01:9093
OpenShiftクラスタ外部から接続する場合
OpenShiftクラスタ外部に公開されているRouteを確認します。Kafkaクライアントの接続先は*-bootstrapのRouteです。
oc get route -n kafka-01
## NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD
## my-cluster-01-broker-pool-tls2-0 my-cluster-01-broker-pool-tls2-0-kafka-01.apps.ocp420.ossc.hitachi.com my-cluster-01-broker-pool-tls2-0 9094 passthrough None
## my-cluster-01-broker-pool-tls2-1 my-cluster-01-broker-pool-tls2-1-kafka-01.apps.ocp420.ossc.hitachi.com my-cluster-01-broker-pool-tls2-1 9094 passthrough None
## my-cluster-01-broker-pool-tls2-2 my-cluster-01-broker-pool-tls2-2-kafka-01.apps.ocp420.ossc.hitachi.com my-cluster-01-broker-pool-tls2-2 9094 passthrough None
## my-cluster-01-kafka-tls2-bootstrap my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com my-cluster-01-kafka-tls2-bootstrap 9094 passthrough None
Kafkaクライアントの接続先はHOST/PORT列のアドレスで、TLSで443番ポートに接続するため以下となります。
- TLS認証あり:
my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com:443
Kafkaクラスタの動作確認
KafkaクライアントのPodをデプロイして、Kafka CLIツールを使用します。
コンテナ起動時のセキュリティ警告 Warning: would violate PodSecurity ... は今回は無視します。本番運用時は適切なセキュリティ設定を実施したPodを作成してください。
なお、今回のKafkaクライアントは無認証の内部リスナに接続します。認証ありのリスナへの接続方法については、第4回の投稿で紹介します。
クラスタの状態確認
今回はCLIでKafkaクラスタの状態とTopicの情報を確認します。KafkaコンソールによるKafkaクラスタの状態参照については第3回の投稿で説明します。
なお、Kafkaクラスタの更新操作(Topicの作成・設定変更・削除など)はカスタムリソースで管理すべきなので、CLIツールでは実施しないでください。
# KafkaクライアントのPodをデプロイしてログイン
oc run kafka-temp-client -it -n kafka-01 \
--rm=true \
--restart=Never \
--image=registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.1 \
--command -- /bin/bash
# Controller/Brokerのメタデータ複製状態を確認
./bin/kafka-metadata-quorum.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
describe --replication
## NodeId DirectoryId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestampStatus
## 5 AAAAAAAAAAAAAAAAAAAAAA 5026 0 1764739261073 1764739261073 Leader
## 3 AAAAAAAAAAAAAAAAAAAAAA 5026 0 1764739260585 1764739260585 Follower
## 4 AAAAAAAAAAAAAAAAAAAAAA 5026 0 1764739260586 1764739260586 Follower
## 1 6UKcy7wh9MuAZ4fuHv4YRA 5026 0 1764739260584 1764739260584 Observer
## 0 mVEioYYhh04VuUCAuCcuig 5026 0 1764739260584 1764739260584 Observer
## 2 lu6ifVn2hALq6GMGRBqpkQ 5026 0 1764739260584 1764739260584 Observer
# クラスタとそのメンバーに関する情報を表示
./bin/kafka-metadata-quorum.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
describe --status
## ClusterId: _bzBziYWQlOPJAHyWJ2I7w
## LeaderId: 5
## LeaderEpoch: 2
## HighWatermark: 5060
## MaxFollowerLag: 0
## MaxFollowerLagTimeMs: 367
## CurrentVoters: [{"id": 3, "directoryId": null, "endpoints": ["CONTROLPLANE-9090://my-cluster-01-controller-pool-3.my-cluster-01-kafka-brokers.kafka-01.svc.cluster.local:9090"]}, {"id": 4, "directoryId": null, "endpoints": ["CONTROLPLANE-9090://my-cluster-01-controller-pool-4.my-cluster-01-kafka-brokers.kafka-01.svc.cluster.local:9090"]}, {"id": 5, "directoryId": null, "endpoints": ["CONTROLPLANE-9090://my-cluster-01-controller-pool-5.my-cluster-01-kafka-brokers.kafka-01.svc.cluster.local:9090"]}]
## CurrentObservers: [{"id": 1, "directoryId": "6UKcy7wh9MuAZ4fuHv4YRA"}, {"id": 0, "directoryId": "mVEioYYhh04VuUCAuCcuig"}, {"id": 2, "directoryId": "lu6ifVn2hALq6GMGRBqpkQ"}]
# Topicの一覧を表示
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--list
## my-topic-01
# Topicの設定を表示
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--describe \
--topic my-topic-01
## Topic: my-topic-01 TopicId: FzFUZwPNSsWc4Mxlf6zaiQ PartitionCount: 6 ReplicationFactor: 3Configs: min.insync.replicas=2,retention.ms=7200000
## Topic: my-topic-01 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Elr: LastKnownElr:
## Topic: my-topic-01 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Elr: LastKnownElr:
## Topic: my-topic-01 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Elr: LastKnownElr:
## Topic: my-topic-01 Partition: 3 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Elr: LastKnownElr:
## Topic: my-topic-01 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Elr: LastKnownElr:
## Topic: my-topic-01 Partition: 5 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Elr: LastKnownElr:
# ログアウト
exit
ログアウトするとPodは削除されます。
Topicの読み書き
ProducerのPodを起動してメッセージを書き込みます。
# Kafka ProducerをデプロイしてTopicにメッセージを書き込み
oc run kafka-temp-producer -it -n kafka-01 \
--image=registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.1 \
--rm=true \
--restart=Never \
-- bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--topic my-topic-01
> aaa
> bbb
> ccc
別のターミナルでConsumerのPodを起動してメッセージを取得します。
# Kafka ConsumerをデプロイしてTopicからメッセージを取得
oc run kafka-temp-consumer -it -n kafka-01 \
--image=registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.1 \
--rm=true \
--restart=Never \
-- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--topic my-topic-01 \
--from-beginning
aaa
bbb
ccc
Producerでさらにメッセージを送信すると、それをConsumerが受信します。
Ctrl+Cでターミナルを終了するとPodは削除されます。
おわりに
本稿ではOpenShift上にStreams for Apache Kafkaをデプロイする手順を紹介しました。次回はKafkaコンソールのデプロイ方法と参照可能な情報を紹介します。


