始めに
前回はGolangでPublisherとConsumerを書いてみました。
今回はScalaで書いてみます。
使用するライブラリはStreamで扱えるのがあったりしますが、基本を押さえたいので公式のJavaクライアントであるamqp-clientを使ってみることにします。
構成
Scala: 2.12
sbt: 1.1
amqp-client: 5.2
成果物
sbt
まずはsbtの定義を載せておきます。
PublisherとConsumerのマルチプロジェクトとしています。
JSONの変換にspray-json、設定の読み込みにlightbend/config、警告がうるさいのでlogbackを追加しています。
def commonSettings(_name: String) = Seq(
scalaVersion := "2.12.6",
scalacOptions ++= Seq(
"-deprecation",
"-feature",
"-unchecked",
"-language:_",
"-Xlint",
"-Xfatal-warnings",
"-Ywarn-dead-code",
"-Ywarn-numeric-widen",
"-Ywarn-unused",
"-Ywarn-unused-import",
"-Ywarn-value-discard"
),
version := "0.1-SNAPSHOT",
name := _name
)
lazy val common = (project in file("common"))
.settings(commonSettings("scala-rabbitmq-example-common"))
.settings(
libraryDependencies ++= Seq(
"io.spray" %% "spray-json" % "1.3.4"
)
)
lazy val publisher = (project in file("publisher"))
.settings(commonSettings("scala-rabbitmq-example-publisher"))
.settings(
libraryDependencies ++= Seq(
"com.rabbitmq" % "amqp-client" % "5.2.0",
"io.spray" %% "spray-json" % "1.3.4",
"com.typesafe" % "config" % "1.3.2",
"ch.qos.logback" % "logback-classic" % "1.2.3"
)
)
.dependsOn(common)
lazy val consumer = (project in file("consumer"))
.settings(commonSettings("scala-rabbitmq-example-consumer"))
.settings(
libraryDependencies ++= Seq(
"com.rabbitmq" % "amqp-client" % "5.2.0",
"io.spray" %% "spray-json" % "1.3.4",
"com.typesafe" % "config" % "1.3.2",
"ch.qos.logback" % "logback-classic" % "1.2.3"
)
)
.dependsOn(common)
モデル
共通のメッセージモデルを定義します。
package com.example.common
import spray.json._
object Model {
final case class Protocol(message: String, timestamp: Long)
}
// Model <--> Json
trait ModelJsonProtocol extends DefaultJsonProtocol {
import Model._
implicit val protocolFormat: RootJsonFormat[Protocol] = jsonFormat2(Protocol)
}
Publisher
Publisherを書きます。
まずは読み込ませるapplication.confから。内容は見たまんまRabbitMQのホストとポートの指定です。
publisher {
rabbitMQHost: "localhost"
rabbitMQHost: ${?RABBITMQ_HOST}
rabbitMQPort: 5672
rabbitMQPort: ${?RABBITMQ_PORT}
}
続いてApp。
package com.example.publisher
import java.time.{LocalDate, ZoneId}
import scala.util.{Failure, Success, Try}
import com.example.common.Model.Protocol
import com.example.common.ModelJsonProtocol
import com.rabbitmq.client.ConnectionFactory
import com.typesafe.config.ConfigFactory
import spray.json._
object Main extends App with ModelJsonProtocol {
// コンフィグロード
val config = ConfigFactory.load()
config.checkValid(ConfigFactory.defaultReference(), "publisher")
val rabbitMQHost = config.getString("publisher.rabbitMQHost")
val rabbitMQPort = config.getInt("publisher.rabbitMQPort")
println("publisher start")
// ファクトリを使うらしい・・・
val factory = new ConnectionFactory()
factory.setHost(rabbitMQHost)
factory.setPort(rabbitMQPort)
// コネクションを繋いで・・・
val connection = factory.newConnection()
// チャンネル開いて・・・
val channel = connection.createChannel()
// Exchangeを作って・・・
channel.exchangeDeclare("test", "fanout", false, true, null)
// とりあえず3回・・・
Seq.range(1, 4).foreach { i =>
// モデル作って・・・
val p =
Protocol(s"Hello. No$i", LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toEpochSecond)
// Publish!!
Try(channel.basicPublish("test", "", null, p.toJson.compactPrint.getBytes)) match {
case Success(_) =>
println(s"[INFO] published. msg: $p")
case Failure(ex) =>
println(s"[ERROR] ${ex.getMessage}")
}
}
// チャンネルを閉じて・・・
channel.close()
// コネクションも閉じて・・・
connection.close()
println("publisher stop")
}
・・・Javaで書いてるのと何も変わらん気がする。ファクトリ~チャンネルまでの定義辺りをSingletonObjectにでも押し込んでしまえばもうちょっとScalaっぽくなるんだろうか?
Consumer
Consumerを書きます。
こちらもapplication.confを置きますが、Publisherとほぼ同じなので省略して、Appのみで。
package com.example.consumer
import scala.io.StdIn
import scala.util.{Failure, Success, Try}
import com.example.common.Model.Protocol
import com.example.common.ModelJsonProtocol
import com.rabbitmq.client.{AMQP, ConnectionFactory, DefaultConsumer, Envelope}
import com.typesafe.config.ConfigFactory
import spray.json._
object Main extends App with ModelJsonProtocol {
// コンフィグロード
val config = ConfigFactory.load()
config.checkValid(ConfigFactory.defaultReference(), "consumer")
val rabbitMQHost = config.getString("consumer.rabbitMQHost")
val rabbitMQPort = config.getInt("consumer.rabbitMQPort")
println("consumer start. press enter to stop")
// ファクトリを使うらしい・・・
val factory = new ConnectionFactory()
factory.setHost(rabbitMQHost)
factory.setPort(rabbitMQPort)
// コネクションを繋いで・・・
val connection = factory.newConnection()
// チャンネル開いて・・・
val channel = connection.createChannel()
// Exchangeを作って・・・
channel.exchangeDeclare("test", "fanout", false, true, null)
// Queueを作って・・・
val queName = channel.queueDeclare().getQueue
// QueueとExchangeをBindして・・・
channel.queueBind(queName, "test", "")
// Consumerを定義して・・・
val consumer = new DefaultConsumer(channel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: AMQP.BasicProperties,
body: Array[Byte]): Unit = {
val str = new String(body, "UTF-8")
Try(str.parseJson.convertTo[Protocol]) match {
case Success(p) =>
println(s"[INFO] consumed. tag: ${envelope.getDeliveryTag}, msg: $p")
case Failure(ex) =>
println(s"[ERROR] ${ex.getMessage}")
}
}
}
// チャンネルにConsumerを設定して・・・
channel.basicConsume(queName, true, consumer)
// 待機して・・・
StdIn.readLine()
// チャンネルを閉じて・・・
channel.close()
// コネクションを閉じて・・・
connection.close()
println("consumer stop")
}
やっぱりJavaじゃん。
channel.exchangeDeclare(...)
は前回と同様でExchangeが既に有る場合は、フラグのチェックのみ行うようです(PublisherとConsumerのフラグが違うと例外発生)。
最初new DefaultConsumer(channel)
でもう繋がってると思って、channel.basicConsume(queName, true, consumer)
入れてなかったら全く動きませんでした(戒め)。
実行
前回同様にそれぞれ別のコンソールから実行してみますと言いたいところですが、ぶっちゃけIntelliJでsbt shellを複数立ち上げる方法が分かんないんでこんな感じに出るよってことで・・・。
publisher start
[INFO] published. msg: Protocol(Hello. No1,1524841200)
[INFO] published. msg: Protocol(Hello. No2,1524841200)
[INFO] published. msg: Protocol(Hello. No3,1524841200)
publisher stop
consumer start. press enter to stop
[INFO] consumed. tag: 1, msg: Protocol(Hello. No1,1524841200)
[INFO] consumed. tag: 2, msg: Protocol(Hello. No2,1524841200)
[INFO] consumed. tag: 3, msg: Protocol(Hello. No3,1524841200)
consumer stop
終わりに
Scala編と言いつつ、ほとんどJavaじゃねーかって感じでした(なげやり)。
次はC#で書きます。