事象
Spring-Cloud-Streamで複数のインスタンスでKafkaに接続した場合に、コンシューマーグループを同じものに指定しても、同一のPartitionからメッセージがそれぞれ読み込まれてしまう事象があったので、コンシューマーグループの設定方法をメモ。
- org.springframework.cloud:spring-cloud-stream-parent:1.0.0.M4
設定方法
公式ドキュメントを見る
公式ドキュメンに設定方法の記載があった。
ちなみに最近公式ドキュメントがわかりやすくなっていて、下手にコードを読むよりもこちらを読むほうが有益だった。
以下の4つの項の説明を合わせると対応方法がわかった。
- instanceCount=<インスタンス数>
- instanceIndex=<インスタンスの連番(0からインスタンス数-1)>
- bindings..partitioned=true
- bindings..group=<コンシューマーグループ名>(もしかしたら不要)
設定例
application.ymlの例
spring:
cloud:
stream:
instanceCount: 2
instanceIndex: 0
bindings:
input:
destination: test.topic
group: test_consumer_group
partitioned: true
application.propertiesの例
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0
spring.cloud.stream.bindings.input.destination=test.topic
spring.cloud.stream.bindings.input.group=test_consumer_group
spring.cloud.stream.bindings.input.partitioned=true
instanceCount、instanceIndexの設定
instanceCount、instanceIndexは、環境(開発機、テスト、本番)ごとにことなるので、外部の環境変数で指定する。
環境変数の例
export SPRING_CLOUD_STREAM_INSTANCECOUNT=2
export SPRING_CLOUD_STREAM_INSTANCEINDEX=0
Java起動オプションの例
-Dspring.cloud.stream.instanceCount=2 -Dspring.cloud.stream.instanceIndex=0
以下公式ドキュメントの記載箇所をメモ
2.4 Consumer Groups
コンシューマーグループを指定することで、同一パーティションに対して複数のコンシューマー(インスタンス)からアクセスできそうな説明になっているが、コンシューマーグループをだけ指定しても効果がなかった。
4.1 Spring Cloud Stream Properties
Spring Cloud Streamの設定で、instanceCount
とinstanceIndex
というのがあって、Kafkaでは必須の設定とあった。(Defaultの設定があるため、明示的には不要)
例えば、1台のサーバーにインスタンスを1つ立ち上げるとすると、サーバーが2台の場合は、instanceCountは2、instanceIndexはサーバーごとに0と1になる。
サーバーが3台の場合は、instanceCountは3、instanceIndexはサーバーごとに0と1と2になる。
この設定だけでも効果はなかった。
4.2.2 Consumer properties
partitioned
の説明に「パーティションされたプロデューサーを利用するかどうか」とあるが、これだけだと意味が不明。だが、これが必要だった。
説明では、
spring.cloud.stream.bindings.<channelName>.consumer.partitioned=true
のように、consumer
の要素が必要そうだったが、実際は、
spring.cloud.stream.bindings.<channelName>.partitioned=true
で設定が効いた。このへんはバージョンの違いによるものかもしれない。
8.2 Instance Index and Instance Count
ここにもinstanceCount
とinstanceIndex
の説明がある。Spring Cloud Dataflowにデプロイする場合は、この値は自動的に設定される。が、通常のSpring Cloud Streamアプリケーションの場合は、正確に設定する必要があるとさらっと書いてある。
Configuring Input Bindings for Partitioning
この項に具体的な設定方法が出てくる。なぜ似たような説明がいろんなところにでてきているかは不明だが、これをみることで設定方法がわかる。<channelName>
(例ではinput)にpartitioned=true
を設定して、かつinstanceCountなどを適切に設定しないと正しくpartitionからメッセージを読み込めないので注意が必要。
spring.cloud.stream.bindings.input.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5
動作結果
検証条件
- kafkaのpartition数:3
- コンシューマーのインスタンス数:2
application.ymlの設定内容
spring:
cloud:
stream:
instanceCount: 2
bindings:
input:
destination: test.topic
group: test_consumer_group
partitioned: true
実行ログ
ログレベルは、DEBUGにしています。
インスタンス1のログ
起動オプションを-Dspring.cloud.stream.instanceIndex=0
で起動する
-
id=0
はpartitionのインデックスを示す。 -
@3
の3の数字は、メッセージのオフセットを示す。何番目のメッセージまで読んだかがわかる。 - インスタンス1で受け取っているメッセージは、インデックスが0と2のpartition。
2016-04-09 15:21:20.626 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.724 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.726 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.826 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.828 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.927 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
インスタンス2のログ
起動オプションを-Dspring.cloud.stream.instanceIndex=1
で起動する
- インスタンス2で受け取っているメッセージは、インデックスが1のpartition。
2016-04-09 15:15:20.039 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
2016-04-09 15:15:20.142 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
2016-04-09 15:15:20.244 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
感想
コンシューマーのインスタンスの増減によって、instanceCount、instanceIndexの設定を各インスタンスで再設定しなおさないといけないのは結構手間。
Spring Cloud DataFlowに寄せれば自動設定してくれるのだろうけど、それを導入するコストを考えると運用の手間が多少合っても目をつぶるべきか。