Edited at

KafkaでProducerとConsumer(Scala編)

More than 1 year has passed since last update.


始めに

Kafkaを使うことになりそうなのでいくつかの言語ごとにProducerとConsumerを記述してみる。

前回はGolangで試してみました。

今回はScala編です。Scalaの場合、Javaクライアントを使うことになりそうですが、AkkaにAkka-Stream-KafkaというAkka-Stream用の便利なラッパーが作られているのでそちらを使ってみます。

*Kafka自体やAkka-Stream自体の解説はしません。


構成

IntelliJ: 2018

Scala: 2.12

sbt: 1.1

Akka: 2.5

Akka-Stream: 2.5

Akka-Stream-Kafka: 0.2

Spray-Json: 1.3


成果物

https://github.com/lightstaff/scala-kafka-example


sbt

依存周りを解決しておきます。Akka-Stream-Kafka関連以外にメッセージのシリアライズ・デシリアライズにspray-jsonを利用します。


build.sbt

name := "scala-kafka-example"

version := "0.1"

scalaVersion := "2.12.5"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.12",

"com.typesafe.akka" %% "akka-stream" % "2.5.12",
"com.typesafe.akka" %% "akka-stream-kafka" % "0.20",

"io.spray" %% "spray-json" % "1.3.3"
)

mainClass in Compile := Some("com.example.Main")



メッセージオブジェクト

最初に送受信するメッセージを定義し、シリアライズ・デシリアライズできるようにしておきます。


message.scala

package com.example

import spray.json.{DefaultJsonProtocol, RootJsonFormat}

object Message {

// 送信メッセージ
final case class SendMessage(message: String, timestamp: Long)

// 受信メッセージ
final case class ConsumedMessage(message: String, timestamp: Long)

}

// メッセージ <-> json
trait MessageJsonProtocol extends DefaultJsonProtocol {

import Message._

implicit val sendMessageFormat: RootJsonFormat[SendMessage] = jsonFormat2(SendMessage)
implicit val consumedMessageFormat: RootJsonFormat[ConsumedMessage] = jsonFormat2(ConsumedMessage)

}



Producer

プロデューサーから書いてみます。


main.scala

package com.example

// import省略

object Main extends App with MessageJsonProtocol {

import Message._

implicit val system: ActorSystem = ActorSystem("kafka-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContext = system.dispatcher

val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("kafka.bootstrapServers")

// プロデューサー設定
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)

// プロデューサーstream
val (pKillSwitch, pDone) = Source
.tick(1.seconds, 10.seconds, None)
.viaMat(KillSwitches.single)(Keep.right)
.map { _ =>
val msg = SendMessage("Hello", ZonedDateTime.now().toEpochSecond)
ProducerMessage.Message(new ProducerRecord[String, String]("test.B", msg.toJson.compactPrint),
None)
}
.via(Producer.flow(producerSettings))
.map { result =>
println(s"success send. message: ${result.message.record.value()}")
result
}
.toMat(Sink.ignore)(Keep.both)
.run()

pDone.onComplete {
case Success(_) =>
println("done producer.")
case Failure(ex) =>
println(s"fail send. reason: ${ex.getMessage}")
}

println("start")

StdIn.readLine()

pKillSwitch.shutdown()

println("end")
}


Kafkaのサーバーアドレスは「application.conf」から引っ張っています。

10秒ごとにstreamがメッセージを生成して送信します。

この辺りを参考にstreamのKillSwitchを定義しています。例外が発生するとonCompleteに落ちてくるはず・・・。


Consumer

続いてコンシューマーを書きます。


main.scala

package com.example

// import省略

object Main extends App with MessageJsonProtocol {

import Message._

implicit val system: ActorSystem = ActorSystem("kafka-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContext = system.dispatcher

val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("kafka.bootstrapServers")

// コンシューマー設定
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("test")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

// コンシューマーstream
val (cKillSwitch, cDone) = Consumer
.committableSource(consumerSettings, Subscriptions.topics("test.B"))
.viaMat(KillSwitches.single)(Keep.right)
.map { consumed =>
val msg = consumed.record.value().parseJson.convertTo[ConsumedMessage]
println(s"consumed. message: ${msg.message}")
consumed
}
.mapAsync(1) { msg =>
msg.committableOffset.commitScaladsl()
}
.toMat(Sink.ignore)(Keep.both)
.run()

cDone.onComplete {
case Success(_) =>
println("done consumer.")
case Failure(ex) =>
println(s"fail consume. reason: ${ex.getMessage}")
}

println("start")

StdIn.readLine()

cKillSwitch.shutdown()

println("end")

}


概ねプロデューサーと同じなので特別言うことなし。

streamが停止しない限りは勝手にKafkaに接続して指定topicを受信してくれます。


実行

プロデューサーとコンシューマーを組み合わせて(Main.scala)ビルドして実行すると、下記のように出力されるはずです(Actor系の警告を削除しています)。

start

success send. message: {"message":"Hello","timestamp":1523958246}
success consume. message: Hello, timestamp: 1523958246
success send. message: {"message":"Hello","timestamp":1523958256}
success consume. message: Hello, timestamp: 1523958256
success send. message: {"message":"Hello","timestamp":1523958266}
success consume. message: Hello, timestamp: 1523958266
end


終わりに

前回同様に閉じた世界でやってますが、プロデューサー側のstreamに口を持たせたい場合は「Source.single」とか「Source.queue」を利用したらいいのかなぁって感じです(参考)。