目的
AkkaStreams用のamqpアダプタである、Reactive Rabbitを使ってみます。
用語(あやしい)
Akka Streams: Reactive StreamsのAkka実装。Reactive Streamsは、BackPressure制御を行う非同期ストリーム処理のことらしい。
BackPressure: Subscriberが自分が処理できる量をPublisherに通知しながら、処理を溢れさせずに、リソースを余らせずに、効率的にストリームを処理する仕組みらしい。
Source(publisher): 出力を持つステージ。ダウンストリームのステージがデータを受け取れる状態になれば、データを放出する。
Sink(subscriber): 入力を持つステージ。データを要求し受け入れる。負荷によってはアップストリームの処理をスローダウンさせる事がある。
Flow: データの変換を行い、SourceとSinkをそれぞれ結合する?
ステージ(Processing Stage):フローやフローグラフを構築する、全てのビルディングブロックのための総称?
実験
環境
ライブラリなど | バージョン |
---|---|
scala | 2.11.7 |
akka | 2.4.0 |
akka-stream | 1.0 |
reactive-rabbit | 1.0.2 |
rabbitmq | 3.5.6 |
作成したサンプルプログラムの説明
RabbitMQに下記の2つのキューを作成する
- events
- results
以下の2つのフローを用意します。
- PreparingFlow: 10個のメッセージ「Message[メッセージ番号]」 を作り、RabbitMQのeventsキューに入れる
- ProcessingFlow: eventsキューからメッセージを取り出し、メッセージの内容を「Message[メッセージ番号] is processed by [自分の番号]」に書き換えて、resultsキューに入れる。ProcessingFlowは、3つ作る。(メッセージの取り合いをさせる)
サンプルプログラム
sbtでビルドするためのBuild.scala
import sbt._
import sbt.Keys._
object Build extends sbt.Build {
lazy val root = Project(
id = "akka-test",
base = file("."),
settings = Defaults.defaultSettings ++ packSettings ++
Seq(
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature"),
scalaVersion := "2.11.7",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.0",
"com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0",
"io.scalac" %% "reactive-rabbit" % "1.0.2"
)
)
)
}
Reactive Rabbitを使用したコード本体(もうちょい書き直すべきか・・・)
import akka.actor.ActorSystem
import akka.util.ByteString
import io.scalac.amqp.Connection
import io.scalac.amqp.Message
import akka.stream.scaladsl.{Flow, Source, Sink}
import akka.stream.ActorMaterializer
object HelloReactiveRabbit extends App {
// RabbitMQへのコネクションを作る。デフォルトだとlocalのRabbitMQに接続されるっぽい。
val connection = Connection()
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
// ProcessingFlowを3つつくる。
val stages = (0 to 2).map { i =>
// eventsからの入力アダプタを作る
val in = connection.consume(queue = "events")
// resultへの出力アダプタを作る
val out = connection.publishDirectly("results")
val flow: Flow[Message, Message, Unit] = {
Flow[Message].map { m => {
//メッセージの内容を「Message[メッセージ番号] is processed by [自分のフロー番号]」に書き換える
val body = m.body.decodeString("utf-8")
val str = body + " is processed by " + i
println(str)
Message(body = ByteString(str))
};
}
}
Source(in). // eventsキューからの取出し
map { m => m.message }. // flowの中でやってしまっても良かったかも
via(flow). // メッセージ書き換え
to(Sink(out)). // resultsキューへの格納
run()
}
val events = connection.publishDirectly(queue = "events")
Source(1 to 10). // Message[1-10]を作る。
map { m => Message(body = ByteString("Message" + m)) }.
to(Sink(events)). //eventsに格納する。
run()
}
コンソール出力(printlnで出力されている)
Message2 is processed by 1
Message1 is processed by 0
Message3 is processed by 2
Message4 is processed by 0
Message6 is processed by 2
Message5 is processed by 1
Message8 is processed by 1
Message9 is processed by 2
Message7 is processed by 0
Message10 is processed by 0
その他
残念ながら下記のissueの問題が解決しないと使えないのではという印象。現状勝手にAckされてしまうのでメッセージが失われてしまう可能性がある。
https://github.com/ScalaConsultants/reactive-rabbit/issues/13