Akka Typedに入門する
Akka 2.6からAkka Typed
が正式に導入されました。
Akka Typed
のGetting Stated Guideを動かしてAkkaの基本をおさらいします。これからAkkaに入門する人の助けになればと思い、読み解いた過程を記していきます。
お題は、IOTのデバイス管理をするチュートリアルです。以下のような階層のActor
たちを作っていきます。
コードは基本的に公式ドキュメントのコードを写経していきつつ、わからないところは調べて解説を入れていきます。ところどころで貼り付けるAkkaのコードは、メソッドコメントを参照するために貼り付けています。
写経したコードはGitHubにアップしています。
環境
build.sbt
の中身は以下だけです。
name := "akka-typed-practice"
version := "0.1"
scalaVersion := "2.13.1"
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.1"
Supervisorを起動する
まずは空っぽの処理で動かせるようにしてきます。そのために、アクターシステムのトップであるSupervisorを作っていきます。
package practice.akka.iot
import akka.actor.typed.ActorSystem
object IotApp {
def main(args: Array[String]): Unit = {
ActorSystem[Nothing](IotSupervisor(), "iot-sample")
}
}
ActorSystem#apply
/**
* Scala API: Create an ActorSystem
*/
def apply[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] =
createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup()))
メソッドのコメントに『Scala API: Create an ActorSystem』と書かれているとおり、こいつを通してActor
を作っていきます。
ActorSystem
のapply
は、パラメーターにBehavior[T]
と任意のアクターの名前を取ります。Behavior
は、Actor
がメッセージを受信した際の振る舞いを規定します(参考)。
あと、型パラメーターT
を取るようですが、T
が何を意味しているのかはよくわからないので、いったんは無視。
IotSupervisorを空実装する
エントリーポイントから呼び出すIotSupervisor
が無いためコンパイルすらできないので定義しておきます。
package practice.akka.iot
import akka.actor.typed.{ Behavior, PostStop, Signal }
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
object IotSupervisor {
def apply(): Behavior[Nothing] =
Behaviors.setup[Nothing](context => new IotSupervisor(context))
}
class IotSupervisor(context: ActorContext[Nothing]) extends AbstractBehavior[Nothing](context) {
context.log.info("IoT Application started")
override def onMessage(msg: Nothing): Behavior[Nothing] = {
Behaviors.unhandled
}
override def onSignal: PartialFunction[Signal, Behavior[Nothing]] = {
case PostStop =>
context.log.info("IoT Application stopped")
this
}
}
AbstractBehavior#setup
/**
* `setup` is a factory for a behavior. Creation of the behavior instance is deferred until
* the actor is started, as opposed to [[Behaviors.receive]] that creates the behavior instance
* immediately before the actor is running. The `factory` function pass the `ActorContext`
* as parameter and that can for example be used for spawning child actors.
*
* `setup` is typically used as the outer most behavior when spawning an actor, but it
* can also be returned as the next behavior when processing a message or signal. In that
* case it will be started immediately after it is returned, i.e. next message will be
* processed by the started behavior.
*/
def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T] =
BehaviorImpl.DeferredBehavior(factory)
setup
メソッドは、Behavior
のファクトリーメソッドです。頻出ワードです。
アプリ側のBehavior
の定義はAbstractBehavior
を継承して行います。AbstractBehavior
を継承すると、onMessage
メソッドの実装を求められます。また、サンプルコードではonSignal
メソッドもオーバーライドして使用しています。
onMessage(msg: T): Behavior[T]
onSignal: PartialFunction[Signal, Behavior[T]]
AbstractBehavior#onMessage
/**
* Implement this method to process an incoming message and return the next behavior.
*
* The returned behavior can in addition to normal behaviors be one of the canned special objects:
* <ul>
* <li>returning `stopped` will terminate this Behavior</li>
* <li>returning `this` or `same` designates to reuse the current Behavior</li>
* <li>returning `unhandled` keeps the same Behavior and signals that the message was not yet handled</li>
* </ul>
*
*/
@throws(classOf[Exception])
def onMessage(msg: T): Behavior[T]
メッセージを受け取った時の処理を実装します。戻り値は3つの中から適切な戻り値を選択します。
Behaviors.stopped
-
this
またはBehaviors.same
Behaviors.unhandled
stopped
Behaviorは停止される。
this または same
Behaviorは再利用される。
unhandled
メッセージがまだ処理されていないことを通知します。
→どういうこと?
AbstractBehavior#onSignal
/**
* Override this method to process an incoming [[akka.actor.typed.Signal]] and return the next behavior.
* This means that all lifecycle hooks, ReceiveTimeout, Terminated and Failed messages
* can initiate a behavior change.
*
* The returned behavior can in addition to normal behaviors be one of the canned special objects:
*
* * returning `stopped` will terminate this Behavior
* * returning `this` or `same` designates to reuse the current Behavior
* * returning `unhandled` keeps the same Behavior and signals that the message was not yet handled
*
* By default, partial function is empty and does not handle any signals.
*/
@throws(classOf[Exception])
def onSignal: PartialFunction[Signal, Behavior[T]] = PartialFunction.empty
シグナルの処理を定義し、次の振る舞い(Behavior)を返す。シグナルの種類は、onMessageの戻り値の種類と同じ。つまり、onMessageの戻り値がシグナルとなり、それぞれのシグナルに応じた処理を定義することができます。
とりあえず動かしてみる
sbt:akka-typed-practice> run
[info] running practice.akka.iot.IotApp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
とりあえず何かが動きました。ただ、IoT Application started
が出力されていません。IoTSupervisor
が呼び出されたらログに出力されると思ったんだけど???答えはBehaviors
のsetup
メソッドコメントにありました。
Creation of the behavior instance is deferred until the actor is started
インスタンス生成はアクターが実際に開始されるまで遅延されるようです。ログが出力されない理由はわかった気がするけど、じゃあいつアクターは開始するの?アクターの処理が呼び出される時なのかな?
止まってても仕方がないので、いったんそういうものだと思って次に進みましょう。
Deviceアクター
ヒエラルキのトップであるSupervisor
はいったん作れたので、次は末端にあたるDevice
を作っていきます。
Deviceアクターで実現することは以下の2つです。
- 温度を測定して収集する
- 問いかけたら直近の温度を答える
注意点は、デバイス開始後すぐは温度測定結果を保持していないかもしれないことです。
それでは早速やっていきます。具体的な実装内容は以下のとおりです。
- 現在の温度のリクエストを待つ
- 以下のいずれかの方法でリクエストに答える
3. 現在の温度を返す
4. まだ温度が計測できていないことを返す
まずは、温度計測結果の取得からです。処理する内容はCommand
を継承してcase class
で示します。また、replyTo: ActorRef[RespondTemperature]
は応答を返す際に使用します。つまり、『温度計測結果をどこに返してほしいか』を要求時に指定しています。
Device.scala
package practice.akka.iot
import akka.actor.typed.ActorRef
object Device {
sealed trait Command
final case class ReadTemperature(replyTo: ActorRef[RespondTemperature]) extends Command
final case class RespondTemperature(value: Option[Double])
}
アクターはどのように定義されるべきか?
ここで一時中断です。公式でもかなりのボリュームを割いて説明しています。(これに該当する内容の説明が日本語サイトに載っています。)
ここでは、『アクターは分散環境で実行される』ことを前提とした「メッセージ配信の特徴」と「メッセージ順序の特徴」が述べられています。詳細はリンクを見てください。
(再)Deviceアクター
上で実装した時は、分散環境で実行されることを考慮することができていませんでした。そこで、メッセージにIDを付与することで、どの要求に対するレスポンスが返ってきたのかを判断できるようにします。
Device.scala(修正版)
package practice.akka.iot
import akka.actor.typed.ActorRef
object Device {
sealed trait Command
final case class ReadTemperature(replyTo: ActorRef[RespondTemperature]) extends Command
final case class RespondTemperature(value: Option[Double])
}
さらにDeviceアクターを実装していきます。
Supervisorの実装時と同様に、Behavior
を生成できるようにしていきます。
Device.scala(最終版)
package practice.akka.iot
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Signal }
import practice.akka.iot.Device.Command
object Device {
def apply(groupId: String, deviceId: String): Behavior[Command] =
Behaviors.setup(context => new Device(context, groupId, deviceId))
sealed trait Command
final case class ReadTemperature(requestId: Long, replyTo: ActorRef[RespondTemperature]) extends Command
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(context: ActorContext[Command], groupId: String, deviceId: String)
extends AbstractBehavior[Command](context) {
import practice.akka.iot.Device._
var lastTemperatureReading: Option[Double] = None
context.log.info(s"Device actor $groupId-$deviceId started")
override def onMessage(msg: Command): Behavior[Command] = {
msg match {
case ReadTemperature(requestId, replyTo) => {
replyTo ! RespondTemperature(requestId, lastTemperatureReading)
this
}
}
}
override def onSignal: PartialFunction[Signal, Behavior[Command]] = {
case PostStop =>
context.log.info(s"Device actor $groupId-$deviceId started")
this
}
}
onMessage
では、ReadTemperature
のメッセージを受け取ったら、replyTo
で指定された先に直近の計測結果を返すようになっています。直近の計測結果は、変数lastTemperatureReading
としてアクター内に状態として保存されています。Deviceアクターの実装前に確認した注意点のとおり、起動直後は温度の計測結果が取得できていない可能性があるのでNone
が設定されています。
唐突に登場するgroupId
とdeviceId
についてですが、今回のシステムは冒頭のヒエラルキの図のとおりM個のグループの中にあるN個のデバイスを対象にするシステムなので、デバイスを特定するための必要なIDを指定できるようにしているだけです。
アクターのテスト
Akkaにはテストキットが付属されているので、テストキットを使ってDeviceアクターのテストを書く。公式のとおりに書こうと思ったけど、すぐに真っ赤に・・・そっか、公式のTestingのページを参考にライブラリーを追加します。
libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.6.1" % Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test
これでテストが書けるようになりました。
DeviceSpec.scala
package practice.akka.iot
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike
class DeviceSpec extends ScalaTestWithActorTestKit with WordSpecLike {
import Device._
"Device actor" must {
"reply with empty reading if no temperature is known" in {
val prob = createTestProbe[RespondTemperature]()
val deviceActor = spawn(Device("group", "device"))
deviceActor ! Device.ReadTemperature(requestId = 42, prob.ref)
val response = prob.receiveMessage()
response.requestId should ===(42)
response.value should ===(None)
}
}
}
ActorTestKitBase#createTestProbe
/**
* See corresponding method on [[ActorTestKit]]
*/
def createTestProbe[M](): TestProbe[M] = testKit.createTestProbe[M]()
ActorTestKit
メソッドを見ろということなので、こちらも見てみます。
/**
* Shortcut for creating a new test probe for the testkit actor system
* @tparam M the type of messages the probe should accept
*/
def createTestProbe[M](): TestProbe[M] = TestProbe()(system)
テストコード内のprob
は、createTestProbe
メソッドによって生成されたテスト用のメールボックスです。prob
からアクターの処理結果を返してもらえるようです。テストコードの中でも、prob.receiveMessage()
で処理結果を返してもらっています。
ActorTestKitBase#spawn
/**
* See corresponding method on [[ActorTestKit]]
*/
def spawn[T](behavior: Behavior[T]): ActorRef[T] = testKit.spawn(behavior)
こちらもActorTestKit
を見てみます。
/**
* Spawn the given behavior. This is created as a child of the test kit
* guardian
*/
def spawn[T](behavior: Behavior[T]): ActorRef[T] =
spawn(behavior, Props.empty)
/**
* Spawn the given behavior. This is created as a child of the test kit
* guardian
*/
def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] =
Await.result(internalSystem.ask(ActorTestKitGuardian.SpawnActorAnonymous(behavior, _, props)), timeout.duration)
spawn
はBehavior
からアクターを生成する処理です。テストコードの中でも、spawn
によって生成されたdeviceActor
を使ってメッセージを送信しています。ReadTemperature
メッセージのreplyTo
にprob
を使用することでメッセージの返信先をテスト用のメールボックスにしているわけですね。
(再々)Deviceアクター
テストもとりあえず書けるようになったので、Deviceアクターのもう1つの機能である温度計測結果の保持を実現していきます。
ここでも分散環境での実行を意識して以下のように定義します。
final case class RecordTemperature(requestId: Long, value: Double, replyTo: ActorRef[TemperatureRecorded]) extends Command
final case class TemperatureRecorded(requestId: Long)
Device.scala(書き込みも可能なバージョン)
package practice.akka.iot
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Signal }
import practice.akka.iot.Device.Command
object Device {
def apply(groupId: String, deviceId: String): Behavior[Command] =
Behaviors.setup(context => new Device(context, groupId, deviceId))
sealed trait Command
final case class ReadTemperature(requestId: Long, replyTo: ActorRef[RespondTemperature]) extends Command
final case class RespondTemperature(requestId: Long, value: Option[Double])
final case class RecordTemperature(requestId: Long, value: Double, replyTo: ActorRef[TemperatureRecorded])
extends Command
final case class TemperatureRecorded(requestId: Long)
}
class Device(context: ActorContext[Command], groupId: String, deviceId: String)
extends AbstractBehavior[Command](context) {
import practice.akka.iot.Device._
var lastTemperatureReading: Option[Double] = None
context.log.info(s"Device actor $groupId-$deviceId started")
override def onMessage(msg: Command): Behavior[Command] = {
msg match {
case RecordTemperature(requestId, value, replyTo) => {
context.log.info(s"Recorded temperature reading $value with $requestId")
lastTemperatureReading = Some(value)
replyTo ! TemperatureRecorded(requestId)
this
}
case ReadTemperature(requestId, replyTo) => {
replyTo ! RespondTemperature(requestId, lastTemperatureReading)
this
}
}
}
override def onSignal: PartialFunction[Signal, Behavior[Command]] = {
case PostStop =>
context.log.info(s"Device actor $groupId-$deviceId started")
this
}
}
これにテストを加えます。
val recordProbe = createTestProbe[TemperatureRecorded]()
val readProbe = createTestProbe[RespondTemperature]()
val deviceActor = spawn(Device("group", "device"))
deviceActor ! Device.RecordTemperature(requestId = 1, 24.0, recordProbe.ref)
recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))
deviceActor ! Device.ReadTemperature(requestId = 2, readProbe.ref)
val response1 = readProbe.receiveMessage()
response1.requestId should ===(2)
response1.value should ===(Some(24.0))
24.0で書き込んだ後にReadTemperature
メッセージで温度計測結果を取得したら24.0が返ってくることを確認できました。
DeviceSpec.scala(書き込みテスト追加バージョン)
package practice.akka.iot
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike
class DeviceSpec extends ScalaTestWithActorTestKit with WordSpecLike {
import Device._
"Device actor" must {
"reply with empty reading if no temperature is known" in {
val prob = createTestProbe[RespondTemperature]()
val deviceActor = spawn(Device("group", "device"))
deviceActor ! Device.ReadTemperature(requestId = 42, prob.ref)
val response = prob.receiveMessage()
response.requestId should ===(42)
response.value should ===(None)
}
"reply with latest temperature reading" in {
val recordProbe = createTestProbe[TemperatureRecorded]()
val readProbe = createTestProbe[RespondTemperature]()
val deviceActor = spawn(Device("group", "device"))
deviceActor ! Device.RecordTemperature(requestId = 1, 24.0, recordProbe.ref)
recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))
deviceActor ! Device.ReadTemperature(requestId = 2, readProbe.ref)
val response1 = readProbe.receiveMessage()
response1.requestId should ===(2)
response1.value should ===(Some(24.0))
deviceActor ! Device.RecordTemperature(requestId = 3, 55.0, recordProbe.ref)
recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 3))
deviceActor ! Device.ReadTemperature(requestId = 4, readProbe.ref)
val response2 = readProbe.receiveMessage()
response2.requestId should ===(4)
response2.value should ===(Some(55.0))
}
}
}
ここまでのまとめとこれからやること
-
Supervisorを作る - DeviceManagerを作る
- DeviceGroupを作る
-
Deviceを作る -
アクターのテストを書く -
Behavior[T]
のT
が何を意味するのかを明らかにする