2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RabbitMQでPublisherとConsumer(Scala編)

Posted at

始めに

前回はGolangでPublisherとConsumerを書いてみました。
今回はScalaで書いてみます。
使用するライブラリはStreamで扱えるのがあったりしますが、基本を押さえたいので公式のJavaクライアントであるamqp-clientを使ってみることにします。

構成

Scala: 2.12
sbt: 1.1
amqp-client: 5.2

成果物

sbt

まずはsbtの定義を載せておきます。
PublisherとConsumerのマルチプロジェクトとしています。
JSONの変換にspray-json、設定の読み込みにlightbend/config、警告がうるさいのでlogbackを追加しています。

build.sbt
def commonSettings(_name: String) = Seq(
  scalaVersion := "2.12.6",
  scalacOptions ++= Seq(
    "-deprecation",
    "-feature",
    "-unchecked",
    "-language:_",
    "-Xlint",
    "-Xfatal-warnings",
    "-Ywarn-dead-code",
    "-Ywarn-numeric-widen",
    "-Ywarn-unused",
    "-Ywarn-unused-import",
    "-Ywarn-value-discard"
  ),
  version := "0.1-SNAPSHOT",
  name := _name
)

lazy val common = (project in file("common"))
  .settings(commonSettings("scala-rabbitmq-example-common"))
  .settings(
    libraryDependencies ++= Seq(
      "io.spray" %% "spray-json" % "1.3.4"
    )
  )

lazy val publisher = (project in file("publisher"))
  .settings(commonSettings("scala-rabbitmq-example-publisher"))
  .settings(
    libraryDependencies ++= Seq(
      "com.rabbitmq" % "amqp-client" % "5.2.0",
      "io.spray" %% "spray-json" % "1.3.4",
      "com.typesafe" % "config" % "1.3.2",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
  )
  .dependsOn(common)

lazy val consumer = (project in file("consumer"))
  .settings(commonSettings("scala-rabbitmq-example-consumer"))
  .settings(
    libraryDependencies ++= Seq(
      "com.rabbitmq" % "amqp-client" % "5.2.0",
      "io.spray" %% "spray-json" % "1.3.4",
      "com.typesafe" % "config" % "1.3.2",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
  )
  .dependsOn(common)

モデル

共通のメッセージモデルを定義します。

com.example.common.model.scala
package com.example.common

import spray.json._

object Model {

  final case class Protocol(message: String, timestamp: Long)

}

// Model <--> Json
trait ModelJsonProtocol extends DefaultJsonProtocol {

  import Model._

  implicit val protocolFormat: RootJsonFormat[Protocol] = jsonFormat2(Protocol)

}

Publisher

Publisherを書きます。
まずは読み込ませるapplication.confから。内容は見たまんまRabbitMQのホストとポートの指定です。

publisher/src/main/resources/application.conf
publisher {
  rabbitMQHost: "localhost"
  rabbitMQHost: ${?RABBITMQ_HOST}
  rabbitMQPort: 5672
  rabbitMQPort: ${?RABBITMQ_PORT}
}

続いてApp。

publisher/src/main/scala/com/example/publisher/Main.scala
package com.example.publisher

import java.time.{LocalDate, ZoneId}

import scala.util.{Failure, Success, Try}

import com.example.common.Model.Protocol
import com.example.common.ModelJsonProtocol
import com.rabbitmq.client.ConnectionFactory
import com.typesafe.config.ConfigFactory
import spray.json._

object Main extends App with ModelJsonProtocol {
  // コンフィグロード
  val config = ConfigFactory.load()
  config.checkValid(ConfigFactory.defaultReference(), "publisher")

  val rabbitMQHost = config.getString("publisher.rabbitMQHost")
  val rabbitMQPort = config.getInt("publisher.rabbitMQPort")

  println("publisher start")

  // ファクトリを使うらしい・・・
  val factory = new ConnectionFactory()
  factory.setHost(rabbitMQHost)
  factory.setPort(rabbitMQPort)

  // コネクションを繋いで・・・
  val connection = factory.newConnection()

  // チャンネル開いて・・・
  val channel = connection.createChannel()

  // Exchangeを作って・・・
  channel.exchangeDeclare("test", "fanout", false, true, null)

  // とりあえず3回・・・
  Seq.range(1, 4).foreach { i =>
    // モデル作って・・・
    val p =
      Protocol(s"Hello. No$i", LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toEpochSecond)

    // Publish!!
    Try(channel.basicPublish("test", "", null, p.toJson.compactPrint.getBytes)) match {
      case Success(_) =>
        println(s"[INFO] published. msg: $p")
      case Failure(ex) =>
        println(s"[ERROR] ${ex.getMessage}")
    }
  }

  // チャンネルを閉じて・・・
  channel.close()
  
  // コネクションも閉じて・・・
  connection.close()

  println("publisher stop")
}

・・・Javaで書いてるのと何も変わらん気がする。ファクトリ~チャンネルまでの定義辺りをSingletonObjectにでも押し込んでしまえばもうちょっとScalaっぽくなるんだろうか?

Consumer

Consumerを書きます。
こちらもapplication.confを置きますが、Publisherとほぼ同じなので省略して、Appのみで。

consumer/src/main/scala/com/example/consumer/Main.scala
package com.example.consumer

import scala.io.StdIn
import scala.util.{Failure, Success, Try}

import com.example.common.Model.Protocol
import com.example.common.ModelJsonProtocol
import com.rabbitmq.client.{AMQP, ConnectionFactory, DefaultConsumer, Envelope}
import com.typesafe.config.ConfigFactory
import spray.json._

object Main extends App with ModelJsonProtocol {
  // コンフィグロード
  val config = ConfigFactory.load()
  config.checkValid(ConfigFactory.defaultReference(), "consumer")

  val rabbitMQHost = config.getString("consumer.rabbitMQHost")
  val rabbitMQPort = config.getInt("consumer.rabbitMQPort")

  println("consumer start. press enter to stop")

  // ファクトリを使うらしい・・・
  val factory = new ConnectionFactory()
  factory.setHost(rabbitMQHost)
  factory.setPort(rabbitMQPort)

  // コネクションを繋いで・・・
  val connection = factory.newConnection()

  // チャンネル開いて・・・
  val channel = connection.createChannel()

  // Exchangeを作って・・・
  channel.exchangeDeclare("test", "fanout", false, true, null)

  // Queueを作って・・・
  val queName = channel.queueDeclare().getQueue

  // QueueとExchangeをBindして・・・
  channel.queueBind(queName, "test", "")

  // Consumerを定義して・・・
  val consumer = new DefaultConsumer(channel) {
    override def handleDelivery(consumerTag: String,
                                envelope: Envelope,
                                properties: AMQP.BasicProperties,
                                body: Array[Byte]): Unit = {
      val str = new String(body, "UTF-8")
      Try(str.parseJson.convertTo[Protocol]) match {
        case Success(p) =>
          println(s"[INFO] consumed. tag: ${envelope.getDeliveryTag}, msg: $p")
        case Failure(ex) =>
          println(s"[ERROR] ${ex.getMessage}")
      }
    }
  }

  // チャンネルにConsumerを設定して・・・
  channel.basicConsume(queName, true, consumer)

  // 待機して・・・
  StdIn.readLine()

  // チャンネルを閉じて・・・
  channel.close()

  // コネクションを閉じて・・・
  connection.close()

  println("consumer stop")
}

やっぱりJavaじゃん。
channel.exchangeDeclare(...)は前回と同様でExchangeが既に有る場合は、フラグのチェックのみ行うようです(PublisherとConsumerのフラグが違うと例外発生)。
最初new DefaultConsumer(channel)でもう繋がってると思って、channel.basicConsume(queName, true, consumer)入れてなかったら全く動きませんでした(戒め)。

実行

前回同様にそれぞれ別のコンソールから実行してみますと言いたいところですが、ぶっちゃけIntelliJでsbt shellを複数立ち上げる方法が分かんないんでこんな感じに出るよってことで・・・。

publisher
publisher start
[INFO] published. msg: Protocol(Hello. No1,1524841200)
[INFO] published. msg: Protocol(Hello. No2,1524841200)
[INFO] published. msg: Protocol(Hello. No3,1524841200)
publisher stop
consumer
consumer start. press enter to stop
[INFO] consumed. tag: 1, msg: Protocol(Hello. No1,1524841200)
[INFO] consumed. tag: 2, msg: Protocol(Hello. No2,1524841200)
[INFO] consumed. tag: 3, msg: Protocol(Hello. No3,1524841200)

consumer stop

終わりに

Scala編と言いつつ、ほとんどJavaじゃねーかって感じでした(なげやり)。
次はC#で書きます。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?