はじめに
この記事では、MSKのクラスタータイプをプロビジョンドでデプロイし、producerの送信からconsumerの受信まで一連の流れを理解することを目的としています。
なお、MSKクラスターへのアクセスコントロール方法には、IAMロールベースの認証を用いて行います。
やってみる
MSK クラスターの作成
作成方法
- クイック作成
- カスタム作成
ここでは、理解しながら作成したいため、「カスタム作成」を選びます。
クラスター名
クラスタータイプ
- サーバレス
- クラスターの容量を管理およびスケールすることなくApache Kafkaを実行することができます。
- プロビジョンド
- ブローカー数とブローカーあたりのストレージ容量を指定します。
Apache Kafka バージョン
ブローカー
ブローカータイプ
検証用途なので、「kafka.t3.small」を選択しました。
ゾーン数
- 3
- 2
AZ数です。3が推奨となりますが、検証用途のため、「2」としています。
クラスター作成後に変更ができないことに留意が必要です。
ゾーンあたりのブローカー
1ゾーンあたりのブローカー数です。後で増やすことが可能です。
検証用途のため、「1」としています。
ストレージ
ブローカーあたりのAmazon EBSストレージ
検証用途のため「1 GiB」にしています。
ブローカーあたりのプロビジョンドストレージスループット
10 GiB 以上のボリュームサイズの場合、250 MiB/秒以上のストレージスループットをプロビジョニングできます。
250 MiB/秒がデフォルトです。
ストレージスループットをプロビジョニングするには、kafka.m5.4xlarge 以上のブローカータイプを選択する必要があります。
構成
クラスター設定
今回は、「Amazon MSK のデフォルト設定」でいきます。
Netzwerk
VPC、サブネット、セキュリティグループを指定します。
セキュリティ設定
アクセスコントロール方法
- 認証されていないアクセス
- 認証不要、全てアクセス可
- IAM ロールベースの認証
- IAMを使用して認証
- SASL/SCRAM認証
- ACMによるTLSクライアント認証
- AWS Private CAを使用してクライアント認証
今回は「IAMロールベースの認証」を使用します。
暗号化
転送中のデータを暗号化
アクセスコントロール方法が IAM、SASL/SCRAM、または TLS の場合、クライアントは TLS 暗号化を使用してブローカーと通信する必要があります。
保管時のデータの暗号化
- AWS マネージド CMK を使用
- カスタマーマネージドキーを使用
今回は、「AWS マネージド CMK を使用」のままでいきます。
モニタリング
このクラスターの Amazon CloudWatch メトリクス
- 基本モニタリング
- 拡張ブローカーレベルモニタリング
- 拡張トピックレベルモニタリング
- 拡張パーティションレベルモニタリング
無料の「基本モニタリング」を選択しています。
Prometheus によるオープンモニタリング
時系列メトリクスデータのオープンソースモニタリングシステムである Prometheus を使用して、MSK クラスターをモニタリングできます。
ブローカーログの配信
- Amazon CloudWatch Logs
- Amazon S3 に配信
- Amazon Kinesis Data Firehose
今回は接続確認だけを行いたいので、特に設定しませんでした。
IAM アクセスコントロール
先程アクセスコントロール方法で、「IAM ロールベースの認証」を指定しました。
IAM アクセスコントロールを使用するには、クライアントプロパティを設定し、アクションを許可または拒否する IAM ポリシーを指定する必要があります。
今回はEC2からアクセスするので、EC2を信頼関係とした以下のポリシーをアタッチします。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:0123456789012:cluster/MSK-Cluster/abcd1234-0123-abcd-5678-1234abcd-1"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:0123456789012:topic/MSK-Cluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:0123456789012:group/MSK-Cluster/*"
]
}
]
}
なお、インスタンスタイプに応じた TCP 新規接続数のレート制限があることに留意が必要そうです。
- 20 TCP connections per broker per second (t3.small 以外)
- 4 TCP connections per broker per second (t3.small)
EC2
先程の手順で作成したIAMロールをアタッチしたEC2を準備します。
セキュリティグループ
ポート | |
---|---|
9092 | プレーンテキストでブローカーと通信 |
9094 | TLS 暗号化を使用してブローカーと通信(プライベート) |
9194 | TLS 暗号化を使用してブローカーと通信(パブリック) |
9096 | SASL/SCRAM を使用してブローカーと通信(プライベート) |
9196 | SASL/SCRAM を使用してブローカーと通信(パブリック) |
9098 | IAM を使用してブローカーと通信(プライベート) |
9198 | IAMを使用してブローカーと通信(パブリック) |
2182 | TLS 暗号化を使用して Apache と通信 |
2181 | Apache ZooKeeper ノードのデフォルト |
MSKクラスターのセキュリティグループのインバウンドでEC2からの上記で必要なポートを開放します。
各種インストール
java
javaのインストール
sudo yum install -y java-11
java -version
openjdk version "11.0.21" 2023-10-17 LTS
Apache Kafka
Apache Kafkaのダウンロード
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz
Amazon MSK IAM JAR
/libsディレクトリ
でAmazon MSK IAM JARファイルをダウンロード
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
/bin ディレクトリ
で以下を作成
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
Topicの作成
/bin ディレクトリ
で以下を実行
./kafka-topics.sh --bootstrap-server b-1.mskcluster.XXXXXX.XX.kafka.ap-northeast-1.amazonaws.com:9098 --command-config client.properties --create --topic MSK-TopicA --partitions 2 --replication-factor 2
Created topic MSK-TopicA.
作成されたTopicを確認
./kafka-topics.sh --bootstrap-server b-1.mskcluster.XXXXXX.XX.kafka.ap-northeast-1.amazonaws.com:9098 --command-config client.properties --describe --topic MSK-TopicA
Topic: MSK-TopicA TopicId: xVa-r9amSr2LENOBIHYw2w PartitionCount: 2 ReplicationFactor: 2 Configs: min.insync.replicas=1,message.format.version=2.8-IV1,unclean.leader.election.enable=true
Topic: MSK-TopicA Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: MSK-TopicA Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
producerの実行
/bin ディレクトリ
で以下を実行
./kafka-console-producer.sh --broker-list b-1.mskcluster.XXXXXX.XX.kafka.ap-northeast-1.amazonaws.com:9098 --producer.config client.properties --topic MSK-TopicA
>test1
>test2
>test3
consumerの実行
/bin ディレクトリ
で以下を実行
./kafka-console-consumer.sh --bootstrap-server b-1.mskcluster.XXXXXX.XX.kafka.ap-northeast-1.amazonaws.com:9098 --consumer.config client.properties --topic MSK-TopicA --from-beginning
test2
test1
test3
順番が違うなと思ったら、Kafkaは同一のPartitionでのみ順序を保証しているため、順序保証が必要であれば同一のPartitionにメッセージを入れる必要があるみあたいですね。