20
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【初心者】Amazon Managed Streaming for Apache Kafka (MSK) / MSK Serverless を使ってみる

Posted at

1. 目的

  • 知識0からKafkaの学習をしている。公式チュートリアルを実施して、基本構成やKafka用語についてざっくり理解できるようにする。

2. やったこと

  • 以下2つの公式チュートリアルを実施し、構築手順や処理の流れを確認する。
  • 内容としては以下が含まれる。
    • Kafkaクラスター作成
    • Topicの作成
    • Producerからのメッセージ送信
    • Consumerからのメッセージ取得

3. Amazon Managed Streaming for Apache Kafka (MSK) とは(自分の理解)

  • Kafkaクラスター(自分で構築する場合、インストール、冗長化設定、バージョンアップなど煩雑)をAWS Managedで提供してくれるサービス。

4. 構成図

構成図.png

5. 手順

基本的にチュートリアルの手順通りに実施する。詰まった点や補足的に実施したコマンドなどを記載する。

5.1 事前準備

  • 3つのsubnet(全て別AZ)を持つVPCを作成しておく。(通常版のKafkaクラスターは3AZでの冗長が基本構成のため)

5.2 Getting Started Using Amazon MSK (通常版Kafkaクラスター)

Step 1: Create an Amazon MSK Cluster

  • 「Amazon MSK - クラスター - クラスターの作成」 からKafkaクラスターを作成する。以下がデフォルト値から変更しているところ。

    • 作成方法: 「カスタム作成」を選択 (「クイック作成」だとDefaultVPCに作成されてしまう)
    • クラスタータイプ: 「プロビジョンド」を選択 (「プロビジョンド」が通常版、「サーバーレス」がServerless版)
    • ブローカータイプ: 「kafka.t3.small」を選択 (検証用のため一番小さいインスタンスタイプ)
    • VPC: あらかじめ作成した、subnetが3つあるVPCを選択
    • アクセスコントロール方法: 「認証されていないアクセス」を選択 (認証なしでシンプルにインスタンスから読み書きを行うため)
    • 暗号化: 「プレーンテキスト」を選択 (追加の設定をしなくてよいようにするため)

kafka01a.png

kafka02a.png
kafka03a.png
kafka04a.png
kafka05a.png
kafka06a.png
kafka07a.png
kafka08a.png
kafka09a.png

Step 2: Create a Client Machine

  • Kafkaクラスターと同一VPC内にAmazon Linux 2 のインスタンスを起動する。負荷をかける作業は行わないため、インスタンスタイプはt2.microにする。

Step 3: Create a Topic

  • Kafkaのアプリケーションをインストールし、topicを作成する。
[ec2-user@ip-10-0-0-54 ~]$ sudo yum install java-1.8.0
[ec2-user@ip-10-0-0-54 ~]$ wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
[ec2-user@ip-10-0-0-54 ~]$ tar -xzf kafka_2.12-2.6.2.tgz

# Topicの作成
[ec2-user@ip-10-0-0-54 ~]$ bin/kafka-topics.sh --create --zookeeper [ZookeeperConnectString] --replication-factor 3 --partitions 1 --topic MSKTutorialTopic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic MSKTutorialTopic.

# Topicのリスト表示
[ec2-user@ip-10-0-0-54 kafka_2.12-2.6.2]$ bin/kafka-topics.sh --list --zookeeper [ZookeeperConnectString]
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
MSKTutorialTopic
__amazon_msk_canary
__consumer_offsets
[ec2-user@ip-10-0-0-54 kafka_2.12-2.6.2]$
  • [ZookeeperConnectString]は、マネージメントコンソールの以下の箇所を参照する。

kafka10b.png

Step 4: Produce and Consume Data

  • 通信方式をPLAINTEXTに設定する。(Kafkaクラスター作成時も暗号化なしにしているため)
[ec2-user@ip-10-0-0-54 bin]$ cat client.properties
security.protocol=PLAINTEXT
  • bootstrap-server を指定して、Producerとしてメッセージの送信を行う。(「aaaaa」などの文字列をメッセージとして手入力する)
[ec2-user@ip-10-0-0-54 bin]$ ./kafka-console-producer.sh --broker-list [endpoint] --producer.config client.properties --topic MSKTutorialTopic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
>aaaaa
>bbbbb
>ccccc
>^C
  • EC2インスタンスでもう1つ別のターミナルを開き、Consumerとしてメッセージの取得を行う。Producer側で入力した文字列がリアルタイムに取得できる。
[ec2-user@ip-10-0-0-54 bin]$ ./kafka-console-consumer.sh --bootstrap-server [endpoint] --consumer.config client.properties --topic MSKTutorialTopic --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
aaaaa
bbbbb
ccccc
^CProcessed a total of 3 messages
[ec2-user@ip-10-0-0-54 bin]$
  • broker-list/bootstrap-serverのendpointは、マネージメントコンソールの以下の場所を参照。

kafka10a.png

5.3 Getting started using MSK Serverless clusters (Serverless版Kafkaクラスター)

Step 1: Create an MSK Serverless cluster

  • 「Amazon MSK - クラスター - クラスターの作成」 からKafkaクラスターを作成する。以下がデフォルト値から変更しているところ。
    • 作成方法: 「カスタム作成」を選択 (「クイック作成」だとDefaultVPCに作成されてしまう)
    • クラスタータイプ: 「サーバーレス」を選択 (「プロビジョンド」が通常版、「サーバーレス」がServerless版)
    • VPC: あらかじめ作成したVPCを使用する。Subnetは2つのみ選択する。

kafka12a.png
kafka13a.png
kafka14a.png
kafka15a.png
kafka16a.png

Step 2: Create an IAM role

  • EC2インスタンス(Producer/Consumer)からKafkaクラスターへアクセスするためのIAMポリシー/IAMロールを作成する。
  • 今回は作業簡略化のため、チュートリアルのIAMポリシー/IAMロールは作成せず、既存のPowerUser権限のあるIAMロールを使用する。

Step 3: Create a client machine

  • Kafkaクラスターと同一VPC内にAmazon Linux 2 のインスタンスを起動する。負荷をかける作業は行わないため、インスタンスタイプはt2.microにする。
  • PowerUser権限のあるIAMロールをインスタンスに付与する。
  • アプリケーションのインストール、設定を行う。
[ec2-user@ip-10-0-0-130 ~]$ sudo yum -y install java-11
[ec2-user@ip-10-0-0-130 ~]$ wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
[ec2-user@ip-10-0-0-130 ~]$ tar -xzf kafka_2.12-2.8.1.tgz
[ec2-user@ip-10-0-0-130 libs]$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
[ec2-user@ip-10-0-0-130 bin]$ cat client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

Step 4: Create an Apache Kafka topic

  • Topicを作成する。
# Topicを作成
[ec2-user@ip-10-0-0-130 bin]$ ./kafka-topics.sh --bootstrap-server [endpoint] --command-config client.properties --create --topic msk-serverless-tutorial --partitions 6
Created topic msk-serverless-tutorial.

# Topicのリスト表示
[ec2-user@ip-10-0-0-130 bin]$ ./kafka-topics.sh --bootstrap-server [endpoint] --command-config client.properties --list
msk-serverless-tutorial

# (参考) IAM権限が不足している場合のTopic作成時のエラー
[ec2-user@ip-10-0-0-130 bin]$ ./kafka-topics.sh --bootstrap-server [endpoint] --command-config client.properties --create --topic msk-serverless-tutorial --partitions 6
Error while executing topic command : Authorization failed.
[2022-06-20 01:10:42,909] ERROR org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.
 (kafka.admin.TopicCommand$)

  • bootstrap-serverのendpointは、マネージメントコンソールの以下を参照する。

kafka17a.png

Step 5: Produce and consume data

  • Producerとしてメッセージの送信を行う。
[ec2-user@ip-10-0-0-130 bin]$ ./kafka-console-producer.sh --broker-list [endpoint] --producer.config client.properties --topic msk-serverless-tutorial
>11111
>22222
>33333
>^C
  • ターミナルをもう1個開き、Consumerとしてメッセージの受信を行う。
[ec2-user@ip-10-0-0-130 bin]$ ./kafka-console-consumer.sh --bootstrap-server [endpoint] --consumer.config client.properties --topic msk-serverless-tutorial --from-beginning
11111
22222
33333
^CProcessed a total of 3 messages
  • broker-list/bootstrap-server のendpointの値は、Topic作成時のものと同じものを使用する。

6. 所感

  • Producer --> Kafkaクラスター --> Consumer とメッセージを伝送する、Kafkaの最も基本的なところの動作確認ができた。引き続き、使いい方など深堀していきたい。
20
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?