概要
Spring BootのWebFluxでSpring-Cloud-Streamを使ってKafkaにメッセージを書き込む方法
環境
- 開発環境: Mac
- IDE: IntelliJ
- 言語: Kotlin
事前準備
MacにKafkaをインストールする
以下の非常にわかりやすい手順を参考にMacにKafkaをインストールします
手順
プロジェクト作成
IntelliJを使って、SPRING INITIALIZRからSpring Bootのプロジェクトを作成します。
File -> New -> Project...
Project Metadata
- Language: Kotlin
Dependencies
- Spring Boot: 2.0.3
- Web -> Reactive Web
- Cloud Messaging -> Reactive Cloud Stream
Project name, location
name, locationを適当にいれてFinish
※プロジェクト作成中はしばらく待ちます
作成が完了したらこんな感じになります。
pomの確認と修正
SPRING INITIALIZRで作成されたプロジェクトのpomの設定内容を確認
spring-cloud.version
には、Finchley.RELEASE
が指定されているはず
<properties>
...
<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
</properties>
<dependencies>
にspring-boot-starter-webflux
とspring-cloud-stream-reactive
の指定があるはず
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
...
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
...
</dependencies>
pomにkafkaのコネクターの設定を追加
<dependencies>
にspring-cloud-starter-stream-kafka
を追加する。
※この設定がないとKafkaに接続しないので注意
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<repositories>
の要素を追加して、spring-cloud-starter-stream-kafka
を読み込めるようにする
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
コードの追加
ファイル構成
Routes.kt, Handler.kt, Producer.ktを追加
21:03:59 ~/IdeaProjects/spring-cloud-stream-kafka-demo/src/main/kotlin/com/example/springcloudstreamkafkademo
$ tree
.
├── Handler.kt
├── Producer.kt
├── Routes.kt
└── SpringCloudStreamKafkaDemoApplication.kt
Routes.kt
/api/putText
のパスで、JSONのリクエストを受け付け、ハンドラー(Handler)を呼び出します
package com.example.springcloudstreamkafkademo
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
@Configuration
class Routes(
private val handler: Handler
) {
@Bean
fun router() = org.springframework.web.reactive.function.server.router {
"/api".nest {
accept(MediaType.APPLICATION_JSON).nest {
POST("/putText", handler::output)
}
}
}
}
Handler.kt
リクエストのJSONをオブジェクト(RequestMessage
)に変換し、Kafkaプロデューサーに渡します。
package com.example.springcloudstreamkafkademo
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.body
import reactor.core.publisher.Mono
data class RequestMessage(
val id: Long,
val text: String
)
@Component
class Handler(private val producer: Producer) {
fun output(req: ServerRequest) =
req.bodyToMono(RequestMessage::class.java)
.flatMap { ServerResponse.ok().body(Mono.just(producer.output(it))) }
}
Producer.kt
ここが肝(のよう)です
説明は、Spring Cloud Stream Reference Guideにあります。
が、さまざまな試行錯誤の後、うまくできた方法なので他にもいいやり方があるかもしれません。
output関数で、fluxSink
にメッセージを受け渡していますが、fluxSink
はFlux生成時に取得したemitter
で、fluxをsubscribeしているemit関数が、fluxSinkに渡したメッセージを順繰りに処理してくれます。
package com.example.springcloudstreamkafkademo
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.messaging.Source
import org.springframework.cloud.stream.reactive.StreamEmitter
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
@Component
@EnableBinding(Source::class)
class Producer {
private var fluxSink: FluxSink<RequestMessage>? = null
private var flux: Flux<RequestMessage> = Flux.create<RequestMessage>(
{ emitter -> this.fluxSink = emitter },
FluxSink.OverflowStrategy.BUFFER)
fun output(message: RequestMessage): RequestMessage {
this.fluxSink?.next(message)
return message
}
@StreamEmitter
@Output(Source.OUTPUT)
fun emit(): Flux<RequestMessage> {
return flux
}
}
稼働確認
SpringCloudStreamKafkaDemoApplication
からSpring Bootを起動します。
以下のような、Kafkaの接続情報とともに、output
というTopicにつながっていそうなログでます。
...
2018-06-23 21:33:28.809 INFO 26767 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: output
2018-06-23 21:33:28.812 INFO 26767 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2018-06-23 21:33:28.898 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-06-23 21:33:28.898 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.209 INFO 26767 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2018-06-23 21:33:29.235 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-06-23 21:33:29.235 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.242 INFO 26767 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2018-06-23 21:33:29.254 INFO 26767 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.output' has 1 subscriber(s).
2018-06-23 21:33:29.255 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-06-23 21:33:29.273 INFO 26767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-06-23 21:33:29.273 INFO 26767 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483547
2018-06-23 21:33:29.360 INFO 26767 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-06-23 21:33:29.360 INFO 26767 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2018-06-23 21:33:29.363 INFO 26767 --- [ main] .SpringCloudStreamKafkaDemoApplicationKt : Started SpringCloudStreamKafkaDemoApplicationKt in 6.443 seconds
...
APIを叩く
curlでAPIを叩いてみます。
$ curl -X POST -H 'Content-Type: application/json' localhost:8080/api/putText -d '{"id":100, "text":"aa"}'
事前にトピックをコンシュームしておくことで、メッセージが流れてくることが確認できます。
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic output
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}