LoginSignup
1
0

More than 5 years have passed since last update.

Akka実践バイブルをゆっくり読み解く 第8章アクターの構造パターン

Last updated at Posted at 2018-04-19

Akka実践バイブルをゆっくり読み解く企画の第8章です。
何やかんや忙しくて、少し間が空いてしまいました。

第8章 アクターの構造パターン

エンタプライズインテグレーションパターンが紹介されています。エンタプライズインテグレーションパターンとは、アプリケーション間を統合するためのパターン、プログラムレベルの実装テクニックとしてのデザインパターンよりも、アプリケーションをアーキテクチャレベルで見た時の実践的実装パターン、といったところでしょうか?

パイプ&フィルター

Unix系OSでコマンドをパイプでつないでいくのと同様の仕組みを適用する。Unixコマンドの場合は、cat xxx.txt | lessみたいに、catlessは互いに独立したコマンドとして成立している。これと同様に、異なる処理と処理をうまく連動させることを目標とする。
Scalaのような型を持った言語でこれを実現するためには、IN/OUTに共通の型を用いることで異なる処理と処理の間のI/F問題を解決することができる。書籍では以下のようなフィルタの例が紹介されている。

共通の型
case class Photo(license: String, speed: Int)

次に、共通の型を入出力の型とする処理を実装する。

スピードチェック
class SpeedFilter(minSpeed: Int, pipe: ActorRef) extends Actor {
  def receive = {
    // Photo型のメッセージを受信することで処理を開始する
    case msg: Photo =>
      if (msg.speed > minSpeed)
        // パイプ先にメッセージを送信
        pipe ! msg
  }
}
ナンバープレートチェック
class LicenseFilter(pipe: ActorRef) extends Actor {
  def receive = {
    // Photo型のメッセージを受信することで処理を開始する
    case msg: Photo =>
      if (!msg.license.isEmpty)
        // パイプ先にメッセージを送信
        pipe ! msg
  }
}

あとは処理したい順にActorRefを渡せるようにActorRefを作っていく。

// 最後に呼出すアクターのActorRef
val endProbe = TestProbe()

// 2番めに呼出すアクターのActorRef
val speedFilterRef = system.actorOf(Props(new SpeedFilter(50, endProbe.ref)))

// 1番めに呼出すアクターのActorRef
val licenseFilterRef = system.actorOf(Props(new LicenseFilter(speedFilterRef)))

// メッセージ送信
val msg = new Photo("XXXXXX", 60)  // スピード60オーバーが処理対象
licenseFilterRef ! msg

パイプ&フィルターのパターンを使用すると、状況に応じて柔軟に処理順を入れ替えることが容易となる。
LicenseFilterがガバガバで90%のメッセージを次に送信する(あまり処理対象が減らない)場合に、SpeedFilterが50%のメッセージを次への送信対象とするのであれば、先にSpeedFilterを実施してしまった方が効率的である。そのような場合に、ActorRefの設定順を変更するだけで柔軟に処理順を変更することができる。

スキャッタギャザー

パイプ&フィルターの処理は直列な処理だが、スキャッタギャザーの処理は並列な処理を実現することができる。並列に処理を行う場合の動機として、以下の2つが考えられる。

競争タスク

同一目的の異なる実装にて並列で処理を行い、最も期待に則した処理の結果のみを採用する。

  • リストを複数のソートアルゴリズムで並列でソートし、最も速いソート結果を採用する。
  • 複数の仕入先に並列で同一商品の値段問合わせを行い、最も値段の安い結果を採用する。

並列協調処理

1つの目的を達成するために処理を分割して並列で実行した上で、処理結果を回収して1つの処理結果として合成する。

  • 社員の情報と会社の情報を並列で取得し、2つの情報を合成させて名刺の情報を作成する。

Akkaの並列タスク

スキャッタギャザーの説明の中で『並列』というキーワードが出てくるが、Akkaで並列処理を行い、その処理結果を1箇所で収集するには『スキャッタコンポーネント』と『ギャザーコンポーネント』を用いる。それぞれを個別で使用することもできるが、スキャッタコンポーネントによって処理を分岐し、ギャザーコンポーネントによって処理を合流させることで、並列処理を実現することができる。

受信者リストパターンによるスキャッタコンポーネント

メッセージの受信先を予め設定しておくことで、1つのメッセージを複数のアクターに送信することができるようにする。
以下のクラスでは、メッセージを受信したらコンストラクタで設定されたSeq[ActorRef]に対してメッセージを送信する。

RecipientList.scala
class RecipientList(recipientList: Seq[ActorRef]) extends Actor {
  def receive = {
    case msg: AnyRef => recipientList.foreach(_ ! msg)
  }
}

RecipientListを利用するために、Seq[ActorRef]を使ってアクターを生成する。

// 対象のActorRefを準備
val endProbe1 = TestProbe()
val endProbe2 = TestProbe()
val endProbe3 = TestProbe()
val list = Seq(endProbe1.ref, endProbe2.ref, endProbe3.ref)

// 対象のSeqをコンストラクタに設定して、アクターを生成する
val actorRef = system.actorOf(Props(new RecipientList(list)))

actorRefにメッセージを送信することで、RecipientListSeqに格納しているActorRefに対してメッセージが送信されるようになる。

アグリゲータパターンによるギャザーコンポーネント

スキャッタコンポーネントによって1回のメッセージ送信で複数のアクターにメッセージを送信できるようにはなったので、ギャザーコンポーネントでそれを回収する。

まずは正常系

Aggregator.scala
class Aggregator(timeout: FiniteDuration, pipe: ActorRef) extends Actor {

  // 受信したメッセージを格納するためのリスト
  val messages = new ListBuffer[PhotoMessage]

  def receive = {
    case rcvMsg: PhotoMessage => {
      messages.find(_.id == rcvMsg.id) match {
        // 既に自アクター内に同一IDのPhotoMessageが存在する場合はデータをマージする
        case Some(alreadyRcvMsg) => {
          val newCombinedMsg = new PhotoMessage(
            rcvMsg.id,
            rcvMsg.photo,
            rcvMsg.creationTime.orElse(alreadyRcvMsg.creationTime),  // 今回受信したデータがNoneの場合は、受信済のデータに必要な情報が格納されているはず
            rcvMsg.speed.orElse(alreadyRcvMsg.speed))                // 同上
          // 指定された先のアクターにマージしたデータを送信する
          pipe ! newCombinedMsg
          // 処理済のデータはリストから削除
          messages -= alreadyRcvMsg
        }

        // 初めて受信するIDの場合はリストに格納する
        case None => {
          messages += rcvMsg
        }
      }
    }
  }
}

メッセージを回収するために、合流先のActorRefを使ってAggregatorを生成する。

val endProbe = TestProbe()

// 合流先のアクター
val actorRef = system.actorOf(Props(new Aggregator(timeout, endProbe.ref)))

// ID:id1のメッセージを送信する
val photoStr = ImageProcessing.createPhotoString(new Date(), 60)
val msg1 = PhotoMessage("id1",photoStr,Some(new Date()),None)
actorRef ! msg1

// 続けて、ID:id1のメッセージを再度送信する
val msg2 = PhotoMessage("id1",photoStr,None,Some(60))
actorRef ! msg2

// 期待する結果はmsg1とmsg2がマージされた以下のようなデータ
val combinedMsg = PhotoMessage("id1",photoStr,msg1.creationTime,msg2.speed)

異常時はどうする?

指示元のアクターから複数のアクターにメッセージを送信した後、想定外の何かが発生した場合のことも考えないといけない。書籍では、以下の例が取り上げられている。

  • タイムアウト
  • 例外発生

タイムアウトの場合は、ペアとなるデータを受信待ちしているデータがListBufferに保持し続けられてしまうこととなる。例外が発生した場合は、ペア待ちのデータが消失してしまう可能性があるため、こちらもやはり適切なペリングができなくなってしまう。

タイムアウトへの対応

1つ目のメッセージを受信してから指定時間内にペアとなるメッセージを受信できなかった場合に、ListBuffer内に蓄積していたデータを削除するように実装する。

Aggregator.scala(タイムアウトへの対応)
class Aggregator(timeout: FiniteDuration, pipe: ActorRef) extends Actor {

  val messages = new ListBuffer[PhotoMessage]
  implicit val ec = context.system.dispatcher

  def receive = {
    case rcvMsg: PhotoMessage => {
      // 〜省略〜
    }
    // タイムアウト発生時の処理
    case TimeoutMessage(rcvMsg) => {
      messages.find(_.id == rcvMsg.id) match {
        // ペアとなるメッセージを受信できなかったデータをListBufferから削除する
        case Some(alreadyRcvMsg) => {
          // ペアリングできなかったデータを合流先に通知する
          pipe ! alreadyRcvMsg
          // ListBufferからは削除する
          messages -= alreadyRcvMsg
        }
        // Noneの場合は、時差で既に処理が行われていたのかも。処理はなし。
        case None => //message is already processed
      }
    }
  }
}

例外への対応

処理中に例外が発生した場合は、第4章に出てきたpreStartを使用して対応する。
例外が発生してそのアクターをクラッシュさせてしまう前に、現在格納しているListBufferのデータを別のAggregatorアクターに送信してアクターの状態を維持させるようにして対応する。

Aggregator.scala(例外への対応)
class Aggregator(timeout: FiniteDuration, pipe: ActorRef) extends Actor {

  val messages = new ListBuffer[PhotoMessage]

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    super.preRestart(reason, message)
    // 現在蓄積しているデータを自アクターに送信
    messages.foreach(self ! _)
    messages.clear()
  }

  def receive = {
    case rcvMsg: PhotoMessage => {
      // 〜省略〜
    }
    case TimeoutMessage(rcvMsg) => {
      // 〜省略〜
    }
  }
}

ルーティングスリップ

ルーティングスリップというのは『回覧票』を指すらしい。
目的別のアクターをそれぞれ作成してメッセージ送信先を動的に操作することで、必要最小限のメッセージングルートで処理を完結させることができる。パイプ&フィルターパターンの時と同様に、目的別に小さいアクターを同一インタフェースで独立に作成する必要がある。

RouteSlip.scala(タスクを捌く部分)
// 車のオプションのラインナップ
object CarOptions extends Enumeration {
  val CAR_COLOR_GRAY, NAVIGATION, PARKING_SENSORS = Value
}

// オプションを順に格納したコレクション
case class Order(options: Seq[CarOptions.Value])

// 標準装備の車
case class Car(color: String = "", hasNavigation: Boolean = false, hasParkingSensors: Boolean = false)

// 実装するオプションをメッセージで渡す際の共通フォーマットとしてのcase class
case class RouteSlipMessage(routeSlip: Seq[ActorRef], message: AnyRef)

trait RouteSlip {

  // 次タスクへのメッセージ送信
  def sendMessageToNextTask(routeSlip: Seq[ActorRef], message: AnyRef) {
    // コレクションから先頭のタスク(次のタスク)を取り出す
    val nextTask = routeSlip.head
    // 残りのタスク
    val newSlip = routeSlip.tail

    if (newSlip.isEmpty) {
      // 最終タスクの場合は回覧票を付与せずにメッセージを送信
      nextTask ! message
    } else {
      // 後続処理がある場合は回覧票を付与してメッセージを送信
      nextTask ! RouteSlipMessage(routeSlip = newSlip, message = message)
    }
  }
}

実際に車にオプションを実装する処理は以下の通り。タスクが行われるにつれ、メッセージ内に格納されている車が標準装備からオプション付きに進化していく。

RouteSlip.scala(タスクの実行部分)
// 塗装のタスク
class PaintCar(color: String) extends Actor with RouteSlip {
  def receive = {
    case RouteSlipMessage(routeSlip, car: Car) => {
      sendMessageToNextTask(routeSlip, car.copy(color = color))
    }
  }
}

// ナビ取り付けのタスク
class AddNavigation() extends Actor with RouteSlip {
  def receive = {
    case RouteSlipMessage(routeSlip, car: Car) => {
      sendMessageToNextTask(routeSlip, car.copy(hasNavigation = true))
    }
  }
}

// パーキングセンサー取り付けのタスク
class AddParkingSensors() extends Actor with RouteSlip {
  def receive = {
    case RouteSlipMessage(routeSlip, car: Car) => {
      sendMessageToNextTask(routeSlip, car.copy(hasParkingSensors = true))
    }
  }
}

最後に処理するタスクを設定する。

RouteSlip.scala(タスクを設定する部分)
class SlipRouter(endStep: ActorRef) extends Actor with RouteSlip {

  // ボディカラーは黒
  val paintBlack = context.actorOf(Props(new PaintCar("black")), "paintBlack")
  // ボディカラーはグレー
  val paintGray = context.actorOf(Props(new PaintCar("gray")), "paintGray")
  // ナビ付き
  val addNavigation = context.actorOf(Props[AddNavigation], "navigation")
  // パーキングセンサー付き
  val addParkingSensor =  context.actorOf(Props[AddParkingSensors], "parkingSensors")

  def receive = {
    // タスクを受け付けたら、それを捌いてくれるアクターにメッセージを送信する
    case order: Order => {
      val routeSlip = createRouteSlip(order.options)
      // 車は全くの標準装備状態からスタート
      sendMessageToNextTask(routeSlip, new Car)
    }
  }

  // 指定されたオプションに応じたActorRefを格納したSeqを生成する
  private def createRouteSlip(options: Seq[CarOptions.Value]): Seq[ActorRef] = {

    val routeSlip = new ListBuffer[ActorRef]

    //car needs a color
    if (!options.contains(CarOptions.CAR_COLOR_GRAY)) {
      routeSlip += paintBlack
    }
    options.foreach {
      case CarOptions.CAR_COLOR_GRAY  => routeSlip += paintGray
      case CarOptions.NAVIGATION      => routeSlip += addNavigation
      case CarOptions.PARKING_SENSORS => routeSlip += addParkingSensor
      case other                      => //do nothing
    }

    // 全てのタスクが完了したらメッセージを送信する先を設定する
    routeSlip += endStep
    routeSlip
  }
}

SlipRouterにメッセージを送信する際はOrderを渡すこととなるが、その内容によって仕上がる車の内容が変化する。

標準装備の車を作成する
slipRouter ! new Order(Seq())
フルカスタムな車を作成する
slipRouter ! new Order(Seq(CarOptions.CAR_COLOR_GRAY, CarOptions.NAVIGATION, CarOptions.PARKING_SENSORS))

このように、最初に作成するOrderの内容を動的に変更することで実行する処理を任意に決定することができる。複雑な処理のフローもメッセージフローの先頭で指定するため、意図しない処理が実行されてデバッグに困ることも無さそうで良さげなカンジ。

個人的まとめ

いずれのパターンも、考え方/実装自体は慣れればそれほど複雑ではなさそう。それでいて、アクターたちに複雑な処理を実行させるための制御が可能となるためとても便利そう。書籍の口ぶりではエンタープライズインテグレーションパターンには他にもパターンがあるっぽいので調べてみたら、めっちゃ大量のパターンがあった。Akka自体の理解がもう少し上がったらこいつの翻訳も挑戦したいなー。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0