
Writing messages to Kafka with Spring-Cloud-Stream using Spring Boot WebFlux and Kotlin

Posted at 2018-06-23

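Overview

How to write messages to Kafka with Spring-Cloud-Stream from a Spring Boot WebFlux application.

Environment

  • Development machine: Mac
  • IDE: IntelliJ
  • Language: Kotlin

Prerequisites

Install Kafka on the Mac

Install Kafka on the Mac, using a very easy-to-follow installation guide as a reference.

Steps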

Creating the project

Use IntelliJ to create a Spring Boot project from SPRING INITIALIZR.

File -> New -> Project...


Project Metadata

  • Language: Kotlin


Dependencies

  • Spring Boot: 2.0.3
  • Web -> Reactive Web
  • Cloud Messaging -> Reactive Cloud Stream


Project name, location

Enter any name and location, then click Finish.

Note: project creation takes a little while.

Once creation completes, the generated project opens in IntelliJ.

Checking and editing the pom

Check the contents of the pom generated by SPRING INITIALIZR.

spring-cloud.version should be set to Finchley.RELEASE.

  <properties>
    ...
    <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
  </properties>

<dependencies> should contain spring-boot-starter-webflux and spring-cloud-stream-reactive.

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    ...
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-reactive</artifactId>
    </dependency>
    ...
  </dependencies>

Adding the Kafka connector to the pom

Add spring-cloud-starter-stream-kafka to <dependencies>.
Note: without this dependency the application does not connect to Kafka.

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

Add a <repositories> element so that spring-cloud-starter-stream-kafka can be resolved.

  <repositories>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

Adding the code

File layout

Add Routes.kt, Handler.kt, and Producer.kt.

21:03:59 ~/IdeaProjects/spring-cloud-stream-kafka-demo/src/main/kotlin/com/example/springcloudstreamkafkademo 
$ tree
.
├── Handler.kt
├── Producer.kt
├── Routes.kt
└── SpringCloudStreamKafkaDemoApplication.kt

Routes.kt

Accepts JSON requests on the /api/putText path and calls the handler (Handler).

Routes
package com.example.springcloudstreamkafkademo

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType

@Configuration
class Routes(
        private val handler: Handler
) {

    @Bean
    fun router() = org.springframework.web.reactive.function.server.router {
        "/api".nest {
            accept(MediaType.APPLICATION_JSON).nest {
                POST("/putText", handler::output)
            }
        }
    }
}

Handler.kt

Converts the request JSON into an object (RequestMessage) and passes it to the Kafka producer.

Handler
package com.example.springcloudstreamkafkademo

import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.body
import reactor.core.publisher.Mono

data class RequestMessage(
        val id: Long,
        val text: String
)

@Component
class Handler(private val producer: Producer) {

    fun output(req: ServerRequest) =
            req.bodyToMono(RequestMessage::class.java)
                    .flatMap { ServerResponse.ok().body(Mono.just(producer.output(it))) }

}
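
As a rough illustration of the JSON-to-object conversion the handler relies on, the following sketch maps the same payload onto RequestMessage outside of WebFlux. It assumes jackson-module-kotlin is on the classpath (an assumption about this project's dependencies), and parseRequestMessage is a hypothetical helper that is not part of the application.

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

// Same shape as the RequestMessage declared in Handler.kt.
data class RequestMessage(
        val id: Long,
        val text: String
)

// Hypothetical helper (not in the article): parses a JSON string into a RequestMessage.
// Assumes jackson-module-kotlin is available so the data class constructor can be used.
fun parseRequestMessage(json: String): RequestMessage =
        jacksonObjectMapper().readValue(json)

fun main() {
    // The same payload that is later sent with curl.
    val message = parseRequestMessage("""{"id":100, "text":"aa"}""")
    println(message)
}
```

Both fields are non-nullable, so a payload that omits id or text would fail to parse rather than produce a partially filled object.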

Producer.kt

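This seems to be the key part.
The explanation is in the Spring Cloud Stream Reference Guide.
That said, this is simply the approach that worked after a lot of trial and error, so there may be better ways.

The output function hands each message to fluxSink. fluxSink is the emitter that Flux.create passes to its callback, which runs when the flux is subscribed. The flux returned by the emit function is subscribed by Spring Cloud Stream, so the messages passed to fluxSink are processed one after another and sent to the output channel.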

Producer
package com.example.springcloudstreamkafkademo

import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.messaging.Source
import org.springframework.cloud.stream.reactive.StreamEmitter
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

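@Component
@EnableBinding(Source::class)
class Producer {

    // Set once the flux below is subscribed; null until then.
    private var fluxSink: FluxSink<RequestMessage>? = null

    // Flux.create passes its emitter to the callback when the flux is subscribed.
    private var flux: Flux<RequestMessage> = Flux.create<RequestMessage>(
            { emitter -> this.fluxSink = emitter },
            FluxSink.OverflowStrategy.BUFFER)

    // Pushes the message into the flux; it is silently dropped while fluxSink is still null.
    fun output(message: RequestMessage): RequestMessage {
        this.fluxSink?.next(message)
        return message
    }

    // The returned flux is bound to the output channel (Source.OUTPUT).
    @StreamEmitter
    @Output(Source.OUTPUT)
    fun emit(): Flux<RequestMessage> {
        return flux
    }

}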
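
To look at the fluxSink mechanism in isolation, here is a minimal sketch that uses only reactor-core, without Spring or Kafka. SinkBridge, push, and demo are hypothetical names introduced for illustration. The sketch assumes everything runs on a single thread, where a value pushed into the sink reaches the subscriber before push returns.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink

// Hypothetical helper for illustration: bridges imperative calls into a Flux,
// in the same way Producer does with its fluxSink field.
class SinkBridge<T : Any> {

    // Stays null until someone subscribes to the flux below.
    private var sink: FluxSink<T>? = null

    // The callback runs at subscription time, not when the Flux is constructed.
    val flux: Flux<T> = Flux.create<T>(
            { emitter -> sink = emitter },
            FluxSink.OverflowStrategy.BUFFER)

    // Returns false when there is no subscriber yet, i.e. the value was dropped.
    fun push(value: T): Boolean {
        val current = sink ?: return false
        current.next(value)
        return true
    }
}

// Pushes one value before subscribing and two after, then returns what arrived.
fun demo(): List<String> {
    val received = mutableListOf<String>()
    val bridge = SinkBridge<String>()

    bridge.push("before")                      // no subscriber yet, so this is dropped
    bridge.flux.subscribe { received.add(it) } // captures the sink
    bridge.push("a")
    bridge.push("b")

    return received
}

fun main() {
    println(demo())
}
```

In the application, the subscriber is Spring Cloud Stream itself, which subscribes to the flux returned by emit during startup. A message passed to output before that subscription would be dropped in the same way as "before" in this sketch.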

Verifying that it works

Start Spring Boot from SpringCloudStreamKafkaDemoApplication.
The log shows the Kafka connection settings and indicates that the application is bound to a topic named output, as follows.

...
2018-06-23 21:33:28.809  INFO 26767 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: output
2018-06-23 21:33:28.812  INFO 26767 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9092]
    client.id = 
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2018-06-23 21:33:28.898  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-06-23 21:33:28.898  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.209  INFO 26767 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

2018-06-23 21:33:29.235  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.1
2018-06-23 21:33:29.235  INFO 26767 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : c0518aa65f25317e
2018-06-23 21:33:29.242  INFO 26767 --- [           main] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2018-06-23 21:33:29.254  INFO 26767 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.output' has 1 subscriber(s).
2018-06-23 21:33:29.255  INFO 26767 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-06-23 21:33:29.273  INFO 26767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2018-06-23 21:33:29.273  INFO 26767 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2018-06-23 21:33:29.274  INFO 26767 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2018-06-23 21:33:29.274  INFO 26767 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147482647
2018-06-23 21:33:29.274  INFO 26767 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483547
2018-06-23 21:33:29.360  INFO 26767 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-06-23 21:33:29.360  INFO 26767 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2018-06-23 21:33:29.363  INFO 26767 --- [           main] .SpringCloudStreamKafkaDemoApplicationKt : Started SpringCloudStreamKafkaDemoApplicationKt in 6.443 seconds 
...

Calling the API

Call the API with curl.

$ curl -X POST -H 'Content-Type: application/json' localhost:8080/api/putText -d '{"id":100, "text":"aa"}'

If you start consuming the topic beforehand, you can see the messages arrive.

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic output
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}
{"id":100,"text":"aa"}