spring-boot
Kafka
spring-boot-stream

Spring Cloud StreamでKafkaのPartitionをインスタンス別に読み込む

More than 3 years have passed since last update.


事象

Spring-Cloud-Streamで複数のインスタンスでKafkaに接続した場合に、コンシューマーグループを同じものに指定しても、同一のPartitionからメッセージがそれぞれ読み込まれてしまう事象があったので、コンシューマーグループの設定方法をメモ。


  • org.springframework.cloud:spring-cloud-stream-parent:1.0.0.M4


設定方法


公式ドキュメントを見る

公式ドキュメンに設定方法の記載があった。

ちなみに最近公式ドキュメントがわかりやすくなっていて、下手にコードを読むよりもこちらを読むほうが有益だった。

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/

以下の4つの項の説明を合わせると対応方法がわかった。


  • instanceCount=<インスタンス数>

  • instanceIndex=<インスタンスの連番(0からインスタンス数-1)>

  • bindings..partitioned=true

  • bindings..group=<コンシューマーグループ名>(もしかしたら不要)


設定例


application.ymlの例


application.yml

spring:

cloud:
stream:
instanceCount: 2
instanceIndex: 0
bindings:
input:
destination: test.topic
group: test_consumer_group
partitioned: true


application.propertiesの例


application.properties

spring.cloud.stream.instanceCount=2

spring.cloud.stream.instanceIndex=0
spring.cloud.stream.bindings.input.destination=test.topic
spring.cloud.stream.bindings.input.group=test_consumer_group
spring.cloud.stream.bindings.input.partitioned=true


instanceCount、instanceIndexの設定

instanceCount、instanceIndexは、環境(開発機、テスト、本番)ごとにことなるので、外部の環境変数で指定する。


環境変数の例

export SPRING_CLOUD_STREAM_INSTANCECOUNT=2

export SPRING_CLOUD_STREAM_INSTANCEINDEX=0


Java起動オプションの例

-Dspring.cloud.stream.instanceCount=2 -Dspring.cloud.stream.instanceIndex=0


以下公式ドキュメントの記載箇所をメモ


2.4 Consumer Groups

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#consumer-groups

コンシューマーグループを指定することで、同一パーティションに対して複数のコンシューマー(インスタンス)からアクセスできそうな説明になっているが、コンシューマーグループをだけ指定しても効果がなかった。


4.1 Spring Cloud Stream Properties

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_spring_cloud_stream_properties

Spring Cloud Streamの設定で、instanceCountinstanceIndexというのがあって、Kafkaでは必須の設定とあった。(Defaultの設定があるため、明示的には不要)

例えば、1台のサーバーにインスタンスを1つ立ち上げるとすると、サーバーが2台の場合は、instanceCountは2、instanceIndexはサーバーごとに0と1になる。

サーバーが3台の場合は、instanceCountは3、instanceIndexはサーバーごとに0と1と2になる。

この設定だけでも効果はなかった。


4.2.2 Consumer properties

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_consumer_properties

partitionedの説明に「パーティションされたプロデューサーを利用するかどうか」とあるが、これだけだと意味が不明。だが、これが必要だった。

説明では、

spring.cloud.stream.bindings.<channelName>.consumer.partitioned=true

のように、consumerの要素が必要そうだったが、実際は、

spring.cloud.stream.bindings.<channelName>.partitioned=true

で設定が効いた。このへんはバージョンの違いによるものかもしれない。


8.2 Instance Index and Instance Count

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_instance_index_and_instance_count

ここにもinstanceCountinstanceIndexの説明がある。Spring Cloud Dataflowにデプロイする場合は、この値は自動的に設定される。が、通常のSpring Cloud Streamアプリケーションの場合は、正確に設定する必要があるとさらっと書いてある。


Configuring Input Bindings for Partitioning

http://docs.spring.io/spring-cloud-stream/docs/current-SNAPSHOT/reference/htmlsingle/#_configuring_input_bindings_for_partitioning

この項に具体的な設定方法が出てくる。なぜ似たような説明がいろんなところにでてきているかは不明だが、これをみることで設定方法がわかる。<channelName>(例ではinput)にpartitioned=trueを設定して、かつinstanceCountなどを適切に設定しないと正しくpartitionからメッセージを読み込めないので注意が必要。

spring.cloud.stream.bindings.input.partitioned=true

spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5


動作結果


検証条件


  • kafkaのpartition数:3

  • コンシューマーのインスタンス数:2


application.ymlの設定内容


application.yml

spring:

cloud:
stream:
instanceCount: 2
bindings:
input:
destination: test.topic
group: test_consumer_group
partitioned: true


実行ログ

ログレベルは、DEBUGにしています。


インスタンス1のログ

起動オプションを-Dspring.cloud.stream.instanceIndex=0で起動する



  • id=0はpartitionのインデックスを示す。


  • @3の3の数字は、メッセージのオフセットを示す。何番目のメッセージまで読んだかがわかる。

  • インスタンス1で受け取っているメッセージは、インデックスが0と2のpartition。

2016-04-09 15:21:20.626 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3

2016-04-09 15:21:20.724 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.726 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.826 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14
2016-04-09 15:21:20.828 [pool-3-thread-2] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=0]@3
2016-04-09 15:21:20.927 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=2]@14


インスタンス2のログ

起動オプションを-Dspring.cloud.stream.instanceIndex=1で起動する


  • インスタンス2で受け取っているメッセージは、インデックスが1のpartition。

2016-04-09 15:15:20.039 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33

2016-04-09 15:15:20.142 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33
2016-04-09 15:15:20.244 [pool-3-thread-1] DEBUG o.s.i.k.c.DefaultConnection fetch - Reading from Partition[topic='test.topic', id=1]@33


感想

コンシューマーのインスタンスの増減によって、instanceCount、instanceIndexの設定を各インスタンスで再設定しなおさないといけないのは結構手間。

Spring Cloud DataFlowに寄せれば自動設定してくれるのだろうけど、それを導入するコストを考えると運用の手間が多少合っても目をつぶるべきか。