5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

著者: 伊藤 雅博, 株式会社日立製作所

はじめに

Streams for Apache Kafka (Strimzi) のクラスタに、認証・認可を設定する方法を紹介します。

Streams for Apache Kafka on OpenShiftは、Kubernetes上でApache Kafkaを運用・管理するためのオープンソースプロジェクトStrimziを基盤とした製品です。本稿で紹介する手順の多くはStrimziでも同様に利用できます。

記事一覧:

  1. AWSにおけるOpenShiftプライベートクラスタの構築
  2. OpenShiftにStreams for Apache Kafka (Strimzi) をデプロイする
  3. Streams for Apache Kafka Console (Kafkaコンソール) をデプロイする
  4. Streams for Apache Kafka (Strimzi) の認証・認可(本稿)

前提環境

今回はOpenShiftにStreams for Apache Kafka (Strimzi) をデプロイするで構築済みのKafkaクラスタに、認証・認可の設定を追加します。

  • Cluster Operatorバージョン: 3.0.1-3
  • Console Operatorバージョン: 3.0.1-3

前提となるKafkaクラスタの構成を以下に示します。

ocp-kafka-cluster-console.png

認証・認可の概要

Streams for Apache Kafkaでは、KafkaクライアントとKafkaサーバ間の認証方式を指定できます。なお、Kafkaサーバ間(Broker間、Controller間、Broker-Controller間)の通信は常にmTLS認証が使用されます。

Kafkaの認証・認可の概要を以下に示します。

  • 認証:

    • ユーザ名/パスワード、またはクライアント証明書によるユーザ認証
    • Kafkaリソースでリスナごとに認証方式を指定
    • KafkaUserリソースで認証対象のユーザを定義
  • 認可:

    • 認証済みユーザに対してACLによるアクセス制御を実施
    • KafkaUserリソースでユーザのACLを定義

参考: 第16章 Kafka クラスターへのアクセスのセキュア化

認証の概要

認証方式

利用可能な認証方式を以下に示します(Streams for Apache KafkaではPLAIN認証はサポートされません)。

認証方式 説明
SCRAM-SHA-512認証 ユーザ名・パスワードで認証。TLSサーバ認証(通信暗号化)の有無を選択可能
TLS認証 TLSサーバ認証で通信暗号化。ユーザ認証はなし
mTLS認証(相互TLS認証) TLSサーバ認証で通信暗号化 + クライアント証明書でユーザ認証
OAuth 2.0 外部の認証基盤を利用するユーザ認証
カスタム認証 独自認証を実装

Kafka内で完結する方式を選ぶ場合の目安を以下に示します。

  • ユーザ名・パスワードでユーザ認証したい: SCRAM-SHA-512認証
  • クライアント証明書でユーザ認証したい: mTLS認証
  • ユーザ認証は不要で通信のみ暗号化したい:TLS認証

リスナの公開先

Kafkaではリスナ(クライアントが接続するエンドポイント)単位で認証を設定します。クライアントがOpenShiftクラスタの内部と外部のどちらからアクセスするかによって公開方法が異なります。

  • クラスタ内部に公開する場合:

    • internal: Cluster IPで公開(推奨方式)
  • クラスタ外部に公開する場合:

    • route: Routeで公開(TLS必須、推奨方式)
    • ingress: Ingressで公開(TLS必須)
    • loadbalancer: 外部Load Balancerで公開
    • nodeport: NodePortで公開

一般的には、内部に公開する場合はinternal、外部に公開する場合はrouteを使用します。

認証設定のパターン一覧

Kafka内で完結する方式で、リスナ公開方式をinternalまたはrouteに限定すると、リスナの認証設定は以下の8パターンとなります。

kafka_auth.png

この中から、実現したい内容に応じて方式を選択するとよいでしょう。

認可の概要

Kafkaの認可では、認証済みユーザに対してACL (Access Control List) によるアクセス制御を行います。各種リソースに対する操作権限や、アクセス元ホストの指定が可能です。

ACLの定義例:

  • ユーザTaroに対して、Topictopic-1への操作ReadWriteを許可する
  • ユーザJiroに対して、IPアドレス172.26.1.28からのTopictopic-2への操作Readを許可する

認可はリスナ単位ではなく、Kafkaクラスタ単位で適用されます。

  • リソース:

    • Topic
    • Consumer Group
    • Transaction ID (トランザクショナルProducerを示すID)
    • Cluster
  • リソースに対する操作:

    • Topic: Describe, Read, Write, Create, Delete, Alter, AlterConfigs, DescribeConfigs, All
    • Consumer Group: Describe, Read, Delete, All
    • Transaction ID: Describe, Write, All
    • Cluster: Describe, ClusterAction, Create, Alter, AlterConfigs, DescribeConfigs, IdempotentWrite, All
  • 操作の許可・拒否:

    • 許可: allow、拒否: deny
  • アクセス元ホスト:

    • 例: 172.26.128.14, *

また、認可とは別にクォータで以下のBrokerリソースを制限できます。

  • 送信/受信のネットワーク帯域幅
  • リクエスト送信レート、CPU使用率

Kafkaクラスタの設定

Kafkaリソースの定義

Kafkaクラスタ側で、リスナの認証設定と認可の有効化を実施します。リスナは下表の全8パターンの認証方式に対応する設定例を示します。

# 認証方式 リスナ公開先 リスナポート
1 無認証 [ユーザ認証なし] 内部 9092
2 SCRAM-SHA-512認証(TLS無効化) 内部 9093
3 SCRAM-SHA-512認証(TLS有効化) 外部 9094
4 SCRAM-SHA-512認証(TLS有効化) 外部 9095
5 TLS認証 [ユーザ認証なし] 内部 9096
6 TLS認証 [ユーザ認証なし] 外部 9097
7 mTLS認証 内部 9098
8 mTLS認証 外部 9099

なお、Kafkaクラスタで認可を有効化すると、ユーザ認証がないリスナには接続できても権限がないため、操作はできません。

第2回のリソースファイルに対して、「★」のコメント個所に変更を加えています。

my-cluster-01.yaml
---
# Controllerプール
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller-pool
  namespace: kafka-01
  labels:
    strimzi.io/cluster: my-cluster-01
spec:
  roles:
    - controller
  replicas: 3
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 50Gi
        kraftMetadata: shared
        deleteClaim: false
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1000m
      memory: 2Gi

---
# Brokerプール
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker-pool
  namespace: kafka-01
  labels:
    strimzi.io/cluster: my-cluster-01
spec:
  roles:
    - broker
  replicas: 3
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        kraftMetadata: shared
        deleteClaim: false
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1000m
      memory: 2Gi

---
# Kafkaクラスタ
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster-01
  namespace: kafka-01
  annotations:
    strimzi.io/kraft: enabled
    strimzi.io/node-pools: enabled
spec:
  kafka:
    version: 4.0.0
    metadataVersion: 4.0-IV3
    config:
      default.replication.factor: 3
      min.insync.replicas: 2
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      auto.create.topics.enable: false

    # ★認可設定
    authorization: 
      # 簡易認可を有効化(Kafkaクラスタ内でACLルールを管理)
      type: simple
      # スーパーユーザを指定(スーパーユーザはすべてのリソースにアクセス可能)
      superUsers:
        - scram-user # SCRAM-SHA-512認証のユーザ

    # ★リスナ設定:各リスナに認証方式を指定。ポート番号は9092以上(ただし9404、9999 以外)
    listeners:
      # 無認証の内部リスナ(ユーザ認証なしのため利用不可)
      # 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9092
      - name: plain
        port: 9092
        type: internal # 内部公開
        tls: false # TLSサーバ認証なし

      # SCRAM-SHA-512認証(TLSなし)の内部リスナ
      # 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9093
      - name: scram1
        port: 9093
        type: internal # 内部公開
        tls: false # TLSサーバ認証なし
        authentication:
          type: scram-sha-512 # SCRAM-SHA-512認証

      # SCRAM-SHA-512認証(TLSあり)の内部リスナ
      # 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9094
      - name: scram2
        port: 9094
        type: internal # 内部公開
        tls: true # TLSサーバ認証あり
        authentication:
          type: scram-sha-512 # SCRAM-SHA-512認証

      # SCRAM-SHA-512認証(TLSあり)の外部リスナ
      # 外部エンドポイント: my-cluster-01-kafka-scram3-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com:443
      - name: scram3
        port: 9095
        type: route # 外部公開
        tls: true # TLSサーバ認証あり
        authentication:
          type: scram-sha-512 # SCRAM-SHA-512認証

      # TLS認証の内部リスナ(ユーザ認証なしのため利用不可)
      # 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9096
      - name: tls1
        port: 9096
        type: internal # 内部公開
        tls: true # TLSサーバ認証あり

      # TLS認証の外部リスナ(ユーザ認証なしのため利用不可)
      # 外部エンドポイント: my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com:443
      - name: tls2
        port: 9097
        type: route # 外部公開
        tls: true # TLSサーバ認証あり

      # mTLS認証の内部リスナ
      # 内部エンドポイント: my-cluster-01-kafka-bootstrap.kafka-01:9098
      - name: mtls1
        port: 9098
        type: internal # 内部公開
        tls: true # TLSサーバ認証あり
        authentication:
          type: tls # TLSクライアント認証あり

      # mTLS認証の外部リスナ
      # 外部エンドポイント: my-cluster-01-kafka-mtls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com:443
      - name: mtls2
        port: 9099
        type: route # 外部公開
        tls: true # TLSサーバ認証あり
        authentication:
          type: tls # TLSクライアント認証あり

  # Entity Operatorの設定
  entityOperator:
    topicOperator:
      resources:
        requests:
          cpu: 500m
          memory: 512Mi
        limits:
          cpu: 1000m
          memory: 512Mi
    userOperator:
      resources:
        requests:
          cpu: 500m
          memory: 512Mi
        limits:
          cpu: 1000m
          memory: 512Mi

---

参考:

Kafkaリソースの適用

Kafkaリソースを適用し、Kafkaクラスタの設定を変更します。また、リスナのエンドポイントとなるServiceとRouteを確認します。

# Kafkaクラスタの設定を更新
oc apply -f my-cluster-01.yaml -n kafka-01

# Serviceの確認
oc get service -n kafka-01 | grep bootstrap
## my-cluster-01-kafka-bootstrap          ClusterIP   172.24.75.156    <none>        9091/TCP,9092/TCP,9093/TCP,9094/TCP,9096/TCP,9098/TCP                     6d4h
## my-cluster-01-kafka-mtls2-bootstrap    ClusterIP   172.24.54.185    <none>        9099/TCP                                                                  3h12m
## my-cluster-01-kafka-scram3-bootstrap   ClusterIP   172.24.202.7     <none>        9095/TCP                                                                  3h12m
## my-cluster-01-kafka-tls2-bootstrap     ClusterIP   172.24.251.150   <none>        9097/TCP                                                                  6d4h

# Routeの確認
oc get route -n kafka-01 | grep bootstrap
## my-cluster-01-kafka-mtls2-bootstrap    my-cluster-01-kafka-mtls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com           my-cluster-01-kafka-mtls2-bootstrap    9099    passthrough     None
## my-cluster-01-kafka-scram3-bootstrap   my-cluster-01-kafka-scram3-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com          my-cluster-01-kafka-scram3-bootstrap   9095    passthrough     None
## my-cluster-01-kafka-tls2-bootstrap     my-cluster-01-kafka-tls2-bootstrap-kafka-01.apps.ocp420.ossc.hitachi.com            my-cluster-01-kafka-tls2-bootstrap     9097    passthrough     None

KafkaUserリソースの定義

認証するユーザをKafkaUserリソースで定義して、認可設定(ACL)を記載します。これをOpenShiftに適用すると、ユーザ情報がKafkaクラスタに登録されます。

今回はSCRAM-SHA-512認証とmTLS認証のユーザを作成します。

参考:

SCRAM-SHA-512認証のユーザ

SCRAM認証ユーザの例を示します。独自のパスワードを使用する場合は、パスワードを格納したSecretも作成します。

scram-user.yaml
---
# SCRAM認証のユーザ
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  # ユーザ名
  name: scram-user
  # OpenShiftプロジェクト名
  namespace: kafka-01
  labels:
    # Kafkaクラスタ名
    strimzi.io/cluster: my-cluster-01
spec:
  # 認証設定
  authentication:
    type: scram-sha-512 # SCRAM認証

    # 独自のパスワードを使用する場合は以下を追加。未指定ならランダム生成。
    password: 
      valueFrom:
        secretKeyRef:
          # パスワードを格納したSecretを参照
          name: scram-user-password
          # パスワードのキー
          key: password

  # 認可設定なし(スーパーユーザとして全権限を付与するため)

---
# SCRAM認証のユーザのパスワードを格納したSecret
apiVersion: v1
kind: Secret
metadata:
  name: scram-user-password
  namespace: kafka-01
type: Opaque
stringData:
  password: password777 # パスワード

---

参考: 16.3.2.3. SCRAM-SHA-512 認証

mTLS認証のユーザ

mTLS認証のユーザに、特定のリソースへの権限を付与する例を示します。

mtls-user.yaml
---
# mTLS認証のユーザにアクセス権限を付与
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  # ユーザ名
  name: mtls-user
  # OpenShiftプロジェクト名
  namespace: kafka-01
  labels:
    # Kafkaクラスタ名
    strimzi.io/cluster: my-cluster-01
spec:
  # 認証設定
  authentication:
    type: tls # mTLS認証

  # 認可設定(ACL)
  authorization:
    type: simple # 簡易認可
    acls:
      # Topicに対する権限
      - resource:
          type: topic
          patternType: literal
          # Topic名
          name: my-topic-01
        operations:
          - Describe
          - Read
          - Write
          #- Create
          #- Delete
          #- Alter
          #- AlterConfigs
          - DescribeConfigs
          #- All
        type: allow
        host: "*"

      # Consumer Groupに対する権限
      - resource:
          type: group
          patternType: prefix # Prefix(接頭辞)で指定
          # Consumer Group名
          name: my-group-
        operations:
          - Describe
          - Read
          #- Delete
          #- All
        type: allow
        host: "*"

      # トランザクションIDに対する権限
      - resource:
          type: transactionalId
          patternType: literal
          # トランザクションID
          name: "*"
        operations:
          - Describe
          #- Write
          #- All
        type: allow
        host: "*"

      # Clusterに対する権限
      - resource:
          type: cluster
        operations:
          - Describe
          #- ClusterAction
          #- Create
          #- Alter
          #- AlterConfigs
          #- DescribeConfigs
          #- IdempotentWrite
          #- All
        type: allow
        host: "*"

  # クォータ設定
  quotas:
    # Brokerへの送信レート上限(Brokerごとのbps/秒)
    producerByteRate: 1048576 # 1Mbps/秒
    # Brokerからの取得レート上限(Brokerごとのbps/秒)
    consumerByteRate: 2097152 # 2Mbps/秒
    # Brokerでのリクエスト処理の最大CPU使用率(%)
    requestPercentage: 55
    # Topic Partitionの作成・削除・設定変更リクエストなどのレート上限(Partition数/秒など)
    controllerMutationRate: 10

---

参考: 16.3.2.1. mTLS 認証

KafkaUserリソースの適用

KafkaUserリソースを適用し、Kafkaユーザを作成できたか確認します。

# Kafkaユーザの作成
oc apply -f scram-user.yaml -n kafka-01
oc apply -f mtls-user.yaml -n kafka-01

# Kafkaユーザの確認: ユーザの Ready ステータスが True に変わるまで待つ
oc get kafkausers -n kafka-01
## NAME         CLUSTER         AUTHENTICATION   AUTHORIZATION   READY
## mtls-user    my-cluster-01   tls              simple          True
## scram-user   my-cluster-01   scram-sha-512                    True

READY列がTrueにならない場合は、User Operatorのログを確認してください。

Kafkaクライアントの設定

OpenShiftクラスタ内にKafkaクライアントのPodを起動して、定義した各リスナにアクセスします。クライアントには以下の認証情報を持たせる必要があります。

# 認証方式 クライアントに必要な認証情報
1 無認証 [ユーザ認証なし] なし
2 SCRAM-SHA-512認証(TLSなし) ユーザ名・パスワード
3 SCRAM-SHA-512認証(TLSあり) ユーザ名・パスワード、サーバのルート証明書
4 TLS認証 [ユーザ認証なし] サーバのルート証明書
5 mTLS認証 サーバのルート証明書、クライアント証明書

OpenShiftクラスタ内部のクライアントの場合、以下を実施することで上記の認証情報を持たせることができます。

  • 認証設定用のクライアント設定ファイル(.properties)をSecretに格納しマウント
  • Cluster Operatorが自動作成した、証明書の格納されているSecretをマウント

クライアントにマウントするSecretを以下に示します。認証方式ごとに必要なSecretが異なりますが、今回は上記5つの認証方式を1つのクライアントで試すため、下記3つのSecretをすべてマウントします。

kafka_cert_secrets.png

今回はOpenShiftクラスタ外部のKafkaクライアントからのアクセスは試しません。外部から接続する場合は、Secretから抽出した証明書および設定ファイルを用います。

KeyStoreのパスワードを確認

mTLS認証で使用するクライアント証明書のPKCS#12ファイル(公開鍵+秘密鍵)のパスワードを控えておきます。これは次に説明するクライアント設定ファイルに記載します。

oc get secret mtls-user -n kafka-01 -o=jsonpath='{.data.user\.password}' | base64 -d
## tJGzqwJXyhRwMbQAXoFt9qh4Bp7xbuqR

Kafkaクライアントのリソースを定義

各認証方式を使用できるKafkaクライアントのPodを定義します。

auth-client.yaml
---
# クライアント設定ファイルを格納したSecret
apiVersion: v1
kind: Secret
metadata:
  name: client-configs
  namespace: kafka-01
type: Opaque
stringData:
  # SCRAM-SHA-512認証(TLSなし)の設定ファイル
  scram.properties: |
    # SCRAM-SHA-512認証+TLS暗号化なし
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=SCRAM-SHA-512

    # SCRAM認証のユーザ・パスワード
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="scram-user" password="password777";

  # SCRAM-SHA-512認証(TLSあり)の設定ファイル
  scramtls.properties: |
    # SCRAM-SHA-512認証+TLS暗号化あり
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512

    # SCRAM認証のユーザ・パスワード
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="scram-user" password="password777";

    # サーバ認証の設定
    # TrustStoreのファイル形式
    ssl.truststore.type=PEM
    # サーバのルート証明書
    ssl.truststore.location=/etc/kafka/ca/ca.crt

  # TLS認証の設定ファイル
  tls.properties: |
    # TLS認証
    security.protocol=SSL

    # サーバ認証の設定
    # TrustStoreのファイル形式
    ssl.truststore.type=PEM
    # サーバのルート証明書
    ssl.truststore.location=/etc/kafka/ca/ca.crt

  # mTLS認証の設定ファイル
  mtls.properties: |
    # TLS認証
    security.protocol=SSL

    # サーバ認証の設定
    # TrustStoreのファイル形式
    ssl.truststore.type=PEM
    # サーバのルート証明書
    ssl.truststore.location=/etc/kafka/ca/ca.crt

    # クライアント認証の設定
    # KeyStoreのファイル形式
    ssl.keystore.type=PKCS12
    # クライアント証明書のPKCS#12形式ファイル
    ssl.keystore.location=/etc/kafka/mtls-user/user.p12
    # KeyStoreのパスワード
    ssl.keystore.password=tJGzqwJXyhRwMbQAXoFt9qh4Bp7xbuqR

---
# 認証を使用するKafkaクライアントのPod
apiVersion: v1
kind: Pod
metadata:
  name: auth-client
  namespace: kafka-01
spec:
  containers:
    - name: app
      image: registry.redhat.io/amq-streams/kafka-40-rhel9:3.0.1
      command: ["/bin/sh", "-c", "sleep infinity"]
      resources:
        requests:
          memory: "512Mi"
        limits:
          memory: "1024Mi"
      volumeMounts:
        # クライアント設定ファイルを格納したSecretをマウント
        - name: kafka-client-configs
          mountPath: /etc/kafka/configs
          readOnly: true
        # サーバのルート証明書を格納したSecretをマウント
        - name: kafka-ca-cert
          mountPath: /etc/kafka/ca
          readOnly: true
        # クライアント証明書を格納したSecretをマウント
        - name: mtls-user-cert
          mountPath: /etc/kafka/mtls-user
          readOnly: true
  volumes:
    # クライアント設定ファイルを格納したSecret
    - name: kafka-client-configs
      secret:
        secretName: client-configs
    # サーバのルート証明書を格納したSecret: SCRAM-SHA-512認証(TLSあり)、TLS認証、mTLS認証で使用
    - name: kafka-ca-cert
      secret:
        secretName: my-cluster-01-cluster-ca-cert
    # クライアント証明書を格納したSecret: mTLS認証で使用
    - name: mtls-user-cert
      secret:
        secretName: mtls-user

---

Kafkaクライアントの作成

上記のリソースファイルを適用してKafkaクライアントのPodを起動します。

コンテナ起動時のセキュリティ警告 Warning: would violate PodSecurity ... は今回は無視します。本番運用時は適切なセキュリティ設定を実施したPodを作成してください。

# リソースを作成
oc apply -f auth-client.yaml -n kafka-01

Kafkaクライアントの動作確認

KafkaクライアントPodにログインし、内部公開されている各リスナへ接続してTopic一覧を表示します。

ただし、Kafkaクラスタで簡易認可を有効化しているため、ユーザ認証のないリスナには接続できてもTopicに対するアクセス権がありません。そのため結果は以下のようになります。

# 認証方式 リスナポート 結果
1 無認証 [ユーザ認証なし] 9092 何も表示されない
2 SCRAM-SHA-512認証(TLSなし) 9093 全Topicを表示
3 SCRAM-SHA-512認証(TLSあり) 9094 全Topicを表示
4 TLS認証 [ユーザ認証なし] 9096 何も表示されない
5 mTLS認証 9098 権限あるmy-topic-01のみ表示
# KafkaクライアントのPodにログイン
oc exec -it auth-client -n kafka-01 -- /bin/bash

# 無認証のリスナにアクセス:
# Topic一覧を確認すると、権限がないため何も表示されない
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9092 \
--list
## 何も表示されない

# SCRAM-SHA-512認証(TLSなし)のリスナにアクセス:
# Topic一覧を確認すると、全Topicが表示される
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9093 \
--command-config /etc/kafka/configs/scram.properties \
--list
## __consumer_offsets
## my-topic-01

# SCRAM-SHA-512認証(TLSあり)のリスナにアクセス:
# Topic一覧を確認すると、全Topicが表示される
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9094 \
--command-config /etc/kafka/configs/scramtls.properties \
--list
## __consumer_offsets
## my-topic-01

# TLS認証のリスナにアクセス:
# Topic一覧を確認すると、権限がないため何も表示されない
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9096 \
--command-config /etc/kafka/configs/tls.properties \
--list
## 何も表示されない

# mTLS認証のリスナにアクセス:
# Topic一覧を確認すると、アクセス権限のある`my-topic-01`のみ表示される
./bin/kafka-topics.sh \
--bootstrap-server my-cluster-01-kafka-bootstrap.kafka-01:9098 \
--command-config /etc/kafka/configs/mtls.properties \
--list
## my-topic-01

# ログアウト
exit

Kafkaコンソールの認証設定

Kafkaコンソール用のSCRAM認証ユーザを定義して、KafkaコンソールがそのユーザでSCRAM認証リスナへ接続するように設定します。

my-console.yaml
---
# Kafkaコンソール用のSCRAM認証ユーザ
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  # ユーザ名
  name: console-user
  namespace: kafka-01
  labels:
    strimzi.io/cluster: my-cluster-01
spec:
  # 認証設定
  authentication:
    type: scram-sha-512 # SCRAM-SHA-512認証

  # 認可設定:Kafkaコンソールに必要な権限を付与
  authorization:
    type: simple
    acls:
      # すべてのTopicリソースに対する Read、Describe、DescribeConfigs 権限
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operations:
          - Read
          - Describe
          - DescribeConfigs

      # すべてのConsumer Groupリソースに対する Read、Describe 権限
      - resource:
          type: group
          name: "*"
          patternType: literal
        operations:
          - Read
          - Describe

      # Clusterリソースに対する Describe、DescribeConfigs 権限
      - resource:
          type: cluster
        operations:
          - Describe
          - DescribeConfigs

---
# Kafkaコンソール
apiVersion: console.streamshub.github.com/v1alpha1
kind: Console
metadata:
  name: my-console
  namespace: kafka-01
spec:
  # OpenShiftクラスタ外におけるコンソールのホスト名
  hostname: my-console.ossc.hitachi.com
  # 接続先Kafkaクラスタ
  kafkaClusters:
    # クラスタ名
    - name: my-cluster-01
      # OpenShiftプロジェクト名
      namespace: kafka-01
      # リスナ名
      listener: scram1 # SCRAM-SHA-512認証(TLSなし)

      # 認証ありのリスナに接続するため、認証用のユーザ(KafkaUser)を指定
      credentials:
        kafkaUser:
          # 認証ユーザ名
          name: console-user

---

リソースファイルを適用します。

oc apply -f my-console.yaml -n kafka-01

おわりに

本稿では、Streams for Apache Kafka (Strimzi)における認証・認可の設定方法と、その動作検証結果を紹介しました。実現したい要件に応じて認証方式を選択し、ACLによる適切な権限設定を行うことで、安全なKafkaクラスタ運用が可能となります。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?