始めに

Kafkaを使うことになりそうなのでいくつかの言語ごとにProducerとConsumerを記述してみる。

前回はGolangで試してみました。

今回はScala編です。Scalaの場合、Javaクライアントを使うことになりそうですが、AkkaにAkka-Stream-KafkaというAkka-Stream用の便利なラッパーが作られているのでそちらを使ってみます。

*Kafka自体やAkka-Stream自体の解説はしません。

構成

IntelliJ: 2018
Scala: 2.12
sbt: 1.1
Akka: 2.5
Akka-Stream: 2.5
Akka-Stream-Kafka: 0.2
Spray-Json: 1.3

成果物

https://github.com/lightstaff/scala-kafka-example

sbt

依存周りを解決しておきます。Akka-Stream-Kafka関連以外にメッセージのシリアライズ・デシリアライズにspray-jsonを利用します。

build.sbt
name := "scala-kafka-example"

version := "0.1"

scalaVersion := "2.12.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.12",

  "com.typesafe.akka" %% "akka-stream" % "2.5.12",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.20",

  "io.spray" %%  "spray-json" % "1.3.3"
)

mainClass in Compile := Some("com.example.Main")

メッセージオブジェクト

最初に送受信メッセージするメッセージを定義し、シリアライズ・デシリアライズできるようにしておきます。

message.scala
package com.example

import spray.json.{DefaultJsonProtocol, RootJsonFormat}

object Message {

  // 送信メッセージ
  final case class SendMessage(message: String, timestamp: Long)

  // 受信メッセージ
  final case class ConsumedMessage(message: String, timestamp: Long)

}

// メッセージ <-> json
trait MessageJsonProtocol extends DefaultJsonProtocol {

  import Message._

  implicit val sendMessageFormat: RootJsonFormat[SendMessage] = jsonFormat2(SendMessage)
  implicit val consumedMessageFormat: RootJsonFormat[ConsumedMessage] = jsonFormat2(ConsumedMessage)

}

Producer

プロデューサーから書いてみます。

main.scala
package com.example

// import省略

object Main extends App with MessageJsonProtocol {

  import Message._

  implicit val system: ActorSystem = ActorSystem("kafka-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executor: ExecutionContext = system.dispatcher

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("kafka.bootstrapServers")

  // プロデューサー設定
  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  // プロデューサーstream
  val (pKillSwitch, pDone) = Source
    .tick(1.seconds, 10.seconds, None)
    .viaMat(KillSwitches.single)(Keep.right)
    .map { _ =>
      val msg = SendMessage("Hello", ZonedDateTime.now().toEpochSecond)
      ProducerMessage.Message(new ProducerRecord[String, String]("test.B", msg.toJson.compactPrint),
                              None)
    }
    .via(Producer.flow(producerSettings))
    .map { result =>
      println(s"success send. message: ${result.message.record.value()}")
      result
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  pDone.onComplete {
    case Success(_) =>
      println("done producer.")
    case Failure(ex) =>
      println(s"fail send. reason: ${ex.getMessage}")
  }

  println("start")

  StdIn.readLine()

  pKillSwitch.shutdown()

  println("end")
}

Kafkaのサーバーアドレスは「application.conf」から引っ張っています。
10秒ごとにstreamがメッセージを生成して送信します。
この辺りを参考にstreamのKillSwitchを定義しています。例外が発生するとonCompleteに落ちてくるはず・・・。

Consumer

続いてコンシューマーを書きます。

main.scala
package com.example

// import省略

object Main extends App with MessageJsonProtocol {

  import Message._

  implicit val system: ActorSystem = ActorSystem("kafka-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executor: ExecutionContext = system.dispatcher

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("kafka.bootstrapServers")

  // コンシューマー設定
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("test")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // コンシューマーstream
  val (cKillSwitch, cDone) = Consumer
    .committableSource(consumerSettings, Subscriptions.topics("test.B"))
    .viaMat(KillSwitches.single)(Keep.right)
    .map { consumed =>
      val msg = consumed.record.value().parseJson.convertTo[ConsumedMessage]
      println(s"consumed. message: ${msg.message}")
      consumed
    }
    .mapAsync(1) { msg =>
      msg.committableOffset.commitScaladsl()
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  cDone.onComplete {
    case Success(_) =>
      println("done consumer.")
    case Failure(ex) =>
      println(s"fail consume. reason: ${ex.getMessage}")
  }

  println("start")

  StdIn.readLine()

  cKillSwitch.shutdown()

  println("end")

}

概ねプロデューサーと同じなので特別言うことなし。
streamが停止しない限りは勝手にKafkaに接続して指定topicを受信してくれます。

実行

プロデューサーとコンシューマーを組み合わせて(Main.scala)ビルドして実行すると、下記のように出力されるはずです(Actor系の警告を削除しています)。

start
success send. message: {"message":"Hello","timestamp":1523958246}
success consume. message: Hello, timestamp: 1523958246
success send. message: {"message":"Hello","timestamp":1523958256}
success consume. message: Hello, timestamp: 1523958256
success send. message: {"message":"Hello","timestamp":1523958266}
success consume. message: Hello, timestamp: 1523958266
end

終わりに

前回同様に閉じた世界でやってますが、プロデューサー側のstreamに口を持たせたい場合は「Source.single」とか「Source.queue」を利用したらいいのかなぁって感じです(参考)。

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.