Scala
Akka
actor
リアクティブ
akka-persistence

エンタープライズのためのAkka 〜 Akkaで信頼できるメッセージを送る

More than 1 year has passed since last update.

本記事はTIS Engineer Advent Calendar 2015、16日目の記事です。
AkkaAkka RemotingAkka PersistantAtLeasetOnceDelivelyあたりについて書きます。

はじめに

早いもので2015年もあと少しで過ぎ去ろうとしていますが、Web界隈ではリアクティブで盛り上がった年でした。そして、来る2016年早々に開催されるScala Matsuriもリアクティブネタ目白押しです。

そこで、リアクティブシステムはエンタープライズのようなガッチリしたシステムでも使えるのかをメッセージの信頼性や到達保証という観点で考えてみることにします。

リアクティブシステムはReactive Manifestoが論じている「Responsive(即応性)」, 「Resilient(耐障害性)」, 「Elastic(弾力性)」, 「Messsage Driven(メッセージ駆動)」なシステムで、代表的なのが、Play FrameworkAkkaなどTypesafe社のプラットフォームです。なぜリアクティブシステムが必要かとか、なぜ並行性が今注目されるのかはこちらを見て頂くとして、今回はAkkaのメッセージの性質に着目します。

Akkaのメッセージ

AkkaはActorベースのメッセージ駆動アーキテクチャ、いわゆるActorモデルがベースになっています。Actor同士のやりとりはすべてメッセージで行い、Actorは別のActorにメッセージを送ったあとは、その応答を待つような同期的な処理をするのではなく、非同期に自分のやりたい処理を進めて行きます。メッセージの送り先となったActorにはメッセージを受信したら、どんな振る舞いを行うかが定義されていて、受信したメッセージに応じた処理を実行します。この処理は送り主となったActorとは、別スレッド、時にはサーバさえも別で、並行にそして分散して処理されます。ざっくりですが、これがこれまで苦労して実装してきた並行処理を、同期処理を実装する感覚で容易に実装するできるActorモデルです。

さて、ここで気になるのは、別のActorに送ったメッセージの行く末です。送り主はその結果を気にせず自分の処理を非同期で進めて行くと言いましたが、送信したメッセージはきちんと届いているのでしょうか?同じJVM上のコンポーネントを呼び出しているのであれば、それほど気にすべき事ではありませんが、相手がリモートActor、つまりはネットワークを介して別サーバ上に配備されているコンポーネントの場合はネットワーク障害等の問題でメッセージが送信先Actorに到達しない事も十分に考えられます。

例えば、以前紹介したソーラーファームモニターのように毎秒パネルから送られている電力を監視・ビジュアライズし故障を検知するようなシステムであれば、一時的にメッセージが受信できなかったとしても、その後、最新の電力を受信できれば何の問題もありません。「今」発電している電力、「今」故障していないかが重要だからです。
しかし、そのメッセージがボーナスを会社から自分の口座に振り込むというメッセージだった場合はどうでしょうか?ネットワークの問題で振り込まれず、次回の賞与まで我慢という話にはならないでしょう。このメッセージは必ず送信先のActorに到達し処理されなければなりません。エンタープライズのがっちりシステムにとっては当然の話です。

前置きはここまでとして、Typesafe Reactive Platformを支えるAkkaのメッセージは必ず届くのでしょうか?

メッセージの信頼性

Message Delivery Reliability日本語訳)によるとメッセージの信頼性は3種類に分類されます。

No 種類 内容
1 at-most-once 最大1回。
1回だけ送信するけど届くかどうかは
分からない。
2 at-least-once 少なくとも1回。
必ず1回は届くけど、2回とか3回とか
重複して送ってしまうかも知れない。
3 exactly-once 正確に1回。
重複もなく必ず1回だけ届く。

3番目が最もエンタープライズなかんじのやつですね。下に行くほど堅い処理になり、その分コストがかかりそうということは想像がつきますが、それぞれAkkaではどのように実現するのでしょうか?簡単なメッセージのやり取りを例に試してみたいと思います。

at-most-once

小学校の先生(Teacher Actor)が九九の問題をガンガン出して、生徒(Student Actor)が順に解いていくサンプルを動かしてみます。(ソースコード

at-most-once1.png

まずは、普通にAkka Remotingでメッセージを送ってみます。

実装

  • ライブラリ
build.sbt
"com.typesafe.akka" %% "akka-actor" % "2.4-SNAPSHOT",
"com.typesafe.akka" %% "akka-remote" % "2.4-SNAPSHOT"

akka-actorとakka-remoteを使います。

  • Teacher Actorの実装
TeacherActor
  var exerciseId = 0
  val scheduler = context.system.scheduler.schedule(0 milliseconds, 3 seconds, self, CreateExercise)

  def receive: Receive = {
    case CreateExercise =>
      exerciseId += 1
      self ! Exercise(exerciseId, Multiply(Random.nextInt(9) + 1, Random.nextInt(9) + 1))

    case Exercise(exerciseId, multiply) if (exerciseId > 9) =>
      scheduler.cancel()

    case Exercise(exerciseId, multiply) =>
      log.info(s" $exerciseId : ${multiply} = ?")
      student !  StudentActor.Msg(exerciseId, multiply)
  }

TeacherActorは3秒毎に計算問題を作成し、生成した問題をStudentActorに送ります。

  • Student Actorの実装
StudentActor
  def receive: Receive = {
    case Msg(deliveryId, Multiply(n1, n2)) =>
       log.info(s" $deliveryId :  calculating ... ${Multiply(n1, n2)} = ${n1 * n2}")
  }

SutudentActorは計算問題を受信すると問題を解きます。receiveメソッドに定義されているのが受信されるメッセージに対応する振る舞いです。

実行結果

log
akka-message-delivery-reliability $sbt "run-main school.atmostonce.MathApp"
[TeacherActor] -  1 : 8 * 1 = ?
[StudentActor] -  1 :  calculating ... 8 * 1 = 8
[TeacherActor] -  2 : 1 * 3 = ?
[StudentActor] -  2 :  calculating ... 1 * 3 = 3
[TeacherActor] -  3 : 7 * 2 = ?
[StudentActor] -  3 :  calculating ... 7 * 2 = 14
[TeacherActor] -  4 : 6 * 2 = ?
[StudentActor] -  4 :  calculating ... 6 * 2 = 12
[TeacherActor] -  5 : 1 * 8 = ?
[StudentActor] -  5 :  calculating ... 1 * 8 = 8

1問目「1 : 8 * 1 = ?」という問題がTeacherActorから送られ、StudentActorによって「1 : calculating ... 8 * 1 = 8」のように問題が説かれました。このように、TeacherActorからStudentActorへ5問の計算問題が正しく送られています。1つのコンソールにログが表示されているためシリアルに実行されているように見えますが、TeacherActorとStudentActorは並行に処理されています。
何事もなければ、このようにすべてのメッセージがTeacherActorからStudentActorに送信され計算が行われますが、TeacherActorとStudentActorは別のサーバにいます。ネットワーク等の問題でメッセージが届かなかった場合はどうなるのでしょうか?

Akka Persistence

StudentActorへのメッセージの到達確認の前に、受信結果を分かりやすくするため、処理した情報をAkka Persistantを使って永続化するようにしておきます。

実装

  • ライブラリ
build.sbt
"com.typesafe.akka" %% "akka-persistence" % "2.4.0",
"org.iq80.leveldb"            % "leveldb"          % "0.7",
"org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8",

3つのライブラリを追加します。

  • StudentActorの設定
student.conf
akka {
  persistence {
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.local"
      local.dir = "target/persistence/student"
    }
    journal {
      plugin = "akka.persistence.journal.leveldb"
      leveldb.native = false
      leveldb.dir = "target/persistence/journal/student"
    }
  }
}

StudentActorの永続化する情報は「target/persistence/student」に保存されます。TeacherActorは同様に「target/persistence/teacher」に永続化します。

  • StudentActorの実装
StudentActor
class StudentActor extends ActorLogging with PersistentActor {

  def persistenceId: String = "student-id-1"
  var state = StudentState()

  def receiveCommand: Receive = {
    case Msg(deliveryId, Multiply(n1, n2)) =>
      log.info(s" $deliveryId :  calculating ... ${Multiply(n1, n2)} = ${n1 * n2}")
      persist(Msg(deliveryId, Multiply(n1, n2)))(updateState)

    case "print" => log.info(s"  state = $state")
  }

  val receiveRecover: Receive = {
    case msg: Msg => updateState(msg)
  }

  def updateState(msg: Msg): Unit = {
    state = state.updated(msg)
  }
}

final case class StudentState(received: List[Long] = Nil) {
  def updated(s: Msg): StudentState = copy(received :+ s.deliveryId)
  def contains(s: Msg) = received.contains(s.deliveryId)
}

Actorトレイトの代わりにPersistentActorトレイトをミックスインし、persistenceId、receiveCommand、receiveRecoverを実装します。StudentActorでは受信したメッセージ「Msg(deliveryId, Multiply(n1, n2))」をそのまま永続化します。今回スナップショットの保存は行いません。TeacherActorからすべてのメッセージを送信すると最後に永続化した情報を出力します。

実行結果:正常実行時

log
akka-message-delivery-reliability $sbt "run-main school.akkapersistence.MathApp"
[TeacherActor] -  1 : 5 * 5 = ?
[StudentActor] -  1 :  calculating ... 5 * 5 = 25
[TeacherActor] -  2 : 1 * 9 = ?
[StudentActor] -  2 :  calculating ... 1 * 9 = 9
[TeacherActor] -  3 : 5 * 3 = ?
[StudentActor] -  3 :  calculating ... 5 * 3 = 15
[TeacherActor] -  4 : 2 * 6 = ?
[StudentActor] -  4 :  calculating ... 2 * 6 = 12
[TeacherActor] -  5 : 4 * 3 = ?
[StudentActor] -  5 :  calculating ... 4 * 3 = 12
[StudentActor] -   result = List(1, 2, 3, 4, 5)

最後にStudentActorで計算問題に回答したメッセージNoが出力され1,2,3,4,5と全5問を解いたことが分かります。

では、StudentActorを途中でダウンさせてみます。StudentActorをCtrl+Cで強引に落してみると、StudentActorがダウンしている間に送信されたメッセージはDeadLetterとなり、再度起動させると、何事もなかったかのように起動後に送信されたメッセージを処理しはじめます。

実行結果:障害発生時

  • TeacherActor
log
akka-message-delivery-reliability $sbt "run-main school.akkapersistence.MathApp teacher"
[TeacherActor] -  1 : 5 * 8 = ?
[ReliableDeliverySupervisor] - Association with remote system [akka.tcp://StudentSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[TeacherActor] -  2 : 1 * 7 = ?
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.akkapersistence.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#108113105] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[TeacherActor] -  3 : 1 * 6 = ?
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.akkapersistence.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#108113105] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[TeacherActor] -  4 : 2 * 3 = ?
[TeacherActor] -  5 : 7 * 3 = ?
  • StudentActor
log
akka-message-delivery-reliability $sbt "run-main school.akkapersistence.MathApp student"
[StudentActor] -  1 :  calculating ... 5 * 8 = 40
^Cakka-message-delivery-reliability $sbt "run-main school.akkapersistence.MathApp student"
[StudentActor] -  4 :  calculating ... 2 * 3 = 6
[StudentActor] -  5 :  calculating ... 7 * 3 = 21
[StudentActor] -   result = List(1, 4, 5)

1件目のメッセージを受信した後、StudentActorをストップさせ、すぐに起動しました。実行結果を確認すると、StudentActorがダウンしていた間のメッセージ「2 : 1 * 7 = ?」と「3 : 1 * 6 = ?」は処理されておらず(永続化されていない)ロストしていることが分かります。
Akkaでのデフォルト実装は最も軽量なat-most-onceで実装されているためです。at-most-onceは最大1回のメッセージを送信し、今回のように、状況によっては届かないという事態が発生してしまいます。

at-most-once2.png

at-least-once

StudentActorがダウンしている間に送られたメッセージをロストせず、必ず送り届けるというのがat-least-onceです。AkkaにはAtLeasetOnceDelivelyというトレイトがあるので、TeacherActorはActorトレイトの代わりにAtLeastOnceDeliveryトレイトをミックスインします。

実装

  • TeacherActorの実装
TeacherActor
class TeacherActor(destination: ActorSelection) extends ActorLogging with AtLeastOnceDelivery {

  def persistenceId: String = "teacher-id-1"

  def receiveCommand: Receive = {
    case CreateExercise =>
      exerciseId += 1
      self ! Exercise(exerciseId, Multiply(Random.nextInt(9) + 1, Random.nextInt(9) + 1))

    case Exercise(exerciseId, multiply) if (exerciseId > 5) =>
      scheduler.cancel()
      student ! "print"

    case Exercise(_, multiply) =>
      persist(MsgSent(multiply))(updateState)

    case Confirm(deliveryId) =>
      log.info(s" $deliveryId :   received")
      persist(MsgConfirmed(deliveryId))(updateState)

    case UnconfirmedWarning(metadata) =>
      log.warning(s"  unconfirmedWarning[$metadata]")
  }

  val receiveRecover: Receive = Actor.emptyBehavior

  def updateState(evt: Evt): Unit = evt match {
    case sent: MsgSent =>
      deliver(student) { deliveryId =>
        log.info(s" $deliveryId : ${sent.multiply} = ?")
        state = state.updated(sent)
        StudentActor.Msg(deliveryId, sent.multiply)
      }
    case confirm: MsgConfirmed => confirmDelivery(confirm.deliveryId)
  }

}

AtLeastOnceDeliveryトレイトはPersistentActorトレイトをミックスインしていますので、StudentActorと同様にpersistenceId、receiveCommand、receiveRecoverを実装します。AtLeastOnceDelivery(PersistentActor)を使用するときは、「!」でメッセージを送信するのではなく、persistでメッセージを永続化し、AtLeastOnceDelivery#deliver()で送信します。送信したメッセージがStudentActorに届いたことを確認するため、Confirmメッセージを受信し、persist(永続化)します。

  • StudentActorの実装
StudentActor
  def receiveCommand: Receive = {
    case Msg(deliveryId, Multiply(n1, n2)) =>
      log.info(s" $deliveryId :  calculating ... ${Multiply(n1, n2)} = ${n1 * n2}")
      persist(Msg(deliveryId, Multiply(n1, n2)))(updateState)
      sender() ! TeacherActor.Confirm(deliveryId)
    case "print" => log.info(s"  $state")
  }

StudentActorはメッセージを受信した後、sender()に対して確認メッセージを送り、TeacherActorで受信します。一定時間内(デフォルト5秒)に受信できなかった場合は、メッセージを再送します。この仕組みにより、StudentActorがダウンしていてもメッセージが届くまで(当然上限はありますが)再送を繰り返すことにより、必ずメッセージが処理されることになります。AtLeastOnceDeliveryを使うとアプリケーション側でメッセージの到達確認や再送を実装する必要はありません。

では、動きを確認してみましょう。

実行結果

  • TeacherActor
log
akka-message-delivery-reliability $sbt "run-main school.atleastonce.MathApp teacher"
[TeacherActor] -  1 : 6 * 5 = ?
[TeacherActor] -  1 :   received
[ReliableDeliverySupervisor] - Association with remote system [akka.tcp://StudentSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[TeacherActor] -  2 : 8 * 4 = ?
[ReliableDeliverySupervisor] - Association with remote system [akka.tcp://StudentSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://StudentSystem@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.atleastonce.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#-885507212] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[ReliableDeliverySupervisor] - Association with remote system [akka.tcp://StudentSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://StudentSystem@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.atleastonce.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#-885507212] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[TeacherActor] -  3 : 8 * 6 = ?
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.atleastonce.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#-885507212] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.atleastonce.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#-885507212] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[TeacherActor] -  3 :   received
[TeacherActor] -  2 :   received
[TeacherActor] -  4 : 6 * 7 = ?
[TeacherActor] -  4 :   received
[TeacherActor] -  5 : 4 * 4 = ?
[TeacherActor] -  5 :   received
  • StudentActor
log
akka-message-delivery-reliability $sbt "run-main school.atleastonce.MathApp student"
[StudentActor] -  1 :  calculating ... 6 * 5 = 30
^Cakka-message-delivery-reliability $sbt "run-main school.atleastonce.MathApp student"
[StudentActor] -  3 :  calculating ... 8 * 6 = 48
[StudentActor] -  2 :  calculating ... 8 * 4 = 32
[StudentActor] -  4 :  calculating ... 6 * 7 = 42
[StudentActor] -  5 :  calculating ... 4 * 4 = 16
[StudentActor] -   result = List(1, 3, 2, 4, 5)

2つのActorシステムを起動し、先ほどと同じように途中でStudentActorをストップさせてみました。StudentActorがダウンしている間のメッセージはDeadLetterになります。StudentActorを再び起動させるとダウンしていた間に送られたメッセージ「2 : 8 * 4 = ?」と「3 : 8 * 6 = ?」を受信しつつ、新たに送られたメッセージ「4 : 6 * 7 = ?」と「5 : 4 * 4 = ?」も処理しました。TeacherActorを見るとStudentActorがダウンしてい間は確認メッセージは受信できず、再開後にまとめて受信していることが分かります。ただし、この場合順序は保証されないので注意が必要です。これでat-least-onceが実現できていることが確認できました。

at-least-once1.png

exactly-once

次に、もしStudentActorから送られる確認メッセージが届かなかった場合にどうなるかを考えてみます。
AtLeastOnceDeliveryでは、確認メッセージを受信したことで送信先にメッセージが届いたことを認識するので、ネットワーク障害等で確認メッセージを受信できなかった状態を擬似的に起こすため、ランダムに確認メッセージの受信を失敗させてみます。

実装

  • TeacherActor
TeacherActor
    case Confirm(deliveryId) =>
      // デリバリーIDが偶数の場合、1/2の確率で確認メッセージを受信できない
      if (deliveryId % 2 == 0 && Random.nextInt(2) == 0) {
        log.info(s" $deliveryId :   lost confirmation message")
      } else {
        log.info(s" $deliveryId :   received")
        persist(MsgConfirmed(deliveryId))(updateState)
      }

確認メッセージは2分の1の確率で受信に失敗し、「lost confirmation message」というログを出力するようにします。

実行結果

log
akka-message-delivery-reliability $sbt "run-main school.atleastonce.MathApp"
[TeacherActor] -  1 : 9 * 8 = ?
[StudentActor] -  1 :  calculating ... 9 * 8 = 72
[TeacherActor] -  1 :   received
[TeacherActor] -  2 : 8 * 7 = ?
[StudentActor] -  2 :  calculating ... 8 * 7 = 56
[TeacherActor] -  2 :   lost confirmation message
[TeacherActor] -  3 : 6 * 2 = ?
[StudentActor] -  3 :  calculating ... 6 * 2 = 12
[TeacherActor] -  3 :   received
[StudentActor] -  2 :  calculating ... 8 * 7 = 56
[TeacherActor] -  2 :   received
[TeacherActor] -  4 : 8 * 4 = ?
[StudentActor] -  4 :  calculating ... 8 * 4 = 32
[TeacherActor] -  4 :   received
[TeacherActor] -  5 : 2 * 3 = ?
[StudentActor] -  5 :  calculating ... 2 * 3 = 6
[TeacherActor] -  5 :   received
[StudentActor] -   result = List(1, 2, 3, 2, 4, 5)

2番目のメッセージ「2 : 8 * 7 = ?」に注目して下さい。TeacherActorは確認メッセージの受信に失敗(lost confirmation message)し、メッセージを再送しています。これによりStudentActorは同じ問題を2度解いていることが最終行の処理済データから確認できます。
実際はメッセージは届いていたのに、確認メッセージの受信に失敗しただけなので、再送することによりメッセージが重複してしまいます。AtLeastOnceDeliveryでは、少なくとも1度メッセージが届くことを保証しますが、正確に1回のみ届くことは保証されません。exactly-onceではないのです。

at-least-once2.png

exactly-onceを実現するために、StudentActorで重複したメッセージは処理しないようにします。今回受信したメッセージのクリア処理は考えず、シンプルに蓄積した情報の中に配信メッセージが存在した場合は、重複と判断します。

  • StudentActorの実装
StudentActor
    case Msg(deliveryId, Multiply(n1, n2)) =>
      if (state.contains(Msg(deliveryId, Multiply(n1, n2))))
        log.info(s" $deliveryId :  already calculated")
      else {
        log.info(s" $deliveryId :  calculating ... ${Multiply(n1, n2)} = ${n1 * n2}")
        persist(Msg(deliveryId, Multiply(n1, n2)))(updateState)
      }
      sender() ! TeacherActor.Confirm(deliveryId)

永続化のために蓄積したstateにメッセージが存在する場合は、重複メッセージと判断し、計算処理を行いません(ログに「already calculated」を出力)。

実行結果

log
akka-message-delivery-reliability $sbt "run-main school.exactlyonce.MathApp"
[TeacherActor] -  1 : 7 * 6 = ?
[StudentActor] -  1 :  calculating ... 7 * 6 = 42
[TeacherActor] -  1 :   received
[TeacherActor] -  2 : 1 * 6 = ?
[StudentActor] -  2 :  calculating ... 1 * 6 = 6
[TeacherActor] -  2 :   lost confirmation message
[TeacherActor] -  3 : 5 * 7 = ?
[StudentActor] -  3 :  calculating ... 5 * 7 = 35
[TeacherActor] -  3 :   received
[StudentActor] -  2 :  already calculated
[TeacherActor] -  2 :   received
[TeacherActor] -  4 : 1 * 6 = ?
[StudentActor] -  4 :  calculating ... 1 * 6 = 6
[TeacherActor] -  4 :   received
[TeacherActor] -  5 : 8 * 8 = ?
[StudentActor] -  5 :  calculating ... 8 * 8 = 64
[TeacherActor] -  5 :   received
[StudentActor] -   result = List(1, 2, 3, 4, 5)

これでexactly-onceが実装できました。2番目のメッセージ「2 : 1 * 6 = ?」はTeacherActorで確認メッセージの受信に失敗し、再送していますが、StudentActorで重複メッセージと判断し計算を行っていません。先生から出された問題はただ一度だけ生徒が解くようになりました。

exactly-once1.png

最後に、StudentActorがダウンしたときの動作も確認しておきます。

実行結果

  • TeacherActor
log
akka-message-delivery-reliability $sbt "run-main school.exactlyonce.MathApp teacher"
[TeacherActor] -  1 : 4 * 9 = ?
[TeacherActor] -  1 :   received
[ReliableDeliverySupervisor] - Association with remote system [akka.tcp://StudentSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
[TeacherActor] -  2 : 6 * 1 = ?
[ReliableDeliverySupervisor] - Association with remote system [akka.tcp://StudentSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://StudentSystem@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.exactlyonce.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#79590160] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[ReliableDeliverySupervisor] - Association with remote system [akka.tcp://StudentSystem@127.0.0.1:2552] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://StudentSystem@127.0.0.1:2552]] Caused by: [Connection refused: /127.0.0.1:2552]
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.exactlyonce.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#79590160] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[TeacherActor] -  3 : 2 * 1 = ?
[RemoteActorRefProvider$RemoteDeadLetterActorRef] - Message [school.exactlyonce.StudentActor$Msg] from Actor[akka://TeacherSystem/user/teacherActor#79590160] to Actor[akka://TeacherSystem/deadLetters] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[TeacherActor] -  2 :   lost confirmation message
[TeacherActor] -  3 :   received
[TeacherActor] -  2 :   received
[TeacherActor] -  4 : 9 * 4 = ?
[TeacherActor] -  4 :   lost confirmation message
[TeacherActor] -  4 :   lost confirmation message
[TeacherActor] -  5 : 7 * 5 = ?
[TeacherActor] -  5 :   received
[TeacherActor] -  4 :   received

  • StudentActor
log
akka-message-delivery-reliability $sbt "run-main school.exactlyonce.MathApp student"
[StudentActor] -  1 :  calculating ... 4 * 9 = 36
^Cakka-message-delivery-reliability $sbt "run-main school.exactlyonce.MathApp student"
[StudentActor] -  2 :  calculating ... 6 * 1 = 6
[StudentActor] -  3 :  calculating ... 2 * 1 = 2
[StudentActor] -  2 :  already calculated
[StudentActor] -  4 :  calculating ... 9 * 4 = 36
[StudentActor] -  4 :  already calculated
[StudentActor] -  5 :  calculating ... 7 * 5 = 35
[StudentActor] -  4 :  already calculated
[StudentActor] -   result = List(1, 2, 3, 4, 5)

1番目のメッセージ「1 : 7 * 6 = ?」を受信した後、StudetnActorをダウンさせましたが、最終的にTeacherActorからのメッセージを重複なく必ず一度処理していることが分かります。

おわりに

at-most-once、at-least-once、exactly-onceと順に実装、処理コスト(やり取りするメッセージ数)が共に増えていきました。Akkaが一番得意とするのはat-most-onceで、必要に応じてAtLeastOnceDeliveryトレイトを使ってat-least-onceを実装し、exactly-onceは業務に特化したID等を用いて重複排除を実装するのが良さそうです。

No 種類 実装方法
1 at-most-once Akkaのデフォルト実装
2 at-least-once AtLeastOnceDeliveryを使って
到達確認メッセージを受け取り
3 exactly-once 2の対応を行った上で業務IDを
用いて重複排除

このあたりは、Message Delivery Reliability-Discussion: Why No Guaranteed Delivery?とか、そこからリンクの貼られているNobody Needs Reliable Messagingに詳しく書かれています。Nobody Needs Reliable Messagingにあるように、「処方箋ID」のような業務IDは必ず存在するし、更新処理等を伴う場合は必ず重複チェックは実装するものなので、重複の排除は業務側で実装すべきだと考えられます。Akka側でやろうとすると余計なコストがかかってしまいそうですし。

というわけで、今回はAkkaのメッセージの信頼性について考えていました。必要に応じて信頼度を上げ、確実なメッセージ送信の実装ができ、エンタープライズのがっちりな場面でも利用できることがお分かり頂けたことでしょう。

説明で使用したソースコードの全体はGitHubで公開しています。
実装にあたり、Typesafe Activatorの2つのサンプルを参考にさせて頂きました。