5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

KafkaでProducerとConsumer(Scala編)

Last updated at Posted at 2018-04-17

始めに

Kafkaを使うことになりそうなのでいくつかの言語ごとにProducerとConsumerを記述してみる。

前回はGolangで試してみました。

今回はScala編です。Scalaの場合、Javaクライアントを使うことになりそうですが、AkkaにAkka-Stream-KafkaというAkka-Stream用の便利なラッパーが作られているのでそちらを使ってみます。

*Kafka自体やAkka-Stream自体の解説はしません。

構成##

IntelliJ: 2018
Scala: 2.12
sbt: 1.1
Akka: 2.5
Akka-Stream: 2.5
Akka-Stream-Kafka: 0.2
Spray-Json: 1.3

成果物##

sbt

依存周りを解決しておきます。Akka-Stream-Kafka関連以外にメッセージのシリアライズ・デシリアライズにspray-jsonを利用します。

build.sbt
name := "scala-kafka-example"

version := "0.1"

scalaVersion := "2.12.5"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.12",

  "com.typesafe.akka" %% "akka-stream" % "2.5.12",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.20",

  "io.spray" %%  "spray-json" % "1.3.3"
)

mainClass in Compile := Some("com.example.Main")

メッセージオブジェクト

最初に送受信するメッセージを定義し、シリアライズ・デシリアライズできるようにしておきます。

message.scala
package com.example

import spray.json.{DefaultJsonProtocol, RootJsonFormat}

object Message {

  // 送信メッセージ
  final case class SendMessage(message: String, timestamp: Long)

  // 受信メッセージ
  final case class ConsumedMessage(message: String, timestamp: Long)

}

// メッセージ <-> json
trait MessageJsonProtocol extends DefaultJsonProtocol {

  import Message._

  implicit val sendMessageFormat: RootJsonFormat[SendMessage] = jsonFormat2(SendMessage)
  implicit val consumedMessageFormat: RootJsonFormat[ConsumedMessage] = jsonFormat2(ConsumedMessage)

}

Producer

プロデューサーから書いてみます。

main.scala
package com.example

// import省略

object Main extends App with MessageJsonProtocol {

  import Message._

  implicit val system: ActorSystem = ActorSystem("kafka-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executor: ExecutionContext = system.dispatcher

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("kafka.bootstrapServers")
  
  // プロデューサー設定
  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers(bootstrapServers)

  // プロデューサーstream
  val (pKillSwitch, pDone) = Source
    .tick(1.seconds, 10.seconds, None)
    .viaMat(KillSwitches.single)(Keep.right)
    .map { _ =>
      val msg = SendMessage("Hello", ZonedDateTime.now().toEpochSecond)
      ProducerMessage.Message(new ProducerRecord[String, String]("test.B", msg.toJson.compactPrint),
                              None)
    }
    .via(Producer.flow(producerSettings))
    .map { result =>
      println(s"success send. message: ${result.message.record.value()}")
      result
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  pDone.onComplete {
    case Success(_) =>
      println("done producer.")
    case Failure(ex) =>
      println(s"fail send. reason: ${ex.getMessage}")
  }

  println("start")

  StdIn.readLine()

  pKillSwitch.shutdown()

  println("end")
}

Kafkaのサーバーアドレスは「application.conf」から引っ張っています。
10秒ごとにstreamがメッセージを生成して送信します。
この辺りを参考にstreamのKillSwitchを定義しています。例外が発生するとonCompleteに落ちてくるはず・・・。

Consumer

続いてコンシューマーを書きます。

main.scala
package com.example

// import省略

object Main extends App with MessageJsonProtocol {

  import Message._

  implicit val system: ActorSystem = ActorSystem("kafka-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executor: ExecutionContext = system.dispatcher

  val conf = ConfigFactory.load()
  val bootstrapServers = conf.getString("kafka.bootstrapServers")
  
  // コンシューマー設定
  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers(bootstrapServers)
    .withGroupId("test")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // コンシューマーstream
  val (cKillSwitch, cDone) = Consumer
    .committableSource(consumerSettings, Subscriptions.topics("test.B"))
    .viaMat(KillSwitches.single)(Keep.right)
    .map { consumed =>
      val msg = consumed.record.value().parseJson.convertTo[ConsumedMessage]
      println(s"consumed. message: ${msg.message}")
      consumed
    }
    .mapAsync(1) { msg =>
      msg.committableOffset.commitScaladsl()
    }
    .toMat(Sink.ignore)(Keep.both)
    .run()

  cDone.onComplete {
    case Success(_) =>
      println("done consumer.")
    case Failure(ex) =>
      println(s"fail consume. reason: ${ex.getMessage}")
  }

  println("start")

  StdIn.readLine()

  cKillSwitch.shutdown()

  println("end")

}

概ねプロデューサーと同じなので特別言うことなし。
streamが停止しない限りは勝手にKafkaに接続して指定topicを受信してくれます。

実行

プロデューサーとコンシューマーを組み合わせて(Main.scala)ビルドして実行すると、下記のように出力されるはずです(Actor系の警告を削除しています)。

start
success send. message: {"message":"Hello","timestamp":1523958246}
success consume. message: Hello, timestamp: 1523958246
success send. message: {"message":"Hello","timestamp":1523958256}
success consume. message: Hello, timestamp: 1523958256
success send. message: {"message":"Hello","timestamp":1523958266}
success consume. message: Hello, timestamp: 1523958266
end

終わりに

前回同様に閉じた世界でやってますが、プロデューサー側のstreamに口を持たせたい場合は「Source.single」とか「Source.queue」を利用したらいいのかなぁって感じです(参考)。

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?