この記事は、Scala Advent Calendar 2018 14日目の記事です。
Mailboxとは
アクターがメッセージを保持するために使用され、内部的にはジョブキューを持っています。
Mailboxが処理されることで、アクターはメッセージを受け取ることができます。アクターはメッセージを受け取ると、任意の処理を実行することが可能で、更に他のアクターへメッセージを送る事ができます。(その場合、メッセージはMailboxへ貯まります)
もちろん送るだけでなく、レスポンスを受け取ることも可能です。
よくあるFutureを使った非同期処理と違うのは、Mailboxを介して非同期処理のやり取りを行うので、必然的にメッセージ駆動ベースの実装となっていくところです。
メッセージの種類
メッセージは基本的に、どんな型でも送受信することができます。メッセージを受け取ったときにmatch ~ caseを使って何の型なのかを判断し、処理を分ける使い方が多いです。
MailboxにRedisを使用してみる
今回自作するのは、裏側にRedisを使ったMailboxです。公式でも昔はRedisを使ったMailboxがあったようなのですが、最新のAkkaのバージョンには対応していないようでした。
actor/
mailbox/
RedisMailbox.scala
メールボックスのディレクトリを作成し、RedisMailbox.scala
を作ってみます。
package example.actor.mailbox
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{Envelope, MailboxType, MessageQueue, ProducesMessageQueue}
import akka.pattern.CircuitBreaker
import akka.serialization.{Serialization, SerializationExtension}
import com.typesafe.config.{Config, ConfigFactory}
trait RedisMailboxTypeQueueSemantics
object RedisMailbox {
class Redis()(val owner: Option[ActorRef], val system: Option[ActorSystem]) extends MessageQueue with RedisMailboxTypeQueueSemantics {
... // A
}
}
class RedisMailbox extends MailboxType with ProducesMessageQueue[RedisMailbox.Redis] {
... // B
}
まずは大枠です。
RedisMailboxTypeQueueSemantics
は RequiresMessageQueue によって、アクターが必要としているMailboxの種類を限定する際に使用します。Actorの内部処理に関しては、特に変わったところは無いため割合します。
class AnyActor extends Actor with ActorLogging with RequiresMessageQueue[RedisMailboxTypeQueueSemantics] with InteractivityFormat {
...
}
また、config側で akka.actor.mailbox.requirements
を使うことでも同様のことが出来ます。
akka.actor.mailbox.requirements {
"example.actor.mailbox.RedisMailboxTypeQueueSemantics" =
custom-dispatcher-mailbox
}
custom-dispatcher-mailbox {
mailbox-type = "example.actor.mailbox.RedisMailbox"
}
Aの部分の作成
ソースコードのAの部分は、MailboxのキューをバックエンドのRedisから送出する処理を書いていきます。
private final val config = ConfigFactory.load()
private final val name = "redis_queue"
private final val clients = RedisClient.clients //<- Redisにアクセスするためのclient
private final val breaker = CircuitBreaker(system.get.scheduler, 10, 30.seconds, 1.minute)
まずは先頭から、Configの読み込みや、Redisのキー、Clientなどを読み込んでおきます。
clientによってはConnection Poolを持っていることがあるので、その場合はAではなく、外側のObjectに置きます。(今回はRedisMailboxを使ったActorは一つだけでしたのでここに置きました。)
CircuitBreakerについてもすみませんが割合させていただきます。
def enqueue(receiver: ActorRef, handle: Envelope): Unit = breaker.withSyncCircuitBreaker {
clients.withClient { client =>
val ref = Serialization.serializedActorPath(handle.sender)
client.rpush(name, s"${handle.message}|%%%|$ref")
}
}
def dequeue(): Envelope = breaker.withSyncCircuitBreaker {
val result: Option[String] = clients.withClient { client =>
client.lpop(name)
}
result match {
case Some(result) => {
val splits = result.split(raw"\|%%%\|", 2)
if (splits.length == 2) {
val message: String = splits(0)
val serialization = SerializationExtension(system.get)
val sender: ActorRef = serialization.system.provider.resolveActorRef(splits(1))
Envelope(message, sender, system.get)
} else {
val message = result
Envelope(message, owner.get, system.get)
}
}
case _ => null
}
}
def numberOfMessages: Int = breaker.withSyncCircuitBreaker {
val result: Option[Long] = clients.withClient { client =>
client.llen(name)
}
result.get.toInt
}
def hasMessages: Boolean = breaker.withSyncCircuitBreaker {
val result: Option[Long] = clients.withClient { client =>
client.llen(name)
}
result.get > 0
}
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}
Aの部分の本体になります。ここでは、それぞれ5つのメソッドが定義されています。
enqueue
enqueue
は、その名の通り、キューを入れる処理、メッセージが送られてきたらActorref
をシリアライズしてメッセージと共にRedisへrpushします。今回は、わかりやすくするためにメッセージ部分とActorref
部分を|%%%|
で区切っていますが、Akka-http経由などで送られてきた内容などがあれば、エンコード処理をしておくと安全です。
dequeue
dequeue
は、キューから取り出す処理を行います。もし並列化しても対応できるようにlpop
によって取り出します。Redisを使用しているのでシングルスレッドによりボトルネックにはなりますが、 キュー処理の時間 < アクターでの処理時間
となれば十分並列化の効果を期待できると思います!
dequeue
は内部的には、lpop
が空を返すまで処理を続けます。特にアクターが新たにメッセージを送らなくても、キューが溜まっている場合は無限ループのように処理をし続けてくれます。もしもキューの消化が止まってしまった場合は
context.system.scheduler.schedule(Duration.Zero, 1 seconds, AnyActor, "ping")
このようにschedule
を使うことでメッセージの消化連鎖のためのトリガにすることが可能です。これによって、actor基盤を持たないシステムからrpush
してもメッセージを消化させることが可能です。
val sender: ActorRef = serialization.system.provider.resolveActorRef(splits(1))
ここで気をつけてほしいのですが、デシリアライズしたActorRef
です。
このActorRef
がないと、悲しいことにxxx ? yyy
のようなAsk
を使った処理で、元のActorに戻ってきません。同じアクター内で互いに送り合う分には良いのですが、Ask
は受け取り用の異なるアクターなので返し先がわからないとメッセージが迷子になってしまうので注意が必要です。
numberOfMessages / hasMessages
こちらは、メッセージがどれくらい溜まっているかを計算し返すためのメソッドになります。
メッセージの消化は、lpop
が空を返したときにも止まるので、とくに気をつける部分はありませんでした。
cleanUp
こちらは、クリーンアップ時にメッセージが残っていたら、デッドレターキューに移動させます。
処理されなかったメッセージを退避したり、エラーの原因を解析するために使用されます。
次はBの処理です。
import RedisMailbox._
def this(settings: ActorSystem.Settings, config: Config) = {
this()
}
final override def create(
owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue =
new Redis()(owner, system)
Bの処理は、Mailboxの実体を作る処理になります。
最後に
実際の呼び出し部分とConfigの設定は下記のように設定します。
var anyActor = context.actorOf(Props[AnyActor].withDispatcher("custom-dispatcher"), "any-actor")
custom-dispatcher {
mailbox-requirement = "example.actor.mailbox.RedisMailboxTypeQueueSemantics"
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
}
akka.actor.mailbox.requirements {
"example.actor.mailbox.RedisMailboxTypeQueueSemantics" =
custom-dispatcher-mailbox
}
custom-dispatcher-mailbox {
mailbox-type = "example.actor.mailbox.RedisMailbox"
}
akka {
log-config-on-start = on
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel= "DEBUG"
log-dead-letters = 0
log-dead-letters-during-shutdown = off
actor {
mailbox {
}
debug {
receive = on
autoreceive = on
lifecycle = on
}
}
}