とりあえずコードを書いて理解する
以下の依存関係をgradleに追加してください
def versions = [
ScalaBinary: "2.13"
]
dependencies {
implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.19")
implementation "com.typesafe.akka:akka-actor-typed_${versions.ScalaBinary}"
}
イントロダクション
Akkaを使用するとリアクティブシステムを構築する際に最低限必要な基本的機能を提供する低レベルコードを書かなくてすみます。
このことを理解するために、Akkaが内部で作成管理するアクターとの関係、アクターのライフサイクル、及び障害対策について紹介します。
ちなみにこれから紹介していくコードはすべてkotlinです。
Akkaアクターの階層
Akkaアクターには階層構造が存在し、制作するアクターには常に何らかの親アクターが存在します。
最初のアクターはActerSystem
によって作成し、アクターを追加で作成するには、ActorContext.spawn()
を呼び出します。
親アクターの中で新しく作成されたアクターはそのアクターの子アクターとなります。
必ず親がいるのならこういった疑問が湧くと思います。ActerSystem
によって最初に作成されたアクターは誰が親なんだ?と
下図に示すようにすべてのアクタは共通の親であるガーディアン(見守る人とか管理する人みたいな意味)
を持っており、これは ActorSystem
の起動時に
アクターを作成するとそのアクターにアクセスするための参照ActorRef
が返されますが、その参照は文字列形式のURLで表されます。
例えば ActorSystem.create(someBehavior, "someActor")
を使ってActorSystemから直接someActorという名前のアクターを作成すると、その参照には /user/someActor
というパスになります。
/user/
の部分がガーディアンアクターのURLです。
更に詳しく言うとActorSystem.create()
によって最初のアクターが起動する前にAkkaは3つのアクターを起動しています。
-
/
ルートガーディアン。これはシステム内のすべてのアクターの親であり、システム自体が終了したときに最後に停止します。
(後で説明しますが、Actorは停止する際に子から再帰的に停止していきます) -
/system
システムガーディアン。Akkaまたは Akkaの上に構築された他のライブラリは、こいつの下で作成されます。
(ログ機能とかAkka Shardingとかで作成されるアクターとかとか。実装の面では気にしなくて良い) -
/user
ユーザーガーディアン。これは、アプリケーション内でユーザーが定義したすべてのアクターを起動する際の頂点的なアクターです。
アクター階層を実際に見る最も簡単な方法は、ActorRef
インスタンスをtoString()
を使って出力することです。
今回のサンプルではアクターを作成し、その参照を表示しこのアクターの子を作成し、その子の参照を表示します。
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.javadsl.AbstractBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.actor.typed.javadsl.Behaviors
import akka.actor.typed.javadsl.Receive
// 最後に呼ばれるアクター(子供)
class Second private constructor(
context: ActorContext<String>
): AbstractBehavior<String>(context) {
override fun createReceive(): Receive<String> {
return newReceiveBuilder()
.onMessageEquals("apply2") { apply2() }
.build()
}
private fun apply2(): Behavior<String?> {
val secondRef = context.spawn(Behaviors.empty<String>(), "second-actor")
println("Second: $secondRef")
return this
}
companion object {
fun create(): Behavior<String> {
return Behaviors.setup { context: ActorContext<String> -> Second(context) }
}
}
}
// 最初に呼ぶアクター(今回は親)
class First private constructor(
context: ActorContext<String>
): AbstractBehavior<String>(context) {
override fun createReceive(): Receive<String> {
return newReceiveBuilder()
.onMessageEquals("apply1") { apply1() }
.build()
}
private fun apply1(): Behavior<String> {
val firstRef = context.spawn(Second.create(), "first-actor")
println("First: $firstRef")
firstRef.tell("apply2")
return this
}
companion object {
fun create(): Behavior<String> {
return Behaviors.setup { context: ActorContext<String> -> First(context) }
}
}
}
// エントリーポイント
fun main() {
val first = ActorSystem.create(First.create(), "first")
first.tell("apply1")
}
最初のアクタにメッセージを出して依頼をする方法に注目してください。
firstRef.tell("apply2")
というようにアクター参照に対してtellメソッドを使うとそのアクターに対してメッセージを送信することができます。
これがメッセージパッシングの基本です。
このコードが実行されると、以下のように出力されるはずです。
First: Actor[akka://testSystem/user/first-actor#-173533558]
Second: Actor[akka://testSystem/user/first-actor/second-actor#-1659852452]
なんでこんな階層構造なんて採用してるんでしょうか?と不思議に思うかもしれません。
階層構造の重要な役割は、アクターのライフサイクル
とスーパーバイザ戦略
を安全に管理することにあります。
この2つはAkkaにおいて極めて重要な概念になるので完璧に理解しておきましょう。
階層構造はメッセージパッシングとは関係ないので、親子以外な関係のアクターにもActorRefさえあればメッセージを送信することができます。
アクーターのライフサイクル
アクターが作成されたら基本的にユーザーが止めてあげないかぎり生き続けます
。(ガベージコレクションされない)
またアクターが停止されるとそのアクターが所持している小アクターも全て再帰的に停止
していきます。
この再帰的な停止により、リソースのクリーンアップなどが非常に簡単になりオープンソケットやファイルによるリソースリークを回避することができます。
実際低レベルのマルチスレッドコードを扱うときに自前でこれを実装するのはとても大変ですし、この機能がないとアクターが残りまくってメモリリークを引き起こします。
アクターを停止するには、ユーザーが定義した停止メッセージをトリガーにアクターのビヘイビア内部でBehaviors.stopped()
を返して停止させるのが推奨パターンです。
また親アクターからcontext.stop(childRef)
を呼び出すことでも可能ですが、あくまでも親が保持している子アクターの停止しかできません。
Akkaアクターは、ライフサイクルの度にいくつかのライフサイクルシグナルを発生させています
たとえば、PostStop
はアクターが停止した直後に送信されるシグナルです。
ではライフサイクルシグナル PostStop
を使って、アクターを停止させたときの挙動を観察する簡単な実験をしてみましょう。
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.PostStop
import akka.actor.typed.javadsl.AbstractBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.actor.typed.javadsl.Behaviors
import akka.actor.typed.javadsl.Receive
// 最初に停止されるアクター
class StartStopActor2 private constructor(
context: ActorContext<String>
): AbstractBehavior<String>(context) {
init {
println("Second Start")
}
override fun createReceive(): Receive<String> {
return newReceiveBuilder()
// PostStopシグナルが送信されたらonPostStop()実行
.onSignal(PostStop::class.java) { _ -> onPostStop() }
.build()
}
private fun onPostStop(): Behavior<String> {
println("Second Stop!")
return this
}
companion object {
fun create(): Behavior<String> {
return Behaviors.setup {
context: ActorContext<String> -> StartStopActor2(context)
}
}
}
}
// 2番めに停止されるアクター(停止メッセージはこいつが受ける)
class StartStopActor1 private constructor(
context: ActorContext<String>
): AbstractBehavior<String>(context) {
init {
println("First Start")
//どこの参照にも入れないけどとりあえずアクター作成
//アクターはガベージコレクションされないから参照に入れる必要が無い。
context.spawn(StartStopActor2.create(), "second")
}
override fun createReceive(): Receive<String> {
return newReceiveBuilder()
// stopメッセージが来たらアクターをBehaveirs.stopped()を返して停止させる
.onMessageEquals("stop") { Behaviors.stopped() }
.onSignal(PostStop::class.java) { _ -> onPostStop() }
.build()
}
private fun onPostStop(): Behavior<String> {
println("First Stop!")
return this
}
companion object {
fun create(): Behavior<String> {
return Behaviors.setup { context: ActorContext<String> -> StartStopActor1(context)
}
}
}
}
// エントリーポイント
fun main(){
val first = ActorSystem.create(StartStopActor1.create(), "first")
first.tell("stop")
}
出力はこんな感じになるはずです。
作成した順番と逆の順番でstopしていくのがわかるはずです。
first started
second started
second stopped
first stopped
くどいですが重要なので繰り返します。Akkaアクターは親が停止されるとまず子を再帰的に停止していきます。
エラーハンドリング(スーパーバイザ戦略)
親と子は常に接続された状態を保っており、子で例外を投げると親にバブルアウトしていき失敗情報が親の監督システム(スーパバイザ)に伝わります。
スーパバイザは通常親アクターが子アクターを作成する際に定義します。
ちなみにデフォルトのスーパーバイザは子アクターを停止することです。
簡単な実験で、スーパーバイザによる子アクターの再起動を観察してみましょう。
import akka.actor.typed.*
import akka.actor.typed.javadsl.AbstractBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.actor.typed.javadsl.Behaviors
import akka.actor.typed.javadsl.Receive
// 監督される側(子)
class SupervisedActor private constructor(
context: ActorContext<String>
): AbstractBehavior<String>(context) {
init {
println("子:起動するよ")
}
override fun createReceive(): Receive<String> {
return newReceiveBuilder()
.onMessageEquals("fail") { fail() }
.onSignal(PreRestart::class.java) { _ -> preRestart() }
.onSignal(PostStop::class.java) { _ -> postStop() }
.build()
}
private fun fail(): Behavior<String> {
println("子:今から例外で異状停止するよ!")
throw RuntimeException()
}
private fun preRestart(): Behavior<String> {
println("子:リスタート")
return this
}
private fun postStop(): Behavior<String> {
println("子:停止")
return this
}
companion object {
fun create(): Behavior<String> {
return Behaviors.setup { context: ActorContext<String> -> SupervisedActor(context) }
}
}
}
/**
* 監督する側
*/
class SupervisingActor private constructor(
context: ActorContext<String>
): AbstractBehavior<String>(context) {
private val child: ActorRef<String>
init {
child = context.spawn(
// 監視してActorを作成する。ちなみにこれは子が落ちたらその子を再起動させるという意味
Behaviors.supervise(SupervisedActor.create())
.onFailure(SupervisorStrategy.restart()),
"supervised-actor"
)
}
override fun createReceive(): Receive<String> {
return newReceiveBuilder()
.onMessageEquals("failChild") { onFailChild() }
.onMessageEquals("stop"){ Behaviors.stopped() }
.onSignal(PostStop::class.java) { _ -> postStop() }
.build()
}
private fun onFailChild(): Behavior<String> {
child.tell("fail")
return this
}
private fun postStop(): Behavior<String> {
println("親:停止")
return this
}
companion object {
fun create(): Behavior<String> {
return Behaviors.setup { context: ActorContext<String> -> SupervisingActor(context) }
}
}
}
/**
* エントリーポイント
*/
fun main(){
val testSystem: ActorRef<String> = ActorSystem.create(SupervisingActor.create(), "first")
testSystem.tell("failChild")
testSystem.tell("stop")
}
監督された子アクターが再起動しているのがわかると思います
ちなみにPreRestart
シグナルは再起動の前に処理されます。
子:起動するよ
子:今から例外で異状停止するよ!
子:リスタート
子:起動するよ
子:停止
親:停止