Writing messages to Kafka with Spring Cloud Stream in a Spring Boot WebFlux + Kotlin application


Overview

How to write messages to Kafka with Spring Cloud Stream in a Spring Boot WebFlux application.


Environment


  • Development environment: Mac

  • IDE: IntelliJ

  • Language: Kotlin


Prerequisites


Install Kafka on the Mac

Install Kafka on your Mac by following these very clear instructions:

https://qiita.com/gosshys/items/eded8f9dcb09811060ad
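For reference, if you go the Homebrew route, starting everything locally looks roughly like this (a sketch assuming a Homebrew install; commands and config paths may differ on your machine, so follow the article above for the details):

$ brew install kafka
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
$ kafka-server-start /usr/local/etc/kafka/server.properties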


Steps


Create the project

Create a Spring Boot project from SPRING INITIALIZR using IntelliJ.


File -> New -> Project...



Project Metadata


  • Language: Kotlin



Dependencies


  • Spring Boot: 2.0.3

  • Web -> Reactive Web

  • Cloud Messaging -> Reactive Cloud Stream



Project name, location

Enter a name and location as you like, then click Finish.


Note: project creation takes a little while.

Once creation completes, the generated project is ready to work with.



Check and fix the pom


Check the pom settings of the project generated by SPRING INITIALIZR

spring-cloud.version should be set to Finchley.RELEASE:

<properties>
    ...
    <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
</properties>

The <dependencies> section should contain spring-boot-starter-webflux and spring-cloud-stream-reactive:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    ...
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-reactive</artifactId>
    </dependency>
    ...
</dependencies>


Add the Kafka connector settings to the pom

Add spring-cloud-starter-stream-kafka to <dependencies>.

Note: without this dependency, the application will not connect to Kafka.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Add a <repositories> element so that spring-cloud-starter-stream-kafka can be resolved:

<repositories>
    <repository>
        <id>spring-milestones</id>
        <name>Spring Milestones</name>
        <url>https://repo.spring.io/milestone</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>
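As an aside: with the binder on the classpath and no extra configuration, the application connects to a broker on localhost:9092 and publishes to a topic named output (derived from Source.OUTPUT). If you need different values, a minimal application.properties sketch (the property names come from Spring Cloud Stream; the values shown here are just the defaults made explicit):

spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.bindings.output.destination=output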


Add the code


File layout

Add Routes.kt, Handler.kt, and Producer.kt.

21:03:59 ~/IdeaProjects/spring-cloud-stream-kafka-demo/src/main/kotlin/com/example/springcloudstreamkafkademo 

$ tree
.
├── Handler.kt
├── Producer.kt
├── Routes.kt
└── SpringCloudStreamKafkaDemoApplication.kt


Routes.kt

Accepts JSON requests on the /api/putText path and invokes the handler (Handler).


Routes

package com.example.springcloudstreamkafkademo

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.web.reactive.function.server.router

@Configuration
class Routes(
    private val handler: Handler
) {

    // Functional routing: POST /api/putText (JSON) is dispatched to Handler#output
    @Bean
    fun router() = router {
        "/api".nest {
            accept(MediaType.APPLICATION_JSON).nest {
                POST("/putText", handler::output)
            }
        }
    }
}



Handler.kt

Converts the request JSON into an object (RequestMessage) and passes it to the Kafka producer.


Handler

package com.example.springcloudstreamkafkademo

import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.body
import reactor.core.publisher.Mono

// Shape of the incoming JSON request
data class RequestMessage(
    val id: Long,
    val text: String
)

@Component
class Handler(private val producer: Producer) {

    // Deserialize the JSON body, hand it to the producer, and echo it back as the response
    fun output(req: ServerRequest) =
        req.bodyToMono(RequestMessage::class.java)
            .flatMap { ServerResponse.ok().body(Mono.just(producer.output(it))) }
}



Producer.kt

This part is (apparently) the key.

The explanation is in the Spring Cloud Stream Reference Guide.

That said, this is simply what worked after a lot of trial and error, so there may well be better approaches.

The output function hands each message to fluxSink; fluxSink is the emitter obtained when the Flux was created, and the emit function, whose Flux the framework subscribes to, processes the messages passed to fluxSink one after another.


Producer

package com.example.springcloudstreamkafkademo

import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.messaging.Source
import org.springframework.cloud.stream.reactive.StreamEmitter
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

@Component
@EnableBinding(Source::class)
class Producer {

    // Sink captured when the Flux below is created; null until the create() lambda runs
    private var fluxSink: FluxSink<RequestMessage>? = null

    // Programmatically created Flux; the lambda hands us the emitter (FluxSink)
    private val flux: Flux<RequestMessage> = Flux.create<RequestMessage>(
        { emitter -> this.fluxSink = emitter },
        FluxSink.OverflowStrategy.BUFFER)

    // Called from the handler: push the message into the stream
    fun output(message: RequestMessage): RequestMessage {
        this.fluxSink?.next(message)
        return message
    }

    // Spring Cloud Stream subscribes to this Flux and forwards each element to Source.OUTPUT
    @StreamEmitter
    @Output(Source.OUTPUT)
    fun emit(): Flux<RequestMessage> {
        return flux
    }
}
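For reference only, and not what was used above: the same bridge can be written without the nullable fluxSink field by using Reactor's EmitterProcessor, which is itself a Flux and hands out a non-null sink up front. A sketch under that assumption (it would replace the Producer above, not sit alongside it):

package com.example.springcloudstreamkafkademo

import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.messaging.Source
import org.springframework.cloud.stream.reactive.StreamEmitter
import org.springframework.stereotype.Component
import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux

@Component
@EnableBinding(Source::class)
class Producer {

    // EmitterProcessor is itself a Flux; sink() returns a non-null FluxSink up front
    private val processor = EmitterProcessor.create<RequestMessage>()
    private val sink = processor.sink()

    fun output(message: RequestMessage): RequestMessage {
        sink.next(message) // push onto the stream that emit() exposes
        return message
    }

    @StreamEmitter
    @Output(Source.OUTPUT)
    fun emit(): Flux<RequestMessage> = processor
}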



Checking that it works

Start Spring Boot from SpringCloudStreamKafkaDemoApplication.

You should see logs like the following, with the Kafka connection settings and lines suggesting the app is bound to a topic named output.

...

2018-06-23 21:33:28.809 INFO 26767 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: output
2018-06-23 21:33:28.812 INFO 26767 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

2018-06-23 21:33:28.898 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-06-23 21:33:28.898 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.209 INFO 26767 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2018-06-23 21:33:29.235 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-06-23 21:33:29.235 INFO 26767 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.242 INFO 26767 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2018-06-23 21:33:29.254 INFO 26767 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.output' has 1 subscriber(s).
2018-06-23 21:33:29.255 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-06-23 21:33:29.273 INFO 26767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-06-23 21:33:29.273 INFO 26767 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started _org.springframework.integration.errorLogger
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147482647
2018-06-23 21:33:29.274 INFO 26767 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483547
2018-06-23 21:33:29.360 INFO 26767 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-06-23 21:33:29.360 INFO 26767 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
2018-06-23 21:33:29.363 INFO 26767 --- [ main] .SpringCloudStreamKafkaDemoApplicationKt : Started SpringCloudStreamKafkaDemoApplicationKt in 6.443 seconds
...


Call the API

Let's hit the API with curl.

$ curl -X POST -H 'Content-Type: application/json' localhost:8080/api/putText -d '{"id":100, "text":"aa"}'
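Since the handler returns the message as the response body, curl itself should also print {"id":100,"text":"aa"}.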

If you start consuming the topic beforehand, you can confirm that the messages flow through.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic output

{"id":100,"text":"aa"}
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}