始めに
Kafkaを使うことになりそうなのでいくつかの言語ごとにProducerとConsumerを記述してみる。
前回はGolangで試してみました。
今回はScala編です。Scalaの場合、Javaクライアントを使うことになりそうですが、AkkaにAkka-Stream-KafkaというAkka-Stream用の便利なラッパーが作られているのでそちらを使ってみます。
*Kafka自体やAkka-Stream自体の解説はしません。
構成##
IntelliJ: 2018
Scala: 2.12
sbt: 1.1
Akka: 2.5
Akka-Stream: 2.5
Akka-Stream-Kafka: 0.2
Spray-Json: 1.3
成果物##
sbt
依存周りを解決しておきます。Akka-Stream-Kafka関連以外にメッセージのシリアライズ・デシリアライズにspray-jsonを利用します。
name := "scala-kafka-example"
version := "0.1"
scalaVersion := "2.12.5"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.12",
"com.typesafe.akka" %% "akka-stream" % "2.5.12",
"com.typesafe.akka" %% "akka-stream-kafka" % "0.20",
"io.spray" %% "spray-json" % "1.3.3"
)
mainClass in Compile := Some("com.example.Main")
メッセージオブジェクト
最初に送受信するメッセージを定義し、シリアライズ・デシリアライズできるようにしておきます。
package com.example
import spray.json.{DefaultJsonProtocol, RootJsonFormat}
object Message {
// 送信メッセージ
final case class SendMessage(message: String, timestamp: Long)
// 受信メッセージ
final case class ConsumedMessage(message: String, timestamp: Long)
}
// メッセージ <-> json
trait MessageJsonProtocol extends DefaultJsonProtocol {
import Message._
implicit val sendMessageFormat: RootJsonFormat[SendMessage] = jsonFormat2(SendMessage)
implicit val consumedMessageFormat: RootJsonFormat[ConsumedMessage] = jsonFormat2(ConsumedMessage)
}
Producer
プロデューサーから書いてみます。
package com.example
// import省略
object Main extends App with MessageJsonProtocol {
import Message._
implicit val system: ActorSystem = ActorSystem("kafka-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContext = system.dispatcher
val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("kafka.bootstrapServers")
// プロデューサー設定
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
// プロデューサーstream
val (pKillSwitch, pDone) = Source
.tick(1.seconds, 10.seconds, None)
.viaMat(KillSwitches.single)(Keep.right)
.map { _ =>
val msg = SendMessage("Hello", ZonedDateTime.now().toEpochSecond)
ProducerMessage.Message(new ProducerRecord[String, String]("test.B", msg.toJson.compactPrint),
None)
}
.via(Producer.flow(producerSettings))
.map { result =>
println(s"success send. message: ${result.message.record.value()}")
result
}
.toMat(Sink.ignore)(Keep.both)
.run()
pDone.onComplete {
case Success(_) =>
println("done producer.")
case Failure(ex) =>
println(s"fail send. reason: ${ex.getMessage}")
}
println("start")
StdIn.readLine()
pKillSwitch.shutdown()
println("end")
}
Kafkaのサーバーアドレスは「application.conf」から引っ張っています。
10秒ごとにstreamがメッセージを生成して送信します。
この辺りを参考にstreamのKillSwitchを定義しています。例外が発生するとonCompleteに落ちてくるはず・・・。
Consumer
続いてコンシューマーを書きます。
package com.example
// import省略
object Main extends App with MessageJsonProtocol {
import Message._
implicit val system: ActorSystem = ActorSystem("kafka-example")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContext = system.dispatcher
val conf = ConfigFactory.load()
val bootstrapServers = conf.getString("kafka.bootstrapServers")
// コンシューマー設定
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(bootstrapServers)
.withGroupId("test")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// コンシューマーstream
val (cKillSwitch, cDone) = Consumer
.committableSource(consumerSettings, Subscriptions.topics("test.B"))
.viaMat(KillSwitches.single)(Keep.right)
.map { consumed =>
val msg = consumed.record.value().parseJson.convertTo[ConsumedMessage]
println(s"consumed. message: ${msg.message}")
consumed
}
.mapAsync(1) { msg =>
msg.committableOffset.commitScaladsl()
}
.toMat(Sink.ignore)(Keep.both)
.run()
cDone.onComplete {
case Success(_) =>
println("done consumer.")
case Failure(ex) =>
println(s"fail consume. reason: ${ex.getMessage}")
}
println("start")
StdIn.readLine()
cKillSwitch.shutdown()
println("end")
}
概ねプロデューサーと同じなので特別言うことなし。
streamが停止しない限りは勝手にKafkaに接続して指定topicを受信してくれます。
実行
プロデューサーとコンシューマーを組み合わせて(Main.scala)ビルドして実行すると、下記のように出力されるはずです(Actor系の警告を削除しています)。
start
success send. message: {"message":"Hello","timestamp":1523958246}
success consume. message: Hello, timestamp: 1523958246
success send. message: {"message":"Hello","timestamp":1523958256}
success consume. message: Hello, timestamp: 1523958256
success send. message: {"message":"Hello","timestamp":1523958266}
success consume. message: Hello, timestamp: 1523958266
end
終わりに
前回同様に閉じた世界でやってますが、プロデューサー側のstreamに口を持たせたい場合は「Source.single」とか「Source.queue」を利用したらいいのかなぁって感じです(参考)。