前編に引き続き、Akka TypedのGetting Stated Guideを動かしてAkkaの基本への理解を深めたいと思います。
前編ではPart2とPart3を学んだので、後編では残りのPart4とPart5を学んでいきます。
ここからはAkka Typed
の実装チュートリアルではなく、アクターモデルを設計するためのチュートリアルというニュアンスの方が強いです。
DeviceManagerとDeviceGroupの役割
それぞれの役割を以下のとおり定めます。
- DeviceManager
- グループIDとデバイスIDを受信した時に、そのグループが既に存在する場合はグループにメッセージを転送する。
- もし管理下にそのグループが存在しない場合は、アクターを生成してからメッセージを転送する。
- DeviceGroup
- 指定されたDeviceアクターの登録要求を受け取り、既に登録済の場合は既存のアクターの
ActorRef
で応答する。 - もし管理下にそのDeviceアクターが存在しない場合は、アクターを新規に生成してからその
ActorRef
で応答する。
- 指定されたDeviceアクターの登録要求を受け取り、既に登録済の場合は既存のアクターの
各センサーは、メッセージを直接送信することのできるDeviceアクターのActorRef
を持っていることとします。
DeviceGroupアクター
DeviceGroupアクターで実現することは以下の2つです。
- Deviceアクターの登録処理。
- グループ内のDeviceアクターを追跡し、Deviceアクターが停止したらグループから取り除く。
Deviceアクターの登録要求処理
本題に入る前に、サンプルコード内にCommand
が幾度となく登場する。これが、Device.Command
とDeviceGroup.Command
が混在しており、とてもややこしい(この後、DeviceManager.Command
も登場してカオス)。Device.Command
はDeviceCommand
、DeviceGroup.Command
はDeviceGroupCommand
とすることとします。Deviceアクターのコードも全て書き換え済。
また、DeviceManagerアクターのメッセージを受け取って処理を行うこととなるため、先にDeviceManagerアクターのメッセージだけは用意しておきます。
sealed trait DeviceManagerCommand
final case class RequestTrackDevice(groupId: String, deviceId: String, replyTo: ActorRef[DeviceRegistered]) extends DeviceManagerCommand with DeviceGroupCommand
final case class DeviceRegistered(device: ActorRef[DeviceCommand])
final case class RequestDeviceList(requestId: Long, groupId: String, replyTo: ActorRef[ReplyDeviceList]) extends DeviceManagerCommand with DeviceGroupCommand
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
デバイスの登録
現時点では難しいことはやっていません。デバイスIDを含むメッセージを受信し、作成済であればMap
に登録済のActorRef
を返し、未作成であればアクターを生成してMap
に登録しています。なお、受信したメッセージがこのグループIDのグループと異なる場合はワーニングを出力しています。
DeviceGroup.scala
package practice.akka.iot
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Signal }
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import practice.akka.iot.Device.DeviceCommand
import practice.akka.iot.DeviceGroup.DeviceGroupCommand
object DeviceGroup {
def apply(groupId: String): Behavior[DeviceGroupCommand] =
Behaviors.setup(context => new DeviceGroup(context, groupId))
trait DeviceGroupCommand
private final case class DeviceTerminated(device: ActorRef[DeviceCommand], groupId: String, deviceId: String)
extends DeviceGroupCommand
}
class DeviceGroup(context: ActorContext[DeviceGroupCommand], groupId: String)
extends AbstractBehaviorDeviceGroupCommand {
import DeviceGroup._
import DeviceManager.{ DeviceRegistered, ReplyDeviceList, RequestDeviceList, RequestTrackDevice }
private var deviceIdToActor = Map.empty[String, ActorRef[DeviceCommand]]
context.log.info(s"DeviceGroup $groupId started")
override def onMessage(msg: DeviceGroupCommand): Behavior[DeviceGroupCommand] =
msg match {
case trackMsg @ RequestTrackDevice(groupId
, deviceId, replyTo) =>
deviceIdToActor.get(deviceId) match {
case Some(deviceActor) =>
replyTo ! DeviceRegistered(deviceActor)
case None =>
context.log.info(s"Creating device actor for ${trackMsg.deviceId}")
val deviceActor = context.spawn(Device(groupId, deviceId), s"device-$deviceId")
deviceIdToActor += deviceId -> deviceActor
replyTo ! DeviceRegistered(deviceActor)
}
this
case RequestTrackDevice(gId, _, _) =>
context.log.warn(s"Ignoring TrackDevice request for $gId. This actor is responsible for $groupId.")
this
}
override def onSignal: PartialFunction[Signal, Behavior[DeviceGroupCommand]] = {
case PostStop =>
context.log.info(s"DeviceGroup $groupId stopped")
this
}
}
</div></details>
テストも書きます。ここでは、まず以下の部分に着目します。
```scala:DeviceGoupSpec.scala
"ignore requests for wrong groupId" in {
val probe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("wrongGroup", "device1", probe.ref)
probe.expectNoMessage(500.milliseconds)
}
group
というグループID用のDeviceGroupアクターにwrongGroup
というグループIDを指定してメッセージを送信してしまいました。
DeviceGroupアクターでは、以下のように実装されています。
case RequestTrackDevice(gId, _, _) =>
context.log.warn(s"Ignoring TrackDevice request for $gId. This actor is responsible for $groupId.")
this
replyTo ! XXX
が実装されていません。つまり、誤ったグループIDを指定してメッセージを送信した場合は、DeviceGroupアクターから何の情報も発生されません。なので、テストでは誤ったメッセージを送信後、500ミリ秒経っても何もメッセージ返ってこないことを確認しています。
もう一点、以下の部分にも着目します。
"return same actor for same deviceId" in {
val probe = createTestProbe[DeviceRegistered]()
val groupActor = spawn(DeviceGroup("group"))
groupActor ! RequestTrackDevice("group", "device1", probe.ref)
val registered1 = probe.receiveMessage()
groupActor ! RequestTrackDevice("group", "device1", probe.ref)
val registered2 = probe.receiveMessage()
registered1.device should ===(registered2.device)
}
ここでは、同一のデバイスIDについては同一のActorRef
が返されることを確認しています。
デバイスの除去
デバイスがデバイスグループから取り除かれるのは、Deviceアクターが停止している場合です。そのため、Deviceアクターが停止する際に、親に停止を通知する必要があります。
case trackMsg @ RequestTrackDevice(`groupId`, deviceId, replyTo) =>
deviceIdToActor.get(deviceId) match {
case Some(deviceActor) =>
replyTo ! DeviceRegistered(deviceActor)
case None =>
context.log.info(s"Creating device actor for ${trackMsg.deviceId}")
val deviceActor = context.spawn(Device(groupId, deviceId), s"device-$deviceId")
context.watchWith(deviceActor, DeviceTerminated(deviceActor, groupId, deviceId))
deviceIdToActor += deviceId -> deviceActor
replyTo ! DeviceRegistered(deviceActor)
}
this
case ...(省略)
case DeviceTerminated(_,_,deviceId) =>
context.log.warn(s"Device actor for $deviceId has been terminated")
deviceIdToActor -= deviceId
this
ActorContext#watchWith
/**
* Register for termination notification with a custom message once the Actor identified by the
* given [[ActorRef]] terminates. This message is also sent when the watched actor
* is on a node that has been removed from the cluster when using using Akka Cluster.
*
* `watchWith` is idempotent if it is called with the same `msg` and not mixed with `watch`.
*
* It will fail with an [[IllegalStateException]] if the same subject was watched before using `watch` or `watchWith` with
* another termination message. To change the termination message, unwatch first.
*
* *Warning*: This method is not thread-safe and must not be accessed from threads other
* than the ordinary actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
*/
def watchWith[U](other: ActorRef[U], msg: T): Unit
watchWith
は、指定されたActorRef
(今回はDeviceアクター)が停止した際にメッセージ(今回はDeviveTerminated
)発信を登録します。これによってデバイスアクターが停止した際にDeviceTerminated
を受信することができるので、ここでMap
からデバイスを取り除いています。
TypedじゃないAkkaでもwatch
がありましたが、こちらではTerminated
というメッセージしか発信できなかったため、任意のパラメーターを含むメッセージにすることはできませんでしたが、watchWith
を使用すれば今回のサンプルのように任意の値(今回のサンプルではgroupId
)を含んだメッセージを発信できるようになります。
DeviceManagerアクター
DeviceManager.scala
package practice.akka.iot
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Signal }
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import practice.akka.iot.Device.DeviceCommand
import practice.akka.iot.DeviceGroup.DeviceGroupCommand
import practice.akka.iot.DeviceManager.DeviceManagerCommand
object DeviceManager {
def apply(): Behavior[DeviceManagerCommand] =
Behaviors.setup(context => new DeviceManager(context))
sealed trait DeviceManagerCommand
final case class RequestTrackDevice(groupId: String, deviceId: String, replyTo: ActorRef[DeviceRegistered])
extends DeviceManagerCommand
with DeviceGroupCommand
final case class DeviceRegistered(device: ActorRef[DeviceCommand])
final case class RequestDeviceList(requestId: Long, groupId: String, replyTo: ActorRef[ReplyDeviceList])
extends DeviceManagerCommand
with DeviceGroupCommand
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
private final case class DeviceGroupTerminated(groupId: String) extends DeviceManagerCommand
}
class DeviceManager(context: ActorContext[DeviceManagerCommand])
extends AbstractBehavior[DeviceManagerCommand](context) {
import DeviceManager._
var groupIdToActor = Map.empty[String, ActorRef[DeviceGroupCommand]]
context.log.info("DeviceManager started")
override def onMessage(msg: DeviceManagerCommand): Behavior[DeviceManagerCommand] =
msg match {
case trackMsg @ RequestTrackDevice(groupId, _, replyTo) =>
groupIdToActor.get(groupId) match {
case Some(ref) =>
ref ! trackMsg
case None =>
context.log.info(s"Creating device group actor for $groupId")
val groupActor = context.spawn(DeviceGroup(groupId), "group-" + groupId)
context.watchWith(groupActor, DeviceGroupTerminated(groupId))
groupActor ! trackMsg
groupIdToActor += groupId -> groupActor
}
this
case req @ RequestDeviceList(requestId, groupId, replyTo) =>
groupIdToActor.get(groupId) match {
case Some(ref) =>
ref ! req
case None =>
replyTo ! ReplyDeviceList(requestId, Set.empty)
}
this
case DeviceGroupTerminated(groupId) =>
context.log.info(s"Device group actor for $groupId has been terminated")
groupIdToActor -= groupId
this
}
override def onSignal: PartialFunction[Signal, Behavior[DeviceManagerCommand]] = {
case PostStop =>
context.log.info("DeviceManager stopped")
this
}
}
ここまで来ると目新しいキーワードは多くはありません。DeviceGroup−Deviceの関係がDeviceManager−DeviceGroupになるだけです。これでデバイスグループを新規に作成し、特定のデバイスグループ配下のデバイスを一覧で取得できるようにもなりました。
デバイスの温度を一気に取得する
デバイスは随時動的に増減する可能性があるため、以下の方法で実現していきます。
- 要求時、DeviceGroupアクターはその時点で存在するDeviceアクターのスナップショットを取得し、それらの温度を取得する。
- 要求の到着後に作成されたDeviceは無視する。
- スナップショット内のアクターが要求処理中に応答せず停止した場合は、メッセージの送信元に停止の事実を伝える。
ただし、Deviceアクターの応答が遅かったりバグによる無限ループに陥っている等の場合にずっと待ってはいられないので、以下の方針を追加します。
- スナップショット内のアクターは、以下のいずれかによって処理完了とみなす。
2. 応答があること
3. 「停止していること」を確認すること - 予め定めたタイムアウトの時間を超える
これらのことから、Deviceアクターは以下の4つの内いずれかの状態を取ると定義することができます。
- 温度を返すことができる。これを
Temperature
とする。 - 応答はあったが、まだ温度を返すことはできない。これを
TemperatureNotAvailable
とする。 - 応答前に既に停止している。これを
DeviceNotAvailable
とする。 - 期限超過(=タイムアウト)。これを
DeviceTimedOut
とする。
これらをメッセージプロトコルとして追加します。これらはDeviceGroupアクターが受け取るメッセージとなるのでDeviceGroupアクターに実装したくなるのですが、連続してリクエストが行われた場合に1回目のリクエストと2回目のリクエストを別のリクエストとして区別できなくなるので、DeviceGroupアクターへの処理の手続きを定義するためのアクターを作成します。これは、これまで実装していたのがエンティティ的なアクターであったのに対し、手続きを定義したユースケース的なアクターを実装していくことを意味します。このユースケース的なアクターをDeviceGroupQueryアクターとして定義します。
DeviceGroupQueryアクター
DeviceGroupQuery.scala
package practice.akka.iot
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors, TimerScheduler }
import akka.actor.typed.{ ActorRef, Behavior }
import practice.akka.iot.Device.DeviceCommand
import practice.akka.iot.DeviceGroupQuery.DeviceGroupQueryCommand
import scala.concurrent.duration.FiniteDuration
object DeviceGroupQuery {
def apply(deviceIdToActor: Map[String, ActorRef[DeviceCommand]],
requestId: Long,
requester: ActorRef[DeviceManager.RespondAllTemperatures],
timeout: FiniteDuration): Behavior[DeviceGroupQueryCommand] = {
Behaviors.setup(context => {
Behaviors.withTimers { timers =>
new DeviceGroupQuery(deviceIdToActor, requestId, requester, timeout, context, timers)
}
})
}
trait DeviceGroupQueryCommand
private case object CollectionTimeout extends DeviceGroupQueryCommand
final case class WrappedRespondTemperature(response: Device.RespondTemperature) extends DeviceGroupQueryCommand
private final case class DeviceTerminated(deviceId: String) extends DeviceGroupQueryCommand
}
class DeviceGroupQuery(deviceIdToActor: Map[String, ActorRef[Device.DeviceCommand]],
requestId: Long,
requester: ActorRef[DeviceManager.RespondAllTemperatures],
timeout: FiniteDuration,
context: ActorContext[DeviceGroupQueryCommand],
timers: TimerScheduler[DeviceGroupQueryCommand])
extends AbstractBehavior[DeviceGroupQueryCommand](context) {
import DeviceGroupQuery._
import DeviceManager.DeviceNotAvailable
import DeviceManager.DeviceTimedOut
import DeviceManager.RespondAllTemperatures
import DeviceManager.Temperature
import DeviceManager.TemperatureNotAvailable
import DeviceManager.TemperatureReading
timers.startSingleTimer(CollectionTimeout, CollectionTimeout, timeout)
private val respondTemperatureAdapter = context.messageAdapter(WrappedRespondTemperature.apply)
private var repliesSoFar = Map.empty[String, TemperatureReading]
private var stillWaiting = deviceIdToActor.keySet
deviceIdToActor.foreach {
case (deviceId, device) =>
context.watchWith(device, DeviceTerminated(deviceId))
device ! Device.ReadTemperature(0, respondTemperatureAdapter)
}
override def onMessage(msg: DeviceGroupQueryCommand): Behavior[DeviceGroupQueryCommand] =
msg match {
case WrappedRespondTemperature(response) => onRespondTemperature(response)
case DeviceTerminated(deviceId) => onDeviceTerminated(deviceId)
case CollectionTimeout => onCollectionTimeout()
}
private def onRespondTemperature(response: Device.RespondTemperature): Behavior[DeviceGroupQueryCommand] = {
val reading = response.value match {
case Some(value) => Temperature(value)
case None => TemperatureNotAvailable
}
val deviceId = response.deviceId
repliesSoFar += (deviceId -> reading)
stillWaiting -= deviceId
respondWhenAllCollected()
}
private def onDeviceTerminated(deviceId: String): Behavior[DeviceGroupQueryCommand] = {
if (stillWaiting(deviceId)) {
repliesSoFar += (deviceId -> DeviceNotAvailable)
stillWaiting -= deviceId
}
respondWhenAllCollected()
}
private def onCollectionTimeout(): Behavior[DeviceGroupQueryCommand] = {
repliesSoFar ++= stillWaiting.map(deviceId => deviceId -> DeviceTimedOut)
stillWaiting = Set.empty
respondWhenAllCollected()
}
private def respondWhenAllCollected(): Behavior[DeviceGroupQueryCommand] = {
if (stillWaiting.isEmpty) {
requester ! RespondAllTemperatures(requestId, repliesSoFar)
Behaviors.stopped
} else {
this
}
}
}
クエリーアクターは、手続きに必要とする情報を保持した上でタイムアウトの定義も行う必要があります。タイムアウトについては、タイムアウト時に発信するメッセージをAkkaの機能を使うことで定義することができます。
Behaviors#withTimers
/**
* Support for scheduled `self` messages in an actor.
* It takes care of the lifecycle of the timers such as cancelling them when the actor
* is restarted or stopped.
*
* @see [[TimerScheduler]]
*/
def withTimers[T](factory: TimerScheduler[T] => Behavior[T]): Behavior[T] =
TimerSchedulerImpl.withTimers(factory)
TimerSchedule
を渡すことで、Behavior
に自分自身に向けたメッセージを発信することができるようになります。どのようなメッセージを発信するかはstartSingleTimer
で定義します。
TimerScheduler#startSingleTimer
/**
* Start a timer that will send `msg` once to the `self` actor after
* the given `delay`.
*
* Each timer has a key and if a new timer with same key is started
* the previous is cancelled. It is guaranteed that a message from the
* previous timer is not received, even if it was already enqueued
* in the mailbox when the new timer was started.
*/
def startSingleTimer(key: Any, msg: T, delay: FiniteDuration): Unit
今回はtimers.startSingleTimer(CollectionTimeout, CollectionTimeout, timeout)
のように使用するので、2つ目のCollectionTimeout
が発信するメッセージとなります。
1つ目のCollectionTimeout
はkey
となっています。同キーを指定して`startSingleTimer'を呼び出すことで、前回の同キーで定義したタイマーをキャンセルしてタイマーを再セットすることができます。
ActorContext#messageAdapter
/**
* Create a message adapter that will convert or wrap messages such that other Actor’s
* protocols can be ingested by this Actor.
*
* You can register several message adapters for different message classes.
* It's only possible to have one message adapter per message class to make sure
* that the number of adapters are not growing unbounded if registered repeatedly.
* That also means that a registered adapter will replace an existing adapter for
* the same message class.
*
* A message adapter will be used if the message class matches the given class or
* is a subclass thereof. The registered adapters are tried in reverse order of
* their registration order, i.e. the last registered first.
*
* A message adapter (and the returned `ActorRef`) has the same lifecycle as
* this actor. It's recommended to register the adapters in a top level
* `Behaviors.setup` or constructor of `AbstractBehavior` but it's possible to
* register them later also if needed. Message adapters don't have to be stopped since
* they consume no resources other than an entry in an internal `Map` and the number
* of adapters are bounded since it's only possible to have one per message class.
* *
* The function is running in this actor and can safely access state of it.
*
* *Warning*: This method is not thread-safe and must not be accessed from threads other
* than the ordinary actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
*/
def messageAdapter[U: ClassTag](f: U => T): ActorRef[U]
アダプターは各メッセージクラス毎に1つだけ作成可能で、以下のように実装することでActorRef[WrappedRespondTemperature
を生成することができます。
private val respondTemperatureAdapter = context.messageAdapter(WrappedRespondTemperature.apply)
DeviceGroupアクターに機能追加
DeviceGroupアクターに全てのデバイスの温度を取得する処理を追加します。
class DeviceGroup(context: ActorContext[DeviceGroupCommand], groupId: String)
extends AbstractBehavior[DeviceGroupCommand](context) {
override def onMessage(msg: DeviceGroupCommand): Behavior[DeviceGroupCommand] =
msg match {
case RequestAllTemperatures(requestId, gId, replyTo) =>
if (gId == groupId) {
context.spawnAnonymous(
DeviceGroupQuery(deviceIdToActor, requestId, requester = replyTo, 3.seconds)
)
this
} else
Behaviors.unhandled
}
ActorContext#spawnAnonymous
/**
* Create a child Actor from the given [[akka.actor.typed.Behavior]] under a randomly chosen name.
* It is good practice to name Actors wherever practical.
*
* *Warning*: This method is not thread-safe and must not be accessed from threads other
* than the ordinary actor message processing thread, such as [[scala.concurrent.Future]] callbacks.
*/
def spawnAnonymous[U](behavior: Behavior[U], props: Props = Props.empty): ActorRef[U]
与えられたBehavior
の子アクターを生成します。
今回は、指定されたグループIDでDeviceGroupQueryアクターを生成しています。これにより、DeviceGroupQuryアクターの内部では配下のDeviceたちに対してDevice.ReadTemperature
メッセージを発信しています。
ここまででやったこと
-
Supervisorを作る -
DeviceManagerを作る -
DeviceGroupQueryを作る -
DeviceGroupを作る -
Deviceを作る -
アクターのテストを書く -
Behavior[T]
のT
が何を意味するのかを明らかにする
Behavior[T]
のT
って何?
前編完了時の宿題に残っていました。Deviceアクターを例にして確認します。
object Device {
def apply(groupId: String, deviceId: String): Behavior[DeviceCommand] =
Behaviors.setup(context => new Device(context, groupId, deviceId))
sealed trait DeviceCommand
// DeviceCommand型のメッセージ①
final case class ReadTemperature(requestId: Long, replyTo: ActorRef[RespondTemperature]) extends DeviceCommand
final case class RespondTemperature(requestId: Long, deviceId: String, value: Option[Double])
// DeviceCommand型のメッセージ②
final case class RecordTemperature(requestId: Long, value: Double, replyTo: ActorRef[TemperatureRecorded])
extends DeviceCommand
final case class TemperatureRecorded(requestId: Long)
// DeviceCommand型のメッセージ③
case object Passivate extends DeviceCommand
}
class Device(context: ActorContext[DeviceCommand], groupId: String, deviceId: String)
extends AbstractBehavior[DeviceCommand](context) {
override def onMessage(msg: DeviceCommand): Behavior[DeviceCommand] = {
msg match {
// DeviceCommand型のメッセージ①
case RecordTemperature(requestId, value, replyTo) => ???
// DeviceCommand型のメッセージ②
case ReadTemperature(requestId, replyTo) => ???
// DeviceCommand型のメッセージ③
case Passivate => ???
}
}
}
Behavior[T]
のT
は、onMessage
のパラメータであるmsg
の型を示していることが理解できました。上記の例であれば、Behavior
のT
はDeviceCommand
なので、onMessage
のmsg
の型もDeviceCommand
型となります。このようにonMessage
が受け取るメッセージが型付けられ、型安全なメッセージのやり取りを行うことができることがAkka Typed
によって得られる恩恵です。
まとめ
文法的な基本は前半に詰まっており、後半はアクタープログラムを設計する際の考え方についてのチュートリアルでした。頭の中で整理するだけではメッセージの関係がごちゃごちゃになってしまうので、図にしてメッセージの流れを確認しながらチュートリアルするのがオススメです。
Getting Started Guideが完了してからが本番です。大量のコンテンツが待っているので、心が折れてしまわないようにまずは基本をしっかり固めて臨みたいと思います。