LoginSignup
10
10

More than 1 year has passed since last update.

Getting Started Guideを通してAkka Typedの基本を理解する(前編)

Last updated at Posted at 2020-01-02

Akka Typedに入門する

Akka 2.6からAkka Typedが正式に導入されました。
Akka TypedGetting Stated Guideを動かしてAkkaの基本をおさらいします。これからAkkaに入門する人の助けになればと思い、読み解いた過程を記していきます。

お題は、IOTのデバイス管理をするチュートリアルです。以下のような階層のActorたちを作っていきます。

arch_tree_diagram.png

コードは基本的に公式ドキュメントのコードを写経していきつつ、わからないところは調べて解説を入れていきます。ところどころで貼り付けるAkkaのコードは、メソッドコメントを参照するために貼り付けています。
写経したコードはGitHubにアップしています。

環境

build.sbtの中身は以下だけです。

build.sbt
name := "akka-typed-practice"

version := "0.1"

scalaVersion := "2.13.1"

libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6.1"

Supervisorを起動する

まずは空っぽの処理で動かせるようにしてきます。そのために、アクターシステムのトップであるSupervisorを作っていきます。

IotApp.scala
package practice.akka.iot

import akka.actor.typed.ActorSystem

object IotApp {

  def main(args: Array[String]): Unit = {
    ActorSystem[Nothing](IotSupervisor(), "iot-sample")
  }
}

ActorSystem#apply

ActorSystem.scala
  /**
   * Scala API: Create an ActorSystem
   */
  def apply[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] =
    createInternal(name, guardianBehavior, Props.empty, ActorSystemSetup.create(BootstrapSetup()))

メソッドのコメントに『Scala API: Create an ActorSystem』と書かれているとおり、こいつを通してActorを作っていきます。
ActorSystemapplyは、パラメーターにBehavior[T]と任意のアクターの名前を取ります。Behaviorは、Actorがメッセージを受信した際の振る舞いを規定します(参考)。
あと、型パラメーターTを取るようですが、Tが何を意味しているのかはよくわからないので、いったんは無視。

IotSupervisorを空実装する

エントリーポイントから呼び出すIotSupervisorが無いためコンパイルすらできないので定義しておきます。

IotSupervisor.scala
package practice.akka.iot

import akka.actor.typed.{ Behavior, PostStop, Signal }
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }

object IotSupervisor {
  def apply(): Behavior[Nothing] =
    Behaviors.setup[Nothing](context => new IotSupervisor(context))
}

class IotSupervisor(context: ActorContext[Nothing]) extends AbstractBehavior[Nothing](context) {

  context.log.info("IoT Application started")

  override def onMessage(msg: Nothing): Behavior[Nothing] = {
    Behaviors.unhandled
  }

  override def onSignal: PartialFunction[Signal, Behavior[Nothing]] = {
    case PostStop =>
      context.log.info("IoT Application stopped")
      this
  }
}

AbstractBehavior#setup

Behaviors.scala
  /**
   * `setup` is a factory for a behavior. Creation of the behavior instance is deferred until
   * the actor is started, as opposed to [[Behaviors.receive]] that creates the behavior instance
   * immediately before the actor is running. The `factory` function pass the `ActorContext`
   * as parameter and that can for example be used for spawning child actors.
   *
   * `setup` is typically used as the outer most behavior when spawning an actor, but it
   * can also be returned as the next behavior when processing a message or signal. In that
   * case it will be started immediately after it is returned, i.e. next message will be
   * processed by the started behavior.
   */
  def setup[T](factory: ActorContext[T] => Behavior[T]): Behavior[T] =
    BehaviorImpl.DeferredBehavior(factory)

setupメソッドは、Behaviorのファクトリーメソッドです。頻出ワードです。
アプリ側のBehaviorの定義はAbstractBehaviorを継承して行います。AbstractBehaviorを継承すると、onMessageメソッドの実装を求められます。また、サンプルコードではonSignalメソッドもオーバーライドして使用しています。

  1. onMessage(msg: T): Behavior[T]
  2. onSignal: PartialFunction[Signal, Behavior[T]]

AbstractBehavior#onMessage

AbstractBehavior.scala
  /**
   * Implement this method to process an incoming message and return the next behavior.
   *
   * The returned behavior can in addition to normal behaviors be one of the canned special objects:
   * <ul>
   * <li>returning `stopped` will terminate this Behavior</li>
   * <li>returning `this` or `same` designates to reuse the current Behavior</li>
   * <li>returning `unhandled` keeps the same Behavior and signals that the message was not yet handled</li>
   * </ul>
   *
   */
  @throws(classOf[Exception])
  def onMessage(msg: T): Behavior[T]

メッセージを受け取った時の処理を実装します。戻り値は3つの中から適切な戻り値を選択します。

  1. Behaviors.stopped
  2. this または Behaviors.same
  3. Behaviors.unhandled

stopped

Behaviorは停止される。

this または same

Behaviorは再利用される。

unhandled

メッセージがまだ処理されていないことを通知します。
 →どういうこと?

AbstractBehavior#onSignal

AbstractBehavior.scala
  /**
   * Override this method to process an incoming [[akka.actor.typed.Signal]] and return the next behavior.
   * This means that all lifecycle hooks, ReceiveTimeout, Terminated and Failed messages
   * can initiate a behavior change.
   *
   * The returned behavior can in addition to normal behaviors be one of the canned special objects:
   *
   *  * returning `stopped` will terminate this Behavior
   *  * returning `this` or `same` designates to reuse the current Behavior
   *  * returning `unhandled` keeps the same Behavior and signals that the message was not yet handled
   *
   * By default, partial function is empty and does not handle any signals.
   */
  @throws(classOf[Exception])
  def onSignal: PartialFunction[Signal, Behavior[T]] = PartialFunction.empty

シグナルの処理を定義し、次の振る舞い(Behavior)を返す。シグナルの種類は、onMessageの戻り値の種類と同じ。つまり、onMessageの戻り値がシグナルとなり、それぞれのシグナルに応じた処理を定義することができます。

とりあえず動かしてみる

sbt:akka-typed-practice> run
[info] running practice.akka.iot.IotApp
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.

とりあえず何かが動きました。ただ、IoT Application startedが出力されていません。IoTSupervisorが呼び出されたらログに出力されると思ったんだけど???答えはBehaviorssetupメソッドコメントにありました。

Creation of the behavior instance is deferred until the actor is started

インスタンス生成はアクターが実際に開始されるまで遅延されるようです。ログが出力されない理由はわかった気がするけど、じゃあいつアクターは開始するの?アクターの処理が呼び出される時なのかな?
止まってても仕方がないので、いったんそういうものだと思って次に進みましょう。

Deviceアクター

ヒエラルキのトップであるSupervisorはいったん作れたので、次は末端にあたるDeviceを作っていきます。
Deviceアクターで実現することは以下の2つです。

  1. 温度を測定して収集する
  2. 問いかけたら直近の温度を答える

注意点は、デバイス開始後すぐは温度測定結果を保持していないかもしれないことです。
それでは早速やっていきます。具体的な実装内容は以下のとおりです。

  1. 現在の温度のリクエストを待つ
  2. 以下のいずれかの方法でリクエストに答える
    3. 現在の温度を返す
    4. まだ温度が計測できていないことを返す

まずは、温度計測結果の取得からです。処理する内容はCommandを継承してcase classで示します。また、replyTo: ActorRef[RespondTemperature]は応答を返す際に使用します。つまり、『温度計測結果をどこに返してほしいか』を要求時に指定しています。

Device.scala
Device.scala
package practice.akka.iot

import akka.actor.typed.ActorRef

object Device {
  sealed trait Command
  final case class ReadTemperature(replyTo: ActorRef[RespondTemperature]) extends Command
  final case class RespondTemperature(value: Option[Double])
}

アクターはどのように定義されるべきか?

ここで一時中断です。公式でもかなりのボリュームを割いて説明しています。(これに該当する内容の説明が日本語サイトに載っています。)

ここでは、『アクターは分散環境で実行される』ことを前提とした「メッセージ配信の特徴」と「メッセージ順序の特徴」が述べられています。詳細はリンクを見てください。

(再)Deviceアクター

上で実装した時は、分散環境で実行されることを考慮することができていませんでした。そこで、メッセージにIDを付与することで、どの要求に対するレスポンスが返ってきたのかを判断できるようにします。

Device.scala(修正版)
Device.scala
package practice.akka.iot

import akka.actor.typed.ActorRef

object Device {
  sealed trait Command
  final case class ReadTemperature(replyTo: ActorRef[RespondTemperature]) extends Command
  final case class RespondTemperature(value: Option[Double])
}

さらにDeviceアクターを実装していきます。
Supervisorの実装時と同様に、Behaviorを生成できるようにしていきます。

Device.scala(最終版)
Device.scala
package practice.akka.iot

import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Signal }
import practice.akka.iot.Device.Command

object Device {

  def apply(groupId: String, deviceId: String): Behavior[Command] =
    Behaviors.setup(context => new Device(context, groupId, deviceId))

  sealed trait Command
  final case class ReadTemperature(requestId: Long, replyTo: ActorRef[RespondTemperature]) extends Command
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(context: ActorContext[Command], groupId: String, deviceId: String)
    extends AbstractBehavior[Command](context) {

  import practice.akka.iot.Device._

  var lastTemperatureReading: Option[Double] = None

  context.log.info(s"Device actor $groupId-$deviceId started")

  override def onMessage(msg: Command): Behavior[Command] = {
    msg match {
      case ReadTemperature(requestId, replyTo) => {
        replyTo ! RespondTemperature(requestId, lastTemperatureReading)
        this
      }
    }
  }

  override def onSignal: PartialFunction[Signal, Behavior[Command]] = {
    case PostStop =>
      context.log.info(s"Device actor $groupId-$deviceId started")
      this
  }
}

onMessageでは、ReadTemperatureのメッセージを受け取ったら、replyToで指定された先に直近の計測結果を返すようになっています。直近の計測結果は、変数lastTemperatureReadingとしてアクター内に状態として保存されています。Deviceアクターの実装前に確認した注意点のとおり、起動直後は温度の計測結果が取得できていない可能性があるのでNoneが設定されています。

唐突に登場するgroupIddeviceIdについてですが、今回のシステムは冒頭のヒエラルキの図のとおりM個のグループの中にあるN個のデバイスを対象にするシステムなので、デバイスを特定するための必要なIDを指定できるようにしているだけです。

アクターのテスト

Akkaにはテストキットが付属されているので、テストキットを使ってDeviceアクターのテストを書く。公式のとおりに書こうと思ったけど、すぐに真っ赤に・・・そっか、公式のTestingのページを参考にライブラリーを追加します。

build.sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.6.1" % Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test

これでテストが書けるようになりました。

DeviceSpec.scala
DeviceSpec.scala
package practice.akka.iot

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike

class DeviceSpec extends ScalaTestWithActorTestKit with WordSpecLike {

  import Device._

  "Device actor" must {

    "reply with empty reading if no temperature is known" in {

      val prob        = createTestProbe[RespondTemperature]()
      val deviceActor = spawn(Device("group", "device"))

      deviceActor ! Device.ReadTemperature(requestId = 42, prob.ref)
      val response = prob.receiveMessage()
      response.requestId should ===(42)
      response.value should ===(None)

    }
  }
}

ActorTestKitBase#createTestProbe

ActorTestKitBase.scala
  /**
   * See corresponding method on [[ActorTestKit]]
   */
  def createTestProbe[M](): TestProbe[M] = testKit.createTestProbe[M]()

ActorTestKitメソッドを見ろということなので、こちらも見てみます。

ActorTestKit.scala
  /**
   * Shortcut for creating a new test probe for the testkit actor system
   * @tparam M the type of messages the probe should accept
   */
  def createTestProbe[M](): TestProbe[M] = TestProbe()(system)

テストコード内のprobは、createTestProbeメソッドによって生成されたテスト用のメールボックスです。probからアクターの処理結果を返してもらえるようです。テストコードの中でも、prob.receiveMessage()で処理結果を返してもらっています。

ActorTestKitBase#spawn

ActorTestKitBase.scala
  /**
   * See corresponding method on [[ActorTestKit]]
   */
  def spawn[T](behavior: Behavior[T]): ActorRef[T] = testKit.spawn(behavior)

こちらもActorTestKitを見てみます。

ActorTestKit.scala
  /**
   * Spawn the given behavior. This is created as a child of the test kit
   * guardian
   */
  def spawn[T](behavior: Behavior[T]): ActorRef[T] =
    spawn(behavior, Props.empty)

  /**
   * Spawn the given behavior. This is created as a child of the test kit
   * guardian
   */
  def spawn[T](behavior: Behavior[T], props: Props): ActorRef[T] =
    Await.result(internalSystem.ask(ActorTestKitGuardian.SpawnActorAnonymous(behavior, _, props)), timeout.duration)

spawnBehaviorからアクターを生成する処理です。テストコードの中でも、spawnによって生成されたdeviceActorを使ってメッセージを送信しています。ReadTemperatureメッセージのreplyToprobを使用することでメッセージの返信先をテスト用のメールボックスにしているわけですね。

(再々)Deviceアクター

テストもとりあえず書けるようになったので、Deviceアクターのもう1つの機能である温度計測結果の保持を実現していきます。
ここでも分散環境での実行を意識して以下のように定義します。

Device.scala
final case class RecordTemperature(requestId: Long, value: Double, replyTo: ActorRef[TemperatureRecorded]) extends Command
final case class TemperatureRecorded(requestId: Long)
Device.scala(書き込みも可能なバージョン)
Device.scala
package practice.akka.iot

import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior, PostStop, Signal }
import practice.akka.iot.Device.Command

object Device {

  def apply(groupId: String, deviceId: String): Behavior[Command] =
    Behaviors.setup(context => new Device(context, groupId, deviceId))

  sealed trait Command

  final case class ReadTemperature(requestId: Long, replyTo: ActorRef[RespondTemperature]) extends Command
  final case class RespondTemperature(requestId: Long, value: Option[Double])

  final case class RecordTemperature(requestId: Long, value: Double, replyTo: ActorRef[TemperatureRecorded])
      extends Command
  final case class TemperatureRecorded(requestId: Long)
}

class Device(context: ActorContext[Command], groupId: String, deviceId: String)
    extends AbstractBehavior[Command](context) {

  import practice.akka.iot.Device._

  var lastTemperatureReading: Option[Double] = None

  context.log.info(s"Device actor $groupId-$deviceId started")

  override def onMessage(msg: Command): Behavior[Command] = {
    msg match {
      case RecordTemperature(requestId, value, replyTo) => {
        context.log.info(s"Recorded temperature reading $value with $requestId")
        lastTemperatureReading = Some(value)
        replyTo ! TemperatureRecorded(requestId)
        this
      }
      case ReadTemperature(requestId, replyTo) => {
        replyTo ! RespondTemperature(requestId, lastTemperatureReading)
        this
      }
    }
  }

  override def onSignal: PartialFunction[Signal, Behavior[Command]] = {
    case PostStop =>
      context.log.info(s"Device actor $groupId-$deviceId started")
      this
  }
}

これにテストを加えます。

DeviceSpec.scala
  val recordProbe = createTestProbe[TemperatureRecorded]()
  val readProbe   = createTestProbe[RespondTemperature]()
  val deviceActor = spawn(Device("group", "device"))

  deviceActor ! Device.RecordTemperature(requestId = 1, 24.0, recordProbe.ref)
  recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))

  deviceActor ! Device.ReadTemperature(requestId = 2, readProbe.ref)
  val response1 = readProbe.receiveMessage()
  response1.requestId should ===(2)
  response1.value should ===(Some(24.0))

24.0で書き込んだ後にReadTemperatureメッセージで温度計測結果を取得したら24.0が返ってくることを確認できました。

DeviceSpec.scala(書き込みテスト追加バージョン)
DeviceSpec.scala
package practice.akka.iot

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.WordSpecLike

class DeviceSpec extends ScalaTestWithActorTestKit with WordSpecLike {

  import Device._

  "Device actor" must {

    "reply with empty reading if no temperature is known" in {

      val prob        = createTestProbe[RespondTemperature]()
      val deviceActor = spawn(Device("group", "device"))

      deviceActor ! Device.ReadTemperature(requestId = 42, prob.ref)
      val response = prob.receiveMessage()
      response.requestId should ===(42)
      response.value should ===(None)

    }

    "reply with latest temperature reading" in {

      val recordProbe = createTestProbe[TemperatureRecorded]()
      val readProbe   = createTestProbe[RespondTemperature]()
      val deviceActor = spawn(Device("group", "device"))

      deviceActor ! Device.RecordTemperature(requestId = 1, 24.0, recordProbe.ref)
      recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 1))

      deviceActor ! Device.ReadTemperature(requestId = 2, readProbe.ref)
      val response1 = readProbe.receiveMessage()
      response1.requestId should ===(2)
      response1.value should ===(Some(24.0))

      deviceActor ! Device.RecordTemperature(requestId = 3, 55.0, recordProbe.ref)
      recordProbe.expectMessage(Device.TemperatureRecorded(requestId = 3))

      deviceActor ! Device.ReadTemperature(requestId = 4, readProbe.ref)
      val response2 = readProbe.receiveMessage()
      response2.requestId should ===(4)
      response2.value should ===(Some(55.0))
    }
  }
}

ここまでのまとめとこれからやること

  • Supervisorを作る
  • DeviceManagerを作る
  • DeviceGroupを作る
  • Deviceを作る
  • アクターのテストを書く
  • Behavior[T]Tが何を意味するのかを明らかにする
10
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
10