LoginSignup
3
3

More than 5 years have passed since last update.

Reactive Rabbitを試す

Last updated at Posted at 2015-11-03

目的

AkkaStreams用のamqpアダプタである、Reactive Rabbitを使ってみます。

用語(あやしい)

Akka Streams: Reactive StreamsのAkka実装。Reactive Streamsは、BackPressure制御を行う非同期ストリーム処理のことらしい。
BackPressure: Subscriberが自分が処理できる量をPublisherに通知しながら、処理を溢れさせずに、リソースを余らせずに、効率的にストリームを処理する仕組みらしい。
Source(publisher): 出力を持つステージ。ダウンストリームのステージがデータを受け取れる状態になれば、データを放出する。
Sink(subscriber): 入力を持つステージ。データを要求し受け入れる。負荷によってはアップストリームの処理をスローダウンさせる事がある。
Flow: データの変換を行い、SourceとSinkをそれぞれ結合する?
ステージ(Processing Stage):フローやフローグラフを構築する、全てのビルディングブロックのための総称?

実験

環境

ライブラリなど バージョン
scala 2.11.7
akka 2.4.0
akka-stream 1.0
reactive-rabbit 1.0.2
rabbitmq 3.5.6

作成したサンプルプログラムの説明

RabbitMQに下記の2つのキューを作成する

  • events
  • results

以下の2つのフローを用意します。

  • PreparingFlow: 10個のメッセージ「Message[メッセージ番号]」 を作り、RabbitMQのeventsキューに入れる
  • ProcessingFlow: eventsキューからメッセージを取り出し、メッセージの内容を「Message[メッセージ番号] is processed by [自分の番号]」に書き換えて、resultsキューに入れる。ProcessingFlowは、3つ作る。(メッセージの取り合いをさせる)

サンプルプログラム

sbtでビルドするためのBuild.scala

Build.scala
import sbt._
import sbt.Keys._

object Build extends sbt.Build {

  lazy val root = Project(
    id = "akka-test",
    base = file("."),
    settings = Defaults.defaultSettings ++ packSettings ++
      Seq(
        scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature"),
        scalaVersion := "2.11.7",
        libraryDependencies ++= Seq(
          "com.typesafe.akka" %% "akka-actor" % "2.4.0",
          "com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0",
          "io.scalac" %% "reactive-rabbit" % "1.0.2"
        )
      )
  )
}

Reactive Rabbitを使用したコード本体(もうちょい書き直すべきか・・・)

HelloReactiveRabbit.scala
import akka.actor.ActorSystem
import akka.util.ByteString
import io.scalac.amqp.Connection
import io.scalac.amqp.Message
import akka.stream.scaladsl.{Flow, Source, Sink}
import akka.stream.ActorMaterializer

object HelloReactiveRabbit extends App {

  // RabbitMQへのコネクションを作る。デフォルトだとlocalのRabbitMQに接続されるっぽい。
  val connection = Connection()

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  // ProcessingFlowを3つつくる。
  val stages = (0 to 2).map { i =>
    // eventsからの入力アダプタを作る
    val in = connection.consume(queue = "events")
    // resultへの出力アダプタを作る
    val out = connection.publishDirectly("results")
    val flow: Flow[Message, Message, Unit] = {
      Flow[Message].map { m => {
        //メッセージの内容を「Message[メッセージ番号] is processed by [自分のフロー番号]」に書き換える
        val body = m.body.decodeString("utf-8")
        val str = body + " is processed by " + i
        println(str)
        Message(body = ByteString(str))
      };
      }
    }

    Source(in). // eventsキューからの取出し
      map { m => m.message }. // flowの中でやってしまっても良かったかも
      via(flow). // メッセージ書き換え
      to(Sink(out)). // resultsキューへの格納
      run()
  }

  val events = connection.publishDirectly(queue = "events")
  Source(1 to 10). // Message[1-10]を作る。 
    map { m => Message(body = ByteString("Message" + m)) }.
    to(Sink(events)). //eventsに格納する。
    run()
}

コンソール出力(printlnで出力されている)

Message2 is processed by 1
Message1 is processed by 0
Message3 is processed by 2
Message4 is processed by 0
Message6 is processed by 2
Message5 is processed by 1
Message8 is processed by 1
Message9 is processed by 2
Message7 is processed by 0
Message10 is processed by 0

その他

残念ながら下記のissueの問題が解決しないと使えないのではという印象。現状勝手にAckされてしまうのでメッセージが失われてしまう可能性がある。
https://github.com/ScalaConsultants/reactive-rabbit/issues/13

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3