Scala
Akka

Akka実践バイブルをゆっくり読み解く 第9章メッセージのルーティング

Akka実践バイブルをゆっくり読み解く企画の第9章です。
4ヶ月かけてようやく折り返し地点です。

第9章 メッセージのルーティング

処理を分散させる上でルーティングは避けて通れません。ましてや、アクター自身が状態を持つことができてしまうので、自由に分散させるわけにもいきません。今回の章も前回に引き続き、エンタプライズインテグレーションパターン(EIP)が出てきます。今回使用するのは、EIPのルーターパターンです。

ルーティング

スケールアップしたりスケールアウトする際に不可欠となる。ルーティングを使う理由には3つある。

理由 詳細
パフォーマンス 処理を並列化することによって、処理時間を短縮する。処理を並列で実施可能にするために、メッセージは異なるインスタンスに分割すべき。
受信したメッセージの内容 メッセージの持つ属性の値に応じてメッセージを別のタスクに移管する。これは8章で学習済。
ルーターの状態 処理を行うための条件を満たしていない場合は、全てのメッセージをクリーニングステップに転送すべき。条件が満たしている場合は通常どおりの処理を行う。

メッセージのルーティングを行う理由が『パフォーマンス』や『スケーリング』である場合は、Akkaに組み込まれている最適化されたルーターを使用するべきで、『メッセージの内容』や『状態』が主要な関心事である場合は、通常のアクターを使用すべき。

ルーターパターン

登場人物は以下の2人。

  1. ルーター:メッセージのルーティングを行う
  2. ルーティー:ルーターが選択するタスク振り分け先。ルーティングされる対象。

ルーターを使った負荷分散

個々のアクターの負荷を分散させるために、ルーターはメッセージが来た際に現在利用可能なプロセスにメッセージを送信する。次のメッセージが来たら、またその時に利用可能なプロセスを探してメッセージを送信する。この処理を、Akkaでは「ルーティングロジック」と「ルーターアクター」によって実現する。

「ルーターアクター」は「ルーティングロジック」を利用してルーティーを決定する。「ルーティングロジック」は複数存在し、どのルーティーを選択するかをそれぞれの選択ロジックに応じて決定する。Akkaに組み込まれている「ルーターアクター」には以下の2種類が存在する。

プールルーター

ルーターがルーティーを管理する。ルーターがルーティーを生成する。全てのルーティーを同じ方法で生成・分散し、ルーティーの特殊な障害回復が必要ない場合に使用する。シンプルに扱うことができるが、ルーティーを生成するロジックをカスタマイズすることはできない。

グループルーター

ルーターはルーティーを管理しない。ルーティーの生成も直接行わず、ルーターはルーティーを利用しているにすぎない。ルーティーのライフサイクルを特別な方法で制御する必要がある場合や、ルーティーがインスタンス化される場所をより詳細に制御したい場合に使用する。

ルーティングロジック

プールルーターでもグループルーターでも様々なルーティングロジックを使用することが可能だが、プールルーターにしか対応していないルーティングロジックも存在する。

SmallestMailboxRoutingLogic

メールボックスが最小のルーティーを選択するロジック。グループルーターはメールボックスのサイズを把握する事が出来ないため、グループルーターでは使用することができない。

BalancingPool

「ルーティングロジック」ではなく、「ルーターの特性」となるが、メッセージをアイドル状態のルーティーに配信する。これは他のルーターと内部的に異なり、グループルーターではこのような特性を持ったルーターを用意することができない。

プールルーターによるルーティング

プールの使用方法には、設定ファイルを使う方法とソースコード内で設定を行う方法の2つがある。
以下は、設定ファイルによるプールルーターの生成。

val router = system.actorOf(FromConfig.props(Props(new GetLicense(endProbe.ref))), "poolRouter")

設定ファイルには以下のように記述する。

設定ファイル
akka.actor.deployment {
  # ルーターのフルネーム上記コードの文字列"poolRouter"に対応
  /poolRouter {
    # ルーターが使用するロジック
    router = balancing-pool
    # ルーティーの数
    nr-of-instance = 5
  }
}

ルーターを用意する場合は、まずはKillまたはPoisonPillメッセージを送信する。これにより、ルーターが終了させられることによって配下のルーティーたちも連鎖的に終了することになり、初期化される。

ルーターは通常1つのメッセージを1つのルーティーに送信するが、Broadcastメッセージを使用することで全てのルーティーにメッセージを送信することもできる。ただし、BalancingPoolだけは仕組み上、Broadcastが機能しないので注意が必要。

リモートルーティー

当然、ルーティーをリモートサーバー上で動作するアクターに指定することもできる。この場合、RemoteRouterConfigを使用する。リモートサーバー上にルーティーを配置する場合は、ラウンドロビン式で順番にルーティーが均等に配置される。

動的なサイズ変更

akka.actor.deployment.ルーター名.resizerの設定によってルーティーのサイズを動的に変更することができる。ルーティー数の上限/下限や増減のタイミング、スピードを設定することができる。

増減のタイミング

メールボックス内にpressure-thresholdで設定された数のメッセージが溜まっている状態を「プールが圧迫されている」状態とする。プールが圧迫された時に新しいルーティーが生成される。
逆に、ビジー状態のルーティーがbackoff-thresholdによって設定された割合の数を下回ると、ルーティーが減らされる。ルーティーが10個生成されているとして、backoff-thresholdが0.3の時にビジーなルーティーが3つを下回るとルーティーが減らされる。

増減のスピード

rampup-rate/backoff-rateで設定された割合のルーティーが増減される。設定される割合が大きくなればなるほど一度に増減されるルーティーの数が大きくなる。

監督

プールルーターの場合、ルーティーから見たルーターはスーパーバイザーとなる。つまり、ルーティーがクラッシュしてしまうとそれがルーターにエスカレーションされてしまう。この時にルーティーを再起動するのではなく、ルーターを再起動してしまう。これは、ルーターの生成時に独自の戦略を与えることで、障害の発生したルーティーだけが再起動されるようにすることができる。

グループルーターによるルーティング

プールルーターは生成時にルーティーの数を指定するのに対して、グループルーターは生成時にルーティーのパスを指定する。グループルーターも設定ファイルを使う方法とソースコード内で設定を行う方法の2つがある。
以下は設定ファイルによるグループルーターの生成。

val router = system.actorOf(FromConfig.props(), "groupRouter")

設定ファイルには以下のように記述する。

設定ファイル
akka.actor.deployment {
  # ルーターのフルネーム上記コードの文字列"groupRouter"に対応
  /groupRouter {
    # ルーターが使用するロジック
    router = round-robin-group
    # ルーティーのアクターパス
    routes.paths = [
      "user/Creator/GetLicense0",
      "user/Creator/GetLicense1",
      ...
    ]
  }
}

グループルーターがプールルーターと異なるのは、ルーティーが終了する際の挙動にある。

ルーター 挙動
プールルーター ルーティーの終了をルーターが検知して、プールからルーティーを削除する。
グループルーター 自動検知しない。終了したルーティーにもメッセージを送信しようとする。

グループルーターの場合は、ルーティーの終了時に新しいアクターを生成してルーティーを再生成してやる必要がある。

override def preStart(): Unit = {
  super.preStart()
  (0 until  nrActors).map(nr => { // nrActorsにはルーティー数を設定する
    val child = context.actorOf(Props(new GetLicense(nextStep)), "GetLicense"+nr)
    // 生成したルーティーに対してwatchを使用する
    // これによって、アクターが終了した時にTerminatedのメッセージを受信することができるようになる
    context.watch(child)
  })
}

def receive = {
  case Terminated(child) => {
    // watchによってアクターの終了を検知することができるようになっている
    // Terminatedを受信したら新しいアクターを生成してwatchを使用する
    val newChild = context.actorOf(Props(new GetLicense(nextStep)), child.path.name)
    context.watch(newChild)
  }
}

動的なサイズ変更

プールルーターのように設定ファイルで簡単にサイズを変更することはできない。手動にて変更することは可能だが、書籍には『数多くの落とし穴があるため、利用はなるべく控えてください』って書いてある。実際、書籍のサンプルコードもちょっとややこい。
それでもサイズ変更をしたい場合に使用するのは以下の機能。

メッセージ メソッド(代表的だと思われるものをチョイス) 概要
GetRoutees getRoutees(): java.util.List[Routee] ルーター内の全てのルーティーをListで返す
AddRoutee(routee: Routee) addRoutee(routee: Routee): Unit ルーティーを追加する
RemoveRoutee(routee: Routee) removeRoutee(routee: Routee): Router ルーティーを削除する

実際にルーティーのサイズ変更を行う場合は、直接メソッドを呼ぶのではなくメッセージにて操作を行う。

メセージの追加の例
router ! AddRoutee(ActorRefRoutee(child))    // childはルーティー

メッセージのパラメータにRouteeを渡す必要があるが、これはActorRefから変換する必要がある。方法としては以下の3つの方法がある。基本的にはActorSelectionRouteeを使用する。

  • ActorRefRoutee(ref: ActorRef)
  • ActorSelectionRoutee(selection: ActorSelection)
  • SeveralRoutees(routees: immutable.IndexedSeq[Routee])

ConsistentHashingルーター

処理を各ルーティーに分散するにあたって、アクターが状態を持る場合は分散先を考慮する必要がある。例えば、8章のアグリゲータパターンのように、同一IDのメッセージは同一ルーティーに振り分けないと永遠にマッチングが完了しなくなってしまう。このような場合は、ハッシュ化させたキーによって振り分け先を決定するようにする。

  1. メッセージをメッセージキーに変換する
  2. メッセージキーをハッシュコードに変換する
  3. ハッシュコードから仮想ノードへマッピングする
  4. 仮想ノードからルーティーへマッピングする

「メッセージキー」の型は問わず、同一種類のメッセージが同一のキーを持ちさえすれば良い。8章のアグリゲータパターンではidがこのキーに該当する。

ConsistentHashingルーターを実現するための具体的な方法は数種類提供されている。

ルーター内で部分関数を指定

ルーターを生成する際に、メッセージキーを選択する部分関数を定義しておく。これにより、部分関数で定義したキーによってルーティーの振り分け先が決定されるようになる。

ルーター内に定義
def hashMapping: ConsistentHashing = {
  case msg: GatherMessage => msg.id
}
ルーター生成
val router = 
  system.actorOf(
    ConsistentHashingPool(
      10,
      virtualNodesFactor = 10,   // ルーティーごとに仮想ノード数を指定する
      hashMapping = hashMapping  // 上で宣言した部分関数を指定
    ).props(Props(new SimpleGather(endProbe.ref))),
    name = "routerMapping"
  ) 

メッセージがハッシュマッピングを行う

ConsistentHashableトレイトを継承することで、メッセージ自身にキーへの変換ロジックをもたせることが可能となる。この場合、ルーター生成時にhashMappingに部分関数を指定する必要はない。

// キーへの変換ロジックは、メッセージとなるクラス内でoverrideして実装する
override def consistentHashKey: Any = id

送信元がハッシュマッピングを行う場合

メッセージの送信時にConsistentHashableEnvelopeを使用する。ただし、以下の理由により送信元/送信先が蜜結合となってしまうため、非推奨とされている。

  • メッセージ送信先のActorRefConsistentHashableRouterを使っている必要がある
  • メッセージの送信元によって分散方法が決定される
val router = ...
// hashKeyにはキーを指定する。このキーによって振り分け先が決定される。
router ! ConsistentHashableEnvelope(message = メッセージ本体, hashKey = "ID")

アクターを使ったルーターパターンの実装

冒頭にも記述してあるとおり、『メッセージの内容』や『状態』が主要な関心事である場合は、通常のアクターを使用すべき。実は、通常のアクターでもルーターパターンを実装することができる。なお、『メッセージの内容』によるルーティングは8章でも登場しているような、メッセージ内容に応じてフローを決定するルーティングを指している。

状態ベースのルーティング

ルーターの状態に応じてルーティングの振る舞いを変更する。以下の例では、ON/OFFの状態に応じてメッセージの送信先を変更している。また、ON/OFFの切り替えはRouteStateOnまたはRouteStateOffメッセージを送信することによって行う。

状態によるルーティング
// ON用のメッセージ
case class RouteStateOn()
// OFF用のメッセージ
case class RouteStateOff()

class SwitchRouter(normalFlow: ActorRef, cleanUp: ActorRef) extends Actor {

  // 状態がONの時のReceive
  def on: Receive = {
    // 状態をOFFにする 
    case RouteStateOff => context.become(off)
    // 状態がONの場合は、通常のフローにメッセージを送信する
    case msg: AnyRef => normalFlow ! msg
  }

  // 状態がOFFの時のReceive
  def off: Receive = {
    // 状態をONにする
    case RouteStateOn => context.become(on)
    // 状態がOFFの場合は、クリーンアップ用の処理にメッセージを送信する
    case msg: AnyRef => cleanUp ! msg
  }

  // receiveの初期状態はOFF
  def receive = {
    case msg: AnyRef => off(msg)
  }
}

become(behavior: Actor.Receive): Unitreceiveに設定したい関数を渡すと、その関数がreceive関数となる。これによって状態による挙動の変化を実現することができる。
また、becomeに対してunbecome(): Unitという関数も用意されており、unbecomeを使用するとbecomeで設定したreceive関数への設定をクリアして元の関数(上記の場合であればoff)が使用されるようになる。

個人的まとめ

大量のリクエストが想定されてパフォーマンスが厳しくなることが予想されるサービスにおいて、ルーターで気軽に処理先を増減させられるようにしておくのはとても便利。ただ、グループルーターはやっぱりよくわからん。動的なサイズ変更さえしなければ比較的シンプルに細かいルーティングができるんだろうけど、グループルーターが必要なシーンが全然想像つかないなー・・・。