Akka実践バイブルをゆっくり読み解く企画の第10章です。
第10章 メッセージチャネル
これまでのメッセージ送信は、メッセージ送信元のアクターが送信先のアクターを知っていることが前提であった。ここでは、メッセージ送信元が意図しない送信先にメッセージを届ける方法について扱う。
チャネルの種類
ポイントツーポイントチャネル
これまでの章で扱ってきたメッセージ送信は、全てポイントツーポイントチャネルを使用している。ポイントは、『送信側が次にどこにメッセージを送信すれば良いかを把握している』ということにある。
基本的にはポイントツーポイントチャネルで十分である。
パブリッシュ・サブスクライブチャネル
いわゆる「パブリッシャ/サブスクライバモデル」を実現する。
パブリッシャはメッセージを配信し、それを購読している不特定多数のサブスクライバがメッセージを受信して処理を行う。
// メッセージの購読
// 第1引数:サブスクライバのActorRef
// 第2引数:パブリッシャのClass
system.eventStream.subscribe(deliverOrder.ref, classOf[Order])
// メッセージの配信
// 第1引数:配信メッセージ
val msg = new Order()
system.eventStream.publish(msg)
// メッセージ購読の部分解除
// 第1引数:サブスクライバのActorRef
// 第2引数:パブリッシャのClass
system.eventStream.unsubscribe(deliverOrder.ref, classOf[Order])
// メッセージ購読の全解除
// 第1引数:サブスクライバのActorRef
system.eventStream.unsubscribe(deliverOrder.ref)
class DeliverOrder() extends Actor {
def receive = {
case msg: Order => {
// 購読処理
}
}
}
カスタムイベントバス
特定条件でのみパブリッシュ・サブスクライブを行いたい場合は、独自のパブリッシュ・サブスクライブチャネルを作成することもできる。
このために、AkkaではEventBus
が用意されている。
trait EventBus {
// パブリッシュするイベントの型
type Event
// イベントを送信するときのサブスクライバ選択に使用する分類子
// 例えばClassifierがBooleanの場合はtrue時にのみ購読する
// ClassifierがStringの場合はpublishされた文字列と購読対象の文字列が一致する場合のみ購読する
type Classifier
// イベントバスに登録するサブスクライバの型
type Subscriber
def subscribe(subscriber: Subscriber, to: Classifier): Boolean
def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean
def unsubscribe(subscriber: Subscriber): Unit
def publish(event: Event): Unit
}
EventBus
の実装として3つのトレイトが用意されている。
- LookupClassification
- SubchannelClassification
- ScanningClassification
Classifier
には購読条件を指定することとなる。条件付けの方法は以下のように行う。
(Akka Documentより)
import akka.event.EventBus
import akka.event.LookupClassification
final case class MsgEnvelope(topic: String, payload: Any)
/**
* Publishes the payload of the MsgEnvelope when the topic of the
* MsgEnvelope equals the String specified when subscribing.
*/
class LookupBusImpl extends EventBus with LookupClassification {
type Event = MsgEnvelope
type Classifier = String
type Subscriber = ActorRef
// is used for extracting the classifier from the incoming events
override protected def classify(event: Event): Classifier = event.topic
// will be invoked for each event for all subscribers which registered themselves
// for the event’s classifier
override protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event.payload
}
// must define a full order over the subscribers, expressed as expected from
// `java.lang.Comparable.compare`
override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
a.compareTo(b)
// determines the initial size of the index data structure
// used internally (i.e. the expected number of different classifiers)
override protected def mapSize: Int = 128
}
val lookupBus = new LookupBusImpl
// MsgEnvelopeのtopicが"greetings"のメッセージを購読する
lookupBus.subscribe(testActor, "greetings")
lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis()))
lookupBus.publish(MsgEnvelope("greetings", "hello"))
expectMsg("hello")
特殊チャネル
デッドレターチャネル
メッセージが配信できない等、意図した配信先にメッセージを届けることができない場合にデッドレターチャネルにメッセージが届く。メッセージが正しく届いている限りはデッドレターチャネルは使用されない。
デッドレターの主な使用用途は以下の2つ。
- 終了したアクターへのメッセージ配信の検知
- エラー処理
終了したアクターへのメッセージ配信の検知
終了したアクターにメッセージが送信される場合、そのメッセージは受信されることがないのでデッドレターチャネルに送られる。
val actor = ...
val msg = new Order()
// アクターの終了
actor ! PoisonPill
// 終了したアクターへのメッセージ送信
// このメッセージはデッドレターへと送られる
actor ! msg
エラー処理
object match {
case NormalCase1 => {
// 正常時の処理
...
}
case NormalCase2 => {
// 正常時の処理
...
}
case _ => {
// 例外時の処理
val msg = new Order()
// デッドレターにメッセージを送信する①
// ただし、これだとメッセージ送信元の情報が自アクターになってしまい、
// 本来の送信元の情報が消失してしまう
system.deadLetters ! msg
// デッドレターにメッセージを送信する②
// 自分でDeadLetterオブジェクトにラップしてデッドレターに渡すと
// DeadLetterオブジェクトへのラップ処理がスキップされるため
// メッセージ送信元の情報を保持したまま購読者に情報を伝えることができる
val dead = DeadLetter(msg, actorFrom, actorTo)
system.deadLetters ! dead
}
デッドレターの使い方
デッドレターチャネルに送られたメッセージはDeadLetter
というcase class
にラップされたオブジェクトとなる。
/**
* message メッセージ内容
* sender メッセージの送信元
* recipient メッセージの送信先 ←本来受信してほしかったActor
*/
final case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) extends AllDeadLetters
DeadLeter
をサブスクライブするようにすると、DeadLetter
にメッセージが届いた際にそれを検知して処理することができるようになる。
// DeadLetterを購読するアクターのActorRef
val deadLetterMonitor = ...
system.eventStream.subscribe(
deadLetterMonitor.ref,
classOf[DeadLetter]) // DeadLetterを購読
保証配信チャネル
メッセージ配信の一般的なルールでは、メッセージは「1回配信される」「配信されない」のいずれかを満たすようになっている。これではシステムが求める信頼性を満たすことができない場合に、メッセージ消失を防ぐために何らかの保証配信を行う必要がある。
Akkaでは、完全な保証配信を実装していない。その代わりに、「メッセージの消失を防ぐ」ことを実現するための仕組みを提供することで、一定レベルの保証配信を実現することができるようになっている。
ReliableProxy
ReliableProxy
が起動すると、異なるノード上の2つのアクターシステム間にトンネルが作成される。トンネルの入口をProxy
、出口(代理の受信者)をEgress
と呼び、メッセージの本来の送信元/送信先の間をこいつらが
Proxy
がリモートノードに存在するアクターへのメッセージ配信に失敗した場合に、配信に成功するまで再送し続ける。また、Egress
は、メッセージを受信した際に、メッセージの本来の受信者に1度だけメッセージを転送する。これによって、「メッセージが必ず1回配信される」ことを実現する。
ただ、メッセージの配信先のアクターが終了してしまうと受信者不在となって、いくらReliableProxy
が頑張ってもメッセージが受信されない。この場合、配信先のアクターの終了を伝達させてReliableProxy
が終了するようにすることで、一方的にProxy
経由でメッセージが配信されないようにして解決する。
val pathToEcho = "akka.tcp://actorSystem@127.0.0.1:2553/user/echo"
// 500ミリ秒単位で再送するReliableProxy
val proxy = system.actorOf(ReliableProxy.props(pathToEcho, 500.millis), "proxy")
proxy ! msg
ReliableProxy
によってトンネルを生成するため、メッセージの配信元のではその旨を実装する必要があるが、受信側ではどのように配信されているかを意識する必要がないようになっている。つまり、受信側では特別な実装を行う必要がない。
個人的まとめ
今回登場したチャネルの中でも、特にDeadLetter
による例外検知とReliableProxy
によるリトライは、適切に業務を成立させるために十分な設計をした上で適切に使用する必要があると感じた。ただ、最新版のAkkaではReliableProxy
はdeprecated
になっていて、代わりにAtLeastOnceDelivery
を使用するらしい。AtLeastOnceDelivery
はチラッと見てみたけど、persistenceId
なるものを定義する必要があったり少しReliableProxy
とは使い方が異なるようなので、またの機会があれば・・・