なかなか使う機会が少ないであろうMSKの構築を行ってみる。
手順概要
1 AWS上にMSKをデプロイする
2 IAM権限を設定する
3 設定用のEC2クライアントでブローカーに接続およびTOPIC作成
4 テスト
1 MSKのデプロイ
Amazon MSK>クラスター>クラスターの作成からKafkaクラスターを作成する。
今回は下記のようにProvisioned Standardにて構築する。
クラスター設定
◯クラスターの作成方法:カスタム作成
デフォルトのクイック作成はVPCもデフォルトのものが使われてしまうのでこちらを選択。
◯クラスタータイプ:プロビジョンド
◯Apache Kafkaバージョン:3.6.0(デフォルト)
今回はで推奨の設定を利用する
◯ブローカータイプ:Standardブローカー
◯ゾーン数:3(デフォルト)
◯ゾーンあたりのブローカー:1(デフォルト)
◯クラスター設定:デフォルト
ネットワーク設定
セキュリティ設定
◯アクセスコントロール方法:IAMロールベースの認証(デフォルト)
今回はクライアントをAWS内に持つため最も簡単なIAMを利用した認証とする。
◯クラスターとクライアント間の暗号化:TLS暗号化(デフォルト)
◯クラスター内の暗号化:TLS暗号化(デフォルト)
◯保管時のデータの暗号化:AWSマネージドCMKを使用(デフォルト)
モニタリングおよびタグ
2 IAM権限設定
今回はIAMを使って認証するため下記のようなロールを作成する。
MSKへのアクセスは今回のケースではEC2から行うため下記のようなポリシーからロールを作成し、EC2に対してIAMプロファイルをアタッチする。
アカウントとTOPICは都度書き換える必要がある。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:Account-ID:cluster/MSKTutorialCluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:*Topic*",
"kafka-cluster:WriteData",
"kafka-cluster:ReadData"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:Account-ID:topic/MSKTutorialCluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"kafka-cluster:AlterGroup",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:ap-northeast-1:Account-ID:group/MSKTutorialCluster/*"
]
}
]
}
3 ブローカー接続
MSK接続用のクライアントを作成していく。
資材の用意のためインターネット接続可能なEC2を用意して下記を実施。
①Javaインストール
# sudo yum -y install java-11
②kafkaユーザを作成して作業ディレクトリを用意
# sudo -i
# mkdir /opt/kafka
③kafkaとMSK IAM認証用のjarをダウンロード
# cd /opt/kafka/
# wget https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
# tar -xzf kafka_2.13-3.6.0.tgz
# cd kafka_2.13-3.6.0/libs/
# wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
④クライアントの設定を追加
# cd ../bin/
# touch client.properties
# cat << EOF >> touch client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
⑤TOPIC作成
下記コマンドはそれぞれの環境ごとに修正すること。
エンドポイントは下記の手順で確認できる。
クラスター><自身のクラスター名>>クライアント情報の表示から確認できる。
※例はブローカーが2つの構成。
./kafka-topics.sh --create --bootstrap-server b-1.msktest.XXXXXX.c3.kafka.ap-northeast-1.amazonaws.com:90XX --command-config client.properties --replication-factor 3 --partitions 1 --topic MSKTutorialTopic
この時点でTOPICの作成まで完了している。
あとは実際に試す必要がある。
4 テスト
今回はプロデューサー(書き込み)とコンシューマー(読み込み)の動作を確認するため、2つのセッションでEC2のssh接続を行うかscreen等で切り替えることを推奨する。
①プロデューサー動作確認
下記コマンドはそれぞれの環境ごとに修正すること。
# ./kafka-console-producer.sh --broker-list b-1.msktest.XXXXXX.c3.kafka.ap-northeast-1.amazonaws.com:90XX --producer.config client.properties --topic MSKTutorialTopic
コマンドが通れば好きにメッセージを書き込むことができる。
1行ごとに1メッセージとして処理されEnterで送信される。
②コンシューマー動作確認
下記コマンドはそれぞれの環境ごとに修正すること。
./kafka-console-consumer.sh --bootstrap-server b-1.msktest.XXXXXX.c3.kafka.ap-northeast-1.amazonaws.com:90XX --consumer.config client.properties --topic MSKTutorialTopic --from-beginning
正常に動作するしていればプロデューサー側にて入力したメッセージがコンシューマー側でも1行ずつリアルタイムで反映される。
次回
次回はDRへのレプリケーションを扱う予定です。