Scala
Akka

Akka実践バイブルをゆっくり読み解く 第10章メッセージチャネル

Akka実践バイブルをゆっくり読み解く企画の第10章です。

第10章 メッセージチャネル

これまでのメッセージ送信は、メッセージ送信元のアクターが送信先のアクターを知っていることが前提であった。ここでは、メッセージ送信元が意図しない送信先にメッセージを届ける方法について扱う。

チャネルの種類

ポイントツーポイントチャネル

これまでの章で扱ってきたメッセージ送信は、全てポイントツーポイントチャネルを使用している。ポイントは、『送信側が次にどこにメッセージを送信すれば良いかを把握している』ということにある。
基本的にはポイントツーポイントチャネルで十分である。

パブリッシュ・サブスクライブチャネル

いわゆる「パブリッシャ/サブスクライバモデル」を実現する。
パブリッシャはメッセージを配信し、それを購読している不特定多数のサブスクライバがメッセージを受信して処理を行う。

publish/subscribe
// メッセージの購読
// 第1引数:サブスクライバのActorRef
// 第2引数:パブリッシャのClass
system.eventStream.subscribe(deliverOrder.ref, classOf[Order])

// メッセージの配信
// 第1引数:配信メッセージ
val msg = new Order()
system.eventStream.publish(msg)

// メッセージ購読の部分解除
// 第1引数:サブスクライバのActorRef
// 第2引数:パブリッシャのClass
system.eventStream.unsubscribe(deliverOrder.ref, classOf[Order])

// メッセージ購読の全解除
// 第1引数:サブスクライバのActorRef
system.eventStream.unsubscribe(deliverOrder.ref)
サブスクライバ
class DeliverOrder() extends Actor {

  def receive = {
    case msg: Order => {
      // 購読処理
    }
  }
}

カスタムイベントバス

特定条件でのみパブリッシュ・サブスクライブを行いたい場合は、独自のパブリッシュ・サブスクライブチャネルを作成することもできる。
このために、AkkaではEventBusが用意されている。

EventBus.scala
trait EventBus {
  // パブリッシュするイベントの型
  type Event
  // イベントを送信するときのサブスクライバ選択に使用する分類子
  // 例えばClassifierがBooleanの場合はtrue時にのみ購読する
  // ClassifierがStringの場合はpublishされた文字列と購読対象の文字列が一致する場合のみ購読する
  type Classifier
  // イベントバスに登録するサブスクライバの型
  type Subscriber

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean

  def unsubscribe(subscriber: Subscriber): Unit

  def publish(event: Event): Unit
}

EventBusの実装として3つのトレイトが用意されている。

  • LookupClassification
  • SubchannelClassification
  • ScanningClassification

Classifierには購読条件を指定することとなる。条件付けの方法は以下のように行う。
(Akka Documentより)

EventBusの定義
import akka.event.EventBus
import akka.event.LookupClassification

final case class MsgEnvelope(topic: String, payload: Any)

/**
 * Publishes the payload of the MsgEnvelope when the topic of the
 * MsgEnvelope equals the String specified when subscribing.
 */
class LookupBusImpl extends EventBus with LookupClassification {
  type Event = MsgEnvelope
  type Classifier = String
  type Subscriber = ActorRef

  // is used for extracting the classifier from the incoming events
  override protected def classify(event: Event): Classifier = event.topic

  // will be invoked for each event for all subscribers which registered themselves
  // for the event’s classifier
  override protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.payload
  }

  // must define a full order over the subscribers, expressed as expected from
  // `java.lang.Comparable.compare`
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.compareTo(b)

  // determines the initial size of the index data structure
  // used internally (i.e. the expected number of different classifiers)
  override protected def mapSize: Int = 128
}
利用する側
val lookupBus = new LookupBusImpl
// MsgEnvelopeのtopicが"greetings"のメッセージを購読する
lookupBus.subscribe(testActor, "greetings")
lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis()))
lookupBus.publish(MsgEnvelope("greetings", "hello"))
expectMsg("hello")

特殊チャネル

デッドレターチャネル

メッセージが配信できない等、意図した配信先にメッセージを届けることができない場合にデッドレターチャネルにメッセージが届く。メッセージが正しく届いている限りはデッドレターチャネルは使用されない。
デッドレターの主な使用用途は以下の2つ。

  • 終了したアクターへのメッセージ配信の検知
  • エラー処理

終了したアクターへのメッセージ配信の検知

終了したアクターにメッセージが送信される場合、そのメッセージは受信されることがないのでデッドレターチャネルに送られる。

終了したアクターへのメッセージ配信
val actor = ...
val msg = new Order()

// アクターの終了
actor ! PoisonPill
// 終了したアクターへのメッセージ送信
// このメッセージはデッドレターへと送られる
actor ! msg

エラー処理

例外処理としてデッドレターを利用する
object match {
  case NormalCase1 => {
    // 正常時の処理
    ... 
  }
  case NormalCase2 => {
    // 正常時の処理
    ... 
  }
  case _ => {
    // 例外時の処理
    val msg = new Order()

    // デッドレターにメッセージを送信する①
    // ただし、これだとメッセージ送信元の情報が自アクターになってしまい、
    // 本来の送信元の情報が消失してしまう
    system.deadLetters ! msg

    // デッドレターにメッセージを送信する②
    // 自分でDeadLetterオブジェクトにラップしてデッドレターに渡すと
    // DeadLetterオブジェクトへのラップ処理がスキップされるため
    // メッセージ送信元の情報を保持したまま購読者に情報を伝えることができる
    val dead = DeadLetter(msg, actorFrom, actorTo)
    system.deadLetters ! dead
  }

デッドレターの使い方

デッドレターチャネルに送られたメッセージはDeadLetterというcase classにラップされたオブジェクトとなる。

DeadLetterの宣言
/**
 * message   メッセージ内容
 * sender    メッセージの送信元
 * recipient メッセージの送信先 ←本来受信してほしかったActor
 */
final case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) extends AllDeadLetters

DeadLeterをサブスクライブするようにすると、DeadLetterにメッセージが届いた際にそれを検知して処理することができるようになる。

Deadletterの購読
// DeadLetterを購読するアクターのActorRef
val deadLetterMonitor = ...

system.eventStream.subscribe(
  deadLetterMonitor.ref,
  classOf[DeadLetter])      // DeadLetterを購読

保証配信チャネル

メッセージ配信の一般的なルールでは、メッセージは「1回配信される」「配信されない」のいずれかを満たすようになっている。これではシステムが求める信頼性を満たすことができない場合に、メッセージ消失を防ぐために何らかの保証配信を行う必要がある。
Akkaでは、完全な保証配信を実装していない。その代わりに、「メッセージの消失を防ぐ」ことを実現するための仕組みを提供することで、一定レベルの保証配信を実現することができるようになっている。

ReliableProxy

ReliableProxyが起動すると、異なるノード上の2つのアクターシステム間にトンネルが作成される。トンネルの入口をProxy、出口(代理の受信者)をEgressと呼び、メッセージの本来の送信元/送信先の間をこいつらが
Proxyがリモートノードに存在するアクターへのメッセージ配信に失敗した場合に、配信に成功するまで再送し続ける。また、Egressは、メッセージを受信した際に、メッセージの本来の受信者に1度だけメッセージを転送する。これによって、「メッセージが必ず1回配信される」ことを実現する。
ただ、メッセージの配信先のアクターが終了してしまうと受信者不在となって、いくらReliableProxyが頑張ってもメッセージが受信されない。この場合、配信先のアクターの終了を伝達させてReliableProxyが終了するようにすることで、一方的にProxy経由でメッセージが配信されないようにして解決する。

ReliableProxyの生成
val pathToEcho = "akka.tcp://actorSystem@127.0.0.1:2553/user/echo"
// 500ミリ秒単位で再送するReliableProxy
val proxy = system.actorOf(ReliableProxy.props(pathToEcho, 500.millis), "proxy")

proxy ! msg

ReliableProxyによってトンネルを生成するため、メッセージの配信元のではその旨を実装する必要があるが、受信側ではどのように配信されているかを意識する必要がないようになっている。つまり、受信側では特別な実装を行う必要がない。

個人的まとめ

今回登場したチャネルの中でも、特にDeadLetterによる例外検知とReliableProxyによるリトライは、適切に業務を成立させるために十分な設計をした上で適切に使用する必要があると感じた。ただ、最新版のAkkaではReliableProxydeprecatedになっていて、代わりにAtLeastOnceDeliveryを使用するらしい。AtLeastOnceDeliveryはチラッと見てみたけど、persistenceIdなるものを定義する必要があったり少しReliableProxyとは使い方が異なるようなので、またの機会があれば・・・