AkkaActorとルーティング
はじめに
最近↑を読んでいました。
router使ったことなかったので使いつつ、自分の中でまとめてみました。
本文
- なんで必要なの?
- システムのスケールアップ、スケールアウトのため
- スケールアウトのためには同じ動きをするアクターが複数必要になる
[module A] ---> [module B]
[module A] ---> [module B-1]
└-> [module B-2]
└-> [module B-3]
- module Bが複数になりスケールアウトしました。
- しかし、module Aからすると [B-1], [B-2], [B-3]も同じようなものです
- 負荷状況に応じてどのmodule Bへメッセージを送るかを決めるのは誰?
- AではAのしたい本来の仕事から外れるからAにはさせたくないね
- そこでRouter
[module A]---> [routerB] ---> [module B-1]
└-> [module B-2]
└-> [module B-3]
- routerBが[module A]から受け取ったメッセージを[module B-N]へ流すよ
- routerB、つまりRouterがメッセージを実際に処理してくれる場所に、受け取ったメッセージを送る仕事を請け負う
-
そのとき、実際に処理をしてくれる場所の負荷状況を見て、メッセージを送る先をコントロールしたり、実際に処理をしてくれる場所を増やしたり減らしたりっていうのをakka は実装を提供してくれるから使用するときには設定の記述だけで、負荷によるスケールアウトを実現できるよ
-
routerBのことは「ルーター」、module B-Nのようなメッセージが送りつけられるものを「ルーティー」と呼ぶ
-
akkaのRouter
- akkaは2種類のルータを提供してくれている
- プールルータ
- グループルータ
プールルータ
- ルータがルーティを管理する
- 上の例だと、routerBが module B-Nを管理する
- module B-Nを生成したり、終了したら送り先から消したり
- すべてのルーティが同じ方法で生成、分散され、ルーティの特殊な障害回復が必要ないときに使える
グループルータ
- ルーティはルータとは別で生成するようにアプリケーションを作っておく
- ルーティの生成、終了時の処理を詳細にコントロールしたいときに使う
プールルータを使ってみる
- 設定ファイルで数値をいじることによって様々な挙動の調整ができる
- ex. 負荷がどこまで高まったらルーティを増やすか
プロジェクト構成
├── build.sbt
├── project
│ └── build.properties
├── src
│ ├── main
│ │ ├── resources
│ │ │ └── application.conf
│ │ └── scala
│ │ └── Main.scala
confファイル
application.conf
akka.actor.deployment {
/poolRouter {
router = balancing-pool # poolRouterでのルーティングのロジック ラウンドロビンとか他にもロジックがある
nr-of-instances = 5 # ルータによってつくられるメッセージを受け取るアクターの数
}
}
Main.scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.routing.FromConfig
import scala.io.StdIn
object Main {
def main(args: Array[String]): Unit = {
val system = ActorSystem()
val router = system.actorOf(
FromConfig.props(Props(new HelloActor)),
"poolRouter"
)
for (i <- 0 to Int.MaxValue) {
router ! i
Thread.sleep(1000)
}
}
}
class HelloActor extends Actor with ActorLogging {
override def preStart(): Unit = {
super.preStart()
log.info(self.path.address.toString) // actorがスタートしたときに自らのアドレスをログに残す
}
override def receive: Receive = {
case number: Int => log.info(s"hello: s${number}")
}
}
build.sbt
name := "router-sample"
version := "0.1"
scalaVersion := "2.12.4"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.5.6"
)
build.properties
sbt.version = 1.1.0
動かしてみる
sbt "run"
出てくるログを眺める
- poolRouter/${a-e}までがログにでてくるのが確認できる
-
nr-of-instances = 5
で指定した数が生成されました
[INFO] [01/24/2018 19:34:43.561] [default-BalancingPool-/poolRouter-8] [akka://default/user/poolRouter/$d] akka://default/user/poolRouter/$d
[INFO] [01/24/2018 19:34:43.561] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] akka://default/user/poolRouter/$a
[INFO] [01/24/2018 19:34:43.561] [default-BalancingPool-/poolRouter-7] [akka://default/user/poolRouter/$c] akka://default/user/poolRouter/$c
[INFO] [01/24/2018 19:34:43.561] [default-BalancingPool-/poolRouter-6] [akka://default/user/poolRouter/$b] akka://default/user/poolRouter/$b
[INFO] [01/24/2018 19:34:43.561] [default-BalancingPool-/poolRouter-9] [akka://default/user/poolRouter/$e] akka://default/user/poolRouter/$e
- メッセージが流れ出すと...
-
akka://default/user/poolRouter/$a
ばかりがメッセージを処理しているようだ。 - これはルータのロジックを
application.conf
でbalancing-pool
にしたからだ。 -
balancing-pool
ではルーティのなかからその時処理を行なっていないものが使われる - ログを出すだけだとルーティは1つで全て捌けてしまうのだった
-
[INFO] [01/26/2018 10:53:17.532] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s0
[INFO] [01/26/2018 10:53:18.503] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s1
[INFO] [01/26/2018 10:53:19.507] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s2
[INFO] [01/26/2018 10:53:20.511] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s3
[INFO] [01/26/2018 10:53:21.511] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s4
[INFO] [01/26/2018 10:53:22.515] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s5
[INFO] [01/26/2018 10:53:23.518] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s6
[INFO] [01/26/2018 10:53:24.520] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s7
[INFO] [01/26/2018 10:53:25.523] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$a] hello: s8
- balancing-poolでルーティが活用されるのをみるために修正
- HelloActorの中でメッセージが来るたびにフィボナッチ数列を求めるようになりまあまあ忙しくなります
class HelloActor extends Actor with ActorLogging {
override def preStart(): Unit = {
super.preStart()
log.info(self.path.toString)
}
override def receive: Receive = {
case number: Int =>
log.info(s"hello: s$number fibo: s${fibo(30)}")
}
def fibo(n: BigDecimal): BigDecimal = {
n match {
case zeroOrOne if n == 1 || n == 0 => 1
case num => fibo(n - 2) + fibo(n - 1)
}
}
}
- 実行してログを見てみると
- poolRouter/${a-e}までがログにでてくるのが確認できる
[INFO] [01/26/2018 12:07:41.131] [default-BalancingPool-/poolRouter-7] [akka://default/user/poolRouter/$a] hello: s0 fibo: s1346269
[INFO] [01/26/2018 12:07:41.133] [default-BalancingPool-/poolRouter-8] [akka://default/user/poolRouter/$e] hello: s1 fibo: s1346269
[INFO] [01/26/2018 12:07:41.134] [default-BalancingPool-/poolRouter-6] [akka://default/user/poolRouter/$c] hello: s2 fibo: s1346269
[INFO] [01/26/2018 12:07:41.147] [default-BalancingPool-/poolRouter-5] [akka://default/user/poolRouter/$b] hello: s4 fibo: s1346269
[INFO] [01/26/2018 12:07:41.149] [default-BalancingPool-/poolRouter-9] [akka://default/user/poolRouter/$d] hello: s3 fibo: s1346269
[INFO] [01/26/2018 12:07:41.241] [default-BalancingPool-/poolRouter-6] [akka://default/user/poolRouter/$c] hello: s7 fibo: s1346269
[INFO] [01/26/2018 12:07:41.251] [default-BalancingPool-/poolRouter-8] [akka://default/user/poolRouter/$e] hello: s6 fibo: s1346269
所感
めっちゃ簡単やん...