メッセージの遅延の伝播
Actor ModelのActorはmailboxに届けられたメッセージを自分のスレッドで一つずつ処理するので、何らかの障害が起こって処理の遅延が発生した時に処理の遅延が後続のメッセージの処理にも伝播します。
例えば、ある処理が平均50msの処理時間がかかるときに、メッセージの送り手(Producer)が50ms以上の間隔でメッセージを送り続けたりすると、レイテンシはどんどん大きくなってしまいます。このとき、producerが40msの間隔でメッセージを送り続けると、1秒後には250msの遅延が発生します。
mailboxの容量
ここで発生しうるメッセージの遅延時間は以下のようになります。
(mailboxに溜まっているメッセージの数) × (メッセージの1つあたりの処理時間)
つまりmailboxに積まれるメッセージの数を制限することでメッセージの遅延時間も制限することができます。mailboxの容量を制限した場合、容量を超えたメッセージの処理は失敗になるので、メッセージの送り手の方でtimeoutによるエラー処理を行うか内部で失敗を検知して回復処理を行う必要があります。
Akkaでのmailboxの制限の仕方
AkkaのActorでmailboxを制限するには、mailboxの容量を制限するAcorにRequiresMessageQueueトレイトをmix-inして必要な設定を行います。
// メールボックスの容量を制限するActor
class Worker extends Actor
with RequiresMessageQueue[BoundedMessageQueueSemantics] with ActorLogging {
import Worker._
lazy val counter = context.actorSelection("/user/master/counter")
def receive = {
case work: Work =>
Thread.sleep(150L)
counter ! Finished
}
}
# application.conf
akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
bounded-mailbox {
# mailboxの型
mailbox-type = "akka.dispatch.BoundedMailbox"
# mailboxの容量
mailbox-capacity = 4
# mailboxにメッセージを積むときにこの時間以内にmailboxが空かなかったらtimeoutになる。
# timeoutになったメッセージはdead letterに送られる。
mailbox-push-timeout-time = 50ms
}
タイムアウト
メールボックスから溢れて失敗したメッセージは何もしないとDead Letterに送られるだけなので、リカバリなどの対処が必要な場合は送り手の方でタイムアウトして何らかの処理を行う必要があります。
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException
...
val timeout = 50.milliseconds
worker ? work onFailure {
case te: TimeoutException =>
// 何らかの処理
}
ここでask patternを用いる代わりにCircuit Breakerを用いることでProducerがConsumerにかける負荷を制限することができます。
参考文献
Reactive Design Patterns - 2.2.5 Bounding Latency even when Things go Wrong