この記事の目的は?
先日のre:InventでMSK Serverlessが発表されていました(こちら)。
現時点ではオハイオリージョンでのみ利用できるパブリックプレビューの段階ですが、MSK発表の頃から登場を期待していたサービスなので早速少し遊んでみました。
この記事はその際の情報をメモ代わりにまとめたものです。特にServerlessになることでどうなるか気になるところを中心に触ってみています。
おやくそくの注意書き
この記事に記載していることは2021/12時点で確認した内容などを記載したものです。
記載内容は個人の見解であり、記載の内容を保証するものではありません。参考にされる際は自己責任でお願いします。
誤りなどがあったらコメントなどで(やさしく)指摘して頂けるとうれしいです。
前置き: クラスタの作成とクラスタへの接続
MSK Serverlessクラスタの作成
まずはAWSのマネージドコンソールからMSK Serverlessのクラスタを作成します。
MSKを開き、リージョンをオハイオリージョンに設定すると、MSK Serverlessを選択できるようになります。
ここでは作成方法の詳細は割愛しますが、クラスメソッドさんの解説(こちら)が丁寧だったので、分からない人はそちらを参照するとよいと思います。
Kafkaクライアントの用意
現時点ではMSK Serverlessがサポートしている認証方法はIAM認証のみです。
ここでは、MSK Serverlessを作成したのと同じVPC/Subnet内にAmazon LinuxのEC2インスタンスを作成して、そこからクラスタへの接続を試みます。
EC2インスタンスの起動
基本的には通常通り起動させるだけです。ちょっと試すくらいであれば t3.medium くらいでおそらく十分です。
上述の通り、MSK ServerlessはIAM認証なので、適切なポリシーを付与したIAM roleを付与する必要があることのみ注意してください。
本来は運用面を考慮して権限を細かく設定する必要があるでしょうが、今回はお試しなので自分のアカウント内であれば何でも許可する乱暴な設定を行っておきました。
(下記のxxxxxxxxxxxにはAWSのアカウント(12桁の数値)が入っています)
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "kafka-cluster:*",
"Resource": [
"arn:aws:kafka:*:xxxxxxxxxxxx:transactional-id/*/*/*",
"arn:aws:kafka:*:xxxxxxxxxxxx:group/*/*/*",
"arn:aws:kafka:*:xxxxxxxxxxxx:cluster/*/*",
"arn:aws:kafka:*:xxxxxxxxxxxx:topic/*/*/*"
]
}
]
}
Kafkaクライアントのインストール
MSKは従来のものもServerlessも提供されるものは基本的にクラスタだけで、クライアントはOSSのApache Kafkaのものをそのまま使います。
なのでまずはApache Kafkaの公式(こちら)からダウンロードします。
Apache Kafkaの公式からダウンロードできる資材にはサーバ/クライアントの区別はないので、利用するバージョンのバイナリをダウンロードします。
Kafkaは開発コミュニティが公式に一定バージョンの後方互換性を保証しているので、基本的には最新バージョンを利用するので問題ないのですが、
ここではMSK ServerlessのGetting Startedに倣って2.8.1を利用することにします (ただし、ScalaのバージョンはApache Kafka推奨の2.13にしました)。
$ wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
ダウンロードしたアーカイブファイルを展開して、適切なディレクトリに配置し、必要に応じて配置したKafkaのバイナリのbinディレクトリにPATHを通しておきます。
また、KafkaはJavaで動くので、Javaもインストールしておきます。
Kafkaは既にJava14などもサポートしていますが、ここではお試しなので簡単にインストールできたJava11を使うことにしました。
$ sudo amazon-linux-extras install java-openjdk1
認証のためのライブラリと設定ファイルの用意
MSKのIAM認証では、Kafkaの機能を使ってSASL認証を利用するようですが、必要なライブラリがAWS公式からGitHubで公開されています。
AWSのドキュメント(こちら)に倣って、このライブラリをダウンロードし、Kafkaのlibsに配置しておきます
(Kafkaは各コマンドの実行時にlibs以下のjarファイルを自動的に全てクラスパスに含めてくれるので、libs以下に配置しておくだけで大丈夫です)
$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
$ mv aws-msk-iam-auth-1.1.1-all.jar path/to/kafka_2.13-2.8.1/libs/
認証のための設定のファイルも用意しておきます。
AWSのドキュメントに倣いclient.propertiesというファイルをconfigs以下に用意し、以下の通りに記載します。
(ファイル名はコマンド実行時に指定するので、client.propertiesでなくても、使いやすいファイル目なら何でもよいです)
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
各パラメタの意味はApache Kafka公式のドキュメント(こちら)に記載されていますので、気になる方はそちらを確認してください。
(Apache KafkaはOSSですが、ドキュメントがとてもしっかり記述されているのが特徴ですね)
MSK Serverlessへの接続
ここまででMSK Serverlessのクラスタを利用する用意はできました。
ここではKafkaではお約束のConsole Producer/Console Consumerを使ってメッセージの送受信をしてみることで、動作確認を行います。
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --create --topic test --replication-factor 3 --partitions 10
Created topic test.
BootstrapServerに指定するアドレス(ここではboot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098)はマネージドコンソールから確認できます。
Console Producerを起動してメッセージ送信
$ kafka-console-producer --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --topic test
> hoge
> fuga
> piyo
Console Consumerを起動してメッセージ受信
$ kafka-console-consumer --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --topic test --from-beginning
hoge
fuga
piyo
どうやら、正しくTopicが作成でき、メッセージの送受信も行えているようです。
本題: MSK Serverlessの挙動を確認していく
前置きが長くなりましたが、ここからが本題。
MSK Serverlessで気になる挙動をいくつか確認していきましょう。
Partition数の制限
AWSの公式にはPartition数の上限は120とありますが(こちら)、これはどうやら1Topicあたりの制限ではなく、1クラスタ全体での制限ということらしいです。
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --create --topic test1 --replication-factor 3 --partitions 10
Created topic test1.
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --create --topic test2 --replication-factor 3 --partitions 115
Error while executing topic command : Quota exceeded for maximum number of partitions
ERROR org.apache.kafka.common.errors.InvalidRequestException: Quota exceeded for maximum number of partitions
(kafka.admin.TopicCommand$)
test2の自体はPartition数115ですが、先に作成したtest1のPartitionと合わせて120を超えるので、test2の作成は失敗するということのようです。
Replica数の指定
MSK ServerlessではTopicの作成時にReplication-Factorで3以外を指定しても強制的に3に設定されるようです(エラーなどにもならず勝手にReplca数3で作成される)。
後述していますが、Partition Reassignmentもできないようなので、とりあえずReplica数3でTopicを作成した後、任意の数に変更するという処理もできず、Replica数は3で運用していくことになるようです。
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --create --topic test3 --replication-factor 2 --partitions 10
Created topic test3.
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --describe --topic test3
Topic: test3 TopicId: Ifl0f4PsRrGGVcITF-bGww PartitionCount: 10 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=536870912,message.format.version=2.8-IV1,unclean.leader.election.enable=false,retention.bytes=268435456000
Topic: test3 Partition: 0 Leader: 43 Replicas: 43,33,35 Isr: 43,33,35
Topic: test3 Partition: 1 Leader: 33 Replicas: 33,71,77 Isr: 33,71,77
Topic: test3 Partition: 2 Leader: 35 Replicas: 35,43,107 Isr: 35,43,107
Topic: test3 Partition: 3 Leader: 77 Replicas: 77,107,71 Isr: 77,107,71
Topic: test3 Partition: 4 Leader: 107 Replicas: 107,35,43 Isr: 107,35,43
Topic: test3 Partition: 5 Leader: 71 Replicas: 71,77,33 Isr: 71,77,33
Topic: test3 Partition: 6 Leader: 43 Replicas: 43,33,35 Isr: 43,33,35
Topic: test3 Partition: 7 Leader: 33 Replicas: 33,71,77 Isr: 33,71,77
Topic: test3 Partition: 8 Leader: 35 Replicas: 35,43,107 Isr: 35,43,107
Topic: test3 Partition: 9 Leader: 77 Replicas: 77,107,71 Isr: 77,107,71
test3はReplica数2で作成したはずなのに、ReplicationFactor3になっています。
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --create --topic test4 --replication-factor 4 --partitions 10
Created topic test4.
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --describe --topic test4
Topic: test4 TopicId: Y3KZkHPCQPWRwVtf64NRaA PartitionCount: 10 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=536870912,message.format.version=2.8-IV1,unclean.leader.election.enable=false,retention.bytes=268435456000
Topic: test4 Partition: 0 Leader: 35 Replicas: 35,43,33 Isr: 35,43,33
Topic: test4 Partition: 1 Leader: 43 Replicas: 43,107,71 Isr: 43,107,71
Topic: test4 Partition: 2 Leader: 33 Replicas: 33,35,77 Isr: 33,35,77
Topic: test4 Partition: 3 Leader: 71 Replicas: 71,77,107 Isr: 71,77,107
Topic: test4 Partition: 4 Leader: 77 Replicas: 77,33,35 Isr: 77,33,35
Topic: test4 Partition: 5 Leader: 107 Replicas: 107,71,43 Isr: 107,71,43
Topic: test4 Partition: 6 Leader: 35 Replicas: 35,43,33 Isr: 35,43,33
Topic: test4 Partition: 7 Leader: 43 Replicas: 43,107,71 Isr: 43,107,71
Topic: test4 Partition: 8 Leader: 33 Replicas: 33,35,77 Isr: 33,35,77
Topic: test4 Partition: 9 Leader: 71 Replicas: 71,77,107 Isr: 71,77,107
こちらも同じくtest4はReplica数4で作成したはずなのに、ReplicationFactor3になっています。
データの保存期間
ドキュメントやMSK Serverlessのクラスタ作成時にはメッセージの保存期間が1日と記載されていますが、これはretention.ms=86400000(24時間)がBrokerの設定として行われているということのようです。
Topic Level Configurationで1日より長い期間に上書きしようとしてもInvalidConfigurationと弾かれてしまい、対応できないようです。
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --create --topic test5 --replication-factor 3 --partitions 10 --config retention.ms=604800000
Error while executing topic command : You exceeded the maximum allowed limit for configuration retention.ms. The limit is 86400000.
ERROR org.apache.kafka.common.errors.InvalidConfigurationException: You exceeded the maximum allowed limit for configuration retention.ms. The limit is 86400000.
(kafka.admin.TopicCommand$)
コマンド例は記載しませんが、保存容量(retention.bytes)も同様に上限の250GBを超える値を設定しようとしても弾かれてしまいました。
BrokerConfigの確認
kafka-configs.shを利用することでクラスタ内のBrokerの設定を本来は確認できますが、MSK ServerlessではInvalidRequestとしてサポートしていない旨のエラーメッセージが返されます
(本来なら全Brokerの全設定が列挙されるはずです)
$ kafka-configs.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --describe --entity-type brokers --all
All configs for broker 33 are:
Error while executing config command with args '--bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --describe --entity-type brokers --all'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: Amazon MSK Serverless doesn't support fetching broker or broker-logger configurations.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
at kafka.admin.ConfigCommand$.getResourceConfig(ConfigCommand.scala:552)
at kafka.admin.ConfigCommand$.$anonfun$describeResourceConfig$4(ConfigCommand.scala:512)
at kafka.admin.ConfigCommand$.$anonfun$describeResourceConfig$4$adapted(ConfigCommand.scala:504)
at scala.collection.immutable.List.foreach(List.scala:333)
at kafka.admin.ConfigCommand$.describeResourceConfig(ConfigCommand.scala:504)
at kafka.admin.ConfigCommand$.describeConfig(ConfigCommand.scala:484)
at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:304)
at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:97)
at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Amazon MSK Serverless doesn't support fetching broker or broker-logger configurations.
Partition Reassignment
MSK Serverlessのクラスタの全体の情報が分からないので、適当な3Brokerを選定して計画を作成して実行させて見ましたが、ALTER_PARTITION_REASSIGNMENTS となり、実行できませんでした。
このエラー(This broker does not support ~)は本来APIバージョンの差分でサポートされていないリクエストを行った時に出るものですが、MSK Serverlessではこれを応用してリクエストを拒否しているのでしょうか?
まずはtest1 Topicに対して、Reassign Planを作成
$ kafka-reassign-partitions.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --generate --broker-list 35,43,33 --topics-to-move-json-file /tmp/msk/topics-to-move.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[33,35,43],"log_dirs":["any","any","any"]},{"topic":"test1","partition":1,"replicas":[35,77,107],"log_dirs":["any","any","any"]},{"topic":"test1","partition":2,"replicas":[43,33,71],"log_dirs":["any","any","any"]},{"topic":"test1","partition":3,"replicas":[107,71,77],"log_dirs":["any","any","any"]},{"topic":"test1","partition":4,"replicas":[71,43,33],"log_dirs":["any","any","any"]},{"topic":"test1","partition":5,"replicas":[77,107,35],"log_dirs":["any","any","any"]},{"topic":"test1","partition":6,"replicas":[33,35,43],"log_dirs":["any","any","any"]},{"topic":"test1","partition":7,"replicas":[35,77,107],"log_dirs":["any","any","any"]},{"topic":"test1","partition":8,"replicas":[43,33,71],"log_dirs":["any","any","any"]},{"topic":"test1","partition":9,"replicas":[107,71,77],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[35,33,43],"log_dirs":["any","any","any"]},{"topic":"test1","partition":1,"replicas":[43,35,33],"log_dirs":["any","any","any"]},{"topic":"test1","partition":2,"replicas":[33,43,35],"log_dirs":["any","any","any"]},{"topic":"test1","partition":3,"replicas":[35,43,33],"log_dirs":["any","any","any"]},{"topic":"test1","partition":4,"replicas":[43,33,35],"log_dirs":["any","any","any"]},{"topic":"test1","partition":5,"replicas":[33,35,43],"log_dirs":["any","any","any"]},{"topic":"test1","partition":6,"replicas":[35,33,43],"log_dirs":["any","any","any"]},{"topic":"test1","partition":7,"replicas":[43,35,33],"log_dirs":["any","any","any"]},{"topic":"test1","partition":8,"replicas":[33,43,35],"log_dirs":["any","any","any"]},{"topic":"test1","partition":9,"replicas":[35,43,33],"log_dirs":["any","any","any"]}]}
作成されたReassign Planを実行
$ kafka-reassign-partitions.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --execute --reassignment-json-file /tmp/msk/reassignment.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[33,35,43],"log_dirs":["any","any","any"]},{"topic":"test1","partition":1,"replicas":[35,77,107],"log_dirs":["any","any","any"]},{"topic":"test1","partition":2,"replicas":[43,33,71],"log_dirs":["any","any","any"]},{"topic":"test1","partition":3,"replicas":[107,71,77],"log_dirs":["any","any","any"]},{"topic":"test1","partition":4,"replicas":[71,43,33],"log_dirs":["any","any","any"]},{"topic":"test1","partition":5,"replicas":[77,107,35],"log_dirs":["any","any","any"]},{"topic":"test1","partition":6,"replicas":[33,35,43],"log_dirs":["any","any","any"]},{"topic":"test1","partition":7,"replicas":[35,77,107],"log_dirs":["any","any","any"]},{"topic":"test1","partition":8,"replicas":[43,33,71],"log_dirs":["any","any","any"]},{"topic":"test1","partition":9,"replicas":[107,71,77],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Error reassigning partition(s):
test1-0: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-1: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-2: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-3: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-4: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-5: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-6: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-7: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-8: The broker does not support ALTER_PARTITION_REASSIGNMENTS
test1-9: The broker does not support ALTER_PARTITION_REASSIGNMENTS
Internal Topics
通常のKafkaではOffset Commitの情報やTransactional Producerの情報が__consumer_offsets とか __transaction_state などのInternal Topicに書き込まれますが、MSK ServerlessではTopicの存在自体が見えないようになっているようです。
本来ならば必要な時にこれらのTopicが自動で作成されて kafka-topics.shで --execulude-internal などを明示的に指定しない限り確認できますが、MSK Serverlessでは必要な処理を動かした後でもこれらのTopicは見えませんでした。
(Offset CommitやTransactional ProducerのCommit/Abortは正しく動作しているようなので、Topic自体が存在しないというよりは見えないように作られているということのようです)
通常のKafkaであれば以下のようにInternalのTopicが自動で作られ、kafka-topics.shで確認できる
$ kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
__transaction_state
test1
test2
MSK ServerlessではこれらのTopicはkafka-topics.shでは確認できない
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --list
test1
test2
明示的に指定してもエラーになる
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --describe --topic __transaction_state
Error while executing topic command : Topic '__transaction_state' does not exist as expected
ERROR java.lang.IllegalArgumentException: Topic '__transaction_state' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:542)
at kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:317)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --describe --topic __consumer_offsets
Error while executing topic command : Topic '__consumer_offsets' does not exist as expected
ERROR java.lang.IllegalArgumentException: Topic '__consumer_offsets' does not exist as expected
at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:542)
at kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:317)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:69)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
(kafka.admin.TopicCommand$)
MSK Serverlessで確認できるメトリクス
クラスタに関連して標準で出力されているメトリクスは以下の6つのみでした。
- クライアントから受信する1秒あたりのバイト数 (Topicごと)
- クライアントに送信される1秒あたりのバイト数 (Topicごと)
- トピックの1秒あたりの受信メッセージ数 (Topicごと)
- 推定最長遅延時間 (クラスタ全体)
- オフセット遅延の最大値 (クラスタ全体)
- オフセット遅延の合計 (クラスタ全体)
Kafkaクラスタ全体の情報 (何台のBrokerで構成されたクラスタなのか知りたい)
何らかの方法があるかとあれこれ試しましたが、わかりませんでした。
通常はZooKeeperに接続して/brokers/ids 以下の情報を確認するか、上述のkafka-configs --all などで分かりますが、ZooKeeperの接続情報は提供されず、kafka-configs.shも使えずで単純には無理そうでした。
Partitionはどのように配置されているのか
そもそもこれを知らなくてよいというのがServerlessなわけですが。
少し多めにPartition数を作成してみると、同じようなBrokerが表示されていますが、ある程度グループ的なものが存在していることが伺えます。
$ kafka-topics.sh --bootstrap-server boot-xxxxxxxx.xx.kafka-serverless.us-east-2.amazonaws.com:9098 --command-config config/client.properties --describe --topic test6
Topic: test6 TopicId: j-2ZHJ86RFW4ECjHzJiYFg PartitionCount: 20 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=536870912,message.format.version=2.8-IV1,unclean.leader.election.enable=false,retention.bytes=268435456000
Topic: test6 Partition: 0 Leader: 43 Replicas: 43,33,35 Isr: 43,33,35
Topic: test6 Partition: 1 Leader: 33 Replicas: 33,71,77 Isr: 33,71,77
Topic: test6 Partition: 2 Leader: 35 Replicas: 35,43,107 Isr: 35,43,107
Topic: test6 Partition: 3 Leader: 77 Replicas: 77,107,71 Isr: 77,107,71
Topic: test6 Partition: 4 Leader: 107 Replicas: 107,35,43 Isr: 107,35,43
Topic: test6 Partition: 5 Leader: 71 Replicas: 71,77,33 Isr: 71,77,33
Topic: test6 Partition: 6 Leader: 43 Replicas: 43,33,35 Isr: 43,33,35
Topic: test6 Partition: 7 Leader: 33 Replicas: 33,71,77 Isr: 33,71,77
Topic: test6 Partition: 8 Leader: 35 Replicas: 35,43,107 Isr: 35,43,107
Topic: test6 Partition: 9 Leader: 77 Replicas: 77,107,71 Isr: 77,107,71
Topic: test6 Partition: 10 Leader: 107 Replicas: 107,35,43 Isr: 107,35,43
Topic: test6 Partition: 11 Leader: 71 Replicas: 71,77,33 Isr: 71,77,33
Topic: test6 Partition: 12 Leader: 43 Replicas: 43,33,35 Isr: 43,33,35
Topic: test6 Partition: 13 Leader: 33 Replicas: 33,71,77 Isr: 33,71,77
Topic: test6 Partition: 14 Leader: 35 Replicas: 35,43,107 Isr: 35,43,107
Topic: test6 Partition: 15 Leader: 77 Replicas: 77,107,71 Isr: 77,107,71
Topic: test6 Partition: 16 Leader: 107 Replicas: 107,35,43 Isr: 107,35,43
Topic: test6 Partition: 17 Leader: 71 Replicas: 71,77,33 Isr: 71,77,33
Topic: test6 Partition: 18 Leader: 43 Replicas: 43,33,35 Isr: 43,33,35
Topic: test6 Partition: 19 Leader: 33 Replicas: 33,71,77 Isr: 33,71,77
ここではおそらく[43, 71], [33, 107], [35, 77]が同じRackに属している(71と77は逆かもしれませんが)ので、おそらくAZごとに均等に配置されるようAssign計画が作られているのではないかと思われます。
考察: MSK Serverlessがつかえるところ/つかいにくいところ
つかえるところ
Kafkaの主なユースケースであるリアルタイム処理/ストリーム処理では特に問題なく使えそうです。
おそらくOffsetのLag(Producerの送信側のHigh WatermarkとConsumerの受信側のOffset Commitの位置の差)を常にゼロにすることを期待するようなワークロードを想定しているものと思われます。
AWSのサービスだと、GlueやLambdaとの連携でメッセージが入力され次第順次処理をしてくようなワークロードへの適用などがよさそうです。
つかいにいくところ
例えば以下のような使い方には、いまのところMSK Serverlessは向かないようです
- メッセージ入力(Produce)は随時だが、出力(Consume)は日次などでまとめて行うケース (例: まとめてのファイルへの書き出しなど) ※
- メッセージ処理の並列度が高いケース (特にConsumerGroupを利用した処理の並列度はPartition数に依存しているため、並列度がPartition数の120で頭打ちになる)
- 将来的にPartition数を増やすなどして処理性能を拡張させていく運用が必要なケース (Partition Reassignmentが行えないので、Partition数を後から増やすことができない)
- 大量の/巨大なメッセージの処理のために、高いスループットが要求される場合 (通常のクラスタではディスクに依存して性能が向上するため、ディスクの球数やBroker数で調整できる)
※日次の処理であればデータ保持期間が24時間のMSK Serverlessでも対応できそうですが、障害などで期待したタイミングでの処理に失敗したときのリカバリのことを考慮すると日次でも難しそうです
MSKとMSK Serverlessの比較
MSK Serverlessのデータ保持期間/容量、スループット、並列度(Partition数)の制約が問題にならないかというところが選択のポイントでしょうか。
将来的(※)にもMSK Serverlessの制約が問題にならない見込みならばServerlss、気になるところがあるのであればMSK、それでも対応できないケースがあればセルフマネージド(Apache Kafka on EC2など)といった形などかと思われます。
※リアルタイム処理/ストリーム処理ではリリース後に容易に停止させることができず、移行が困難なケースもあるので、データ量の増大などで近い将来問題にならないかを最初によく考慮しておくほうがよいです。
MSK ServerlessとKinesis Data Streamsの比較
MSK Serverlessの登場でKiness Data Streamsとの違い/選択がより分かりにくくなったようには思います。両者の意識しているワークロードも近いですし。
両者の選択の場合は、プロダクト自体の選択ではなく、エコシステム(周辺のプロダクト/サービス)を基準に選ぶことになりそうです。Kinesis Firehoseを使いたい、Kafka Streamsを使いたい、将来的にマルチクラウド化を意識しておきたい、など。
クロージング
利用の制約があり、既存のApache Kafkaのすべてをカバーできるというわけではないようですが、クラスタのメンテナンスなどが不要で、ストリーム処理をより手軽に始められるという点ではよい選択肢ではないかと思います。
これを機にさらにリアルタイム処理/ストリーム処理の利用が広がるとよいでですね。