Edited at
ScalaDay 14

AkkaのMailboxを自作する

この記事は、Scala Advent Calendar 2018 14日目の記事です。


Mailboxとは

アクターがメッセージを保持するために使用され、内部的にはジョブキューを持っています。

Mailboxが処理されることで、アクターはメッセージを受け取ることができます。アクターはメッセージを受け取ると、任意の処理を実行することが可能で、更に他のアクターへメッセージを送る事ができます。(その場合、メッセージはMailboxへ貯まります)

もちろん送るだけでなく、レスポンスを受け取ることも可能です。

よくあるFutureを使った非同期処理と違うのは、Mailboxを介して非同期処理のやり取りを行うので、必然的にメッセージ駆動ベースの実装となっていくところです。


メッセージの種類

メッセージは基本的に、どんな型でも送受信することができます。メッセージを受け取ったときにmatch ~ caseを使って何の型なのかを判断し、処理を分ける使い方が多いです。


MailboxにRedisを使用してみる

今回自作するのは、裏側にRedisを使ったMailboxです。公式でも昔はRedisを使ったMailboxがあったようなのですが、最新のAkkaのバージョンには対応していないようでした。

actor/

mailbox/
RedisMailbox.scala

メールボックスのディレクトリを作成し、RedisMailbox.scala を作ってみます。


RedisMailbox.scala

package example.actor.mailbox

import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{Envelope, MailboxType, MessageQueue, ProducesMessageQueue}
import akka.pattern.CircuitBreaker
import akka.serialization.{Serialization, SerializationExtension}
import com.typesafe.config.{Config, ConfigFactory}

trait RedisMailboxTypeQueueSemantics

object RedisMailbox {

class Redis()(val owner: Option[ActorRef], val system: Option[ActorSystem]) extends MessageQueue with RedisMailboxTypeQueueSemantics {
... // A
}
}

class RedisMailbox extends MailboxType with ProducesMessageQueue[RedisMailbox.Redis] {
... // B
}


まずは大枠です。

RedisMailboxTypeQueueSemanticsRequiresMessageQueue によって、アクターが必要としているMailboxの種類を限定する際に使用します。Actorの内部処理に関しては、特に変わったところは無いため割合します。


AnyActor.scala

class AnyActor extends Actor with ActorLogging with RequiresMessageQueue[RedisMailboxTypeQueueSemantics] with InteractivityFormat {

...
}

また、config側で akka.actor.mailbox.requirementsを使うことでも同様のことが出来ます。


application.conf

akka.actor.mailbox.requirements {

"example.actor.mailbox.RedisMailboxTypeQueueSemantics" =
custom-dispatcher-mailbox
}
custom-dispatcher-mailbox {
mailbox-type = "example.actor.mailbox.RedisMailbox"
}


Aの部分の作成

ソースコードのAの部分は、MailboxのキューをバックエンドのRedisから送出する処理を書いていきます。


A

    private final val config = ConfigFactory.load()

private final val name = "redis_queue"
private final val clients = RedisClient.clients //<- Redisにアクセスするためのclient
private final val breaker = CircuitBreaker(system.get.scheduler, 10, 30.seconds, 1.minute)

まずは先頭から、Configの読み込みや、Redisのキー、Clientなどを読み込んでおきます。

clientによってはConnection Poolを持っていることがあるので、その場合はAではなく、外側のObjectに置きます。(今回はRedisMailboxを使ったActorは一つだけでしたのでここに置きました。)

CircuitBreakerについてもすみませんが割合させていただきます。


A

    def enqueue(receiver: ActorRef, handle: Envelope): Unit = breaker.withSyncCircuitBreaker {

clients.withClient { client =>
val ref = Serialization.serializedActorPath(handle.sender)
client.rpush(name, s"${handle.message}|%%%|$ref")
}
}
def dequeue(): Envelope = breaker.withSyncCircuitBreaker {
val result: Option[String] = clients.withClient { client =>
client.lpop(name)
}
result match {
case Some(result) => {
val splits = result.split(raw"\|%%%\|", 2)
if (splits.length == 2) {
val message: String = splits(0)
val serialization = SerializationExtension(system.get)
val sender: ActorRef = serialization.system.provider.resolveActorRef(splits(1))
Envelope(message, sender, system.get)
} else {
val message = result
Envelope(message, owner.get, system.get)
}
}
case _ => null
}
}
def numberOfMessages: Int = breaker.withSyncCircuitBreaker {
val result: Option[Long] = clients.withClient { client =>
client.llen(name)
}
result.get.toInt
}
def hasMessages: Boolean = breaker.withSyncCircuitBreaker {
val result: Option[Long] = clients.withClient { client =>
client.llen(name)
}
result.get > 0
}
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}

Aの部分の本体になります。ここでは、それぞれ5つのメソッドが定義されています。


enqueue

enqueueは、その名の通り、キューを入れる処理、メッセージが送られてきたらActorrefをシリアライズしてメッセージと共にRedisへrpushします。今回は、わかりやすくするためにメッセージ部分とActorref部分を|%%%|で区切っていますが、Akka-http経由などで送られてきた内容などがあれば、エンコード処理をしておくと安全です。


dequeue

dequeueは、キューから取り出す処理を行います。もし並列化しても対応できるようにlpopによって取り出します。Redisを使用しているのでシングルスレッドによりボトルネックにはなりますが、 キュー処理の時間 < アクターでの処理時間となれば十分並列化の効果を期待できると思います!

dequeueは内部的には、lpopが空を返すまで処理を続けます。特にアクターが新たにメッセージを送らなくても、キューが溜まっている場合は無限ループのように処理をし続けてくれます。もしもキューの消化が止まってしまった場合は

context.system.scheduler.schedule(Duration.Zero, 1 seconds, AnyActor, "ping")

このようにscheduleを使うことでメッセージの消化連鎖のためのトリガにすることが可能です。これによって、actor基盤を持たないシステムからrpushしてもメッセージを消化させることが可能です。

val sender: ActorRef = serialization.system.provider.resolveActorRef(splits(1))

ここで気をつけてほしいのですが、デシリアライズしたActorRefです。

このActorRefがないと、悲しいことにxxx ? yyyのようなAskを使った処理で、元のActorに戻ってきません。同じアクター内で互いに送り合う分には良いのですが、Askは受け取り用の異なるアクターなので返し先がわからないとメッセージが迷子になってしまうので注意が必要です。


numberOfMessages / hasMessages

こちらは、メッセージがどれくらい溜まっているかを計算し返すためのメソッドになります。

メッセージの消化は、lpopが空を返したときにも止まるので、とくに気をつける部分はありませんでした。


cleanUp

こちらは、クリーンアップ時にメッセージが残っていたら、デッドレターキューに移動させます。

処理されなかったメッセージを退避したり、エラーの原因を解析するために使用されます。

次はBの処理です。


B

import RedisMailbox._

def this(settings: ActorSystem.Settings, config: Config) = {
this()
}

final override def create(
owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue =
new Redis()(owner, system)


Bの処理は、Mailboxの実体を作る処理になります。


最後に

実際の呼び出し部分とConfigの設定は下記のように設定します。

var anyActor = context.actorOf(Props[AnyActor].withDispatcher("custom-dispatcher"), "any-actor")


config

custom-dispatcher {

mailbox-requirement = "example.actor.mailbox.RedisMailboxTypeQueueSemantics"
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
}

akka.actor.mailbox.requirements {
"example.actor.mailbox.RedisMailboxTypeQueueSemantics" =
custom-dispatcher-mailbox
}

custom-dispatcher-mailbox {
mailbox-type = "example.actor.mailbox.RedisMailbox"
}

akka {
log-config-on-start = on
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel= "DEBUG"
log-dead-letters = 0
log-dead-letters-during-shutdown = off
actor {
mailbox {
}
debug {
receive = on
autoreceive = on
lifecycle = on
}
}
}