Posted at

Akkaによるアクターモデルことはじめ

More than 1 year has passed since last update.


Akkaによるアクターモデルプログラミング

PlayやLagomなどを勉強しているとよくよくAkkaというライブラリを使っているということが聞かれます。何やら敷居が高そうだな、とも思ったのですが、scalaでは標準的に取り入れられているプログラミングモデルのようですね。ちなみに日本語の書籍では「Akka実践バイブル」が詳しいとは思うのですが、ちょっと導入的なところが抜けてる気がしたので、基本的なところを書いてみます。


アクターモデルとアクター

いわゆるアクターモデルというのは、非同期のPUB/SUB通信を拡張したプログラムモデルということができそうです。MQTTとかを使っていると理解が早いですが、可能な限りモジュール(アクター)間を疎結合にすることで、スケーラビリティの向上と複雑性の低減を両立します。

アクターとは生成(create)、送信(send)、状態変化(become)、監督(supervise)という4つの操作を持つ軽量プロセスと定義できます。生成されたアクターは、それぞれにメッセージを送りあって、非同期に処理をするわけですが、ジャーナルといわれるメッセージリストを保持します。もしアクターがクラッシュした場合は、自身が保持するジャーナルから自分自身を復元します。これらのアクターの挙動は、そのアクターを生成した親アクターが監督し、不具合時の挙動などを定義することができます。なお、アクターは親子関係が定義された木構造のように記述することができます。この辺が単純なPUB/SUBモデルとは異なる考え方ですね。

また、生成されたアクターは並行処理されます。並列化と並行化は異なります。並列化はプロセスを同時に実行しますが、並行処理はプロセスを同時に実行することもできますし、またタイミング次第では処理がオーバーラップします。しかし必ず同時に実行する必要はないということです。アクターモデルを採用するメリットを端的に述べると、一般的な並列プログラムと違ってデッドロックが起きないこと、といえます。


アクターシステムの生成

アクターシステムは、アプリケーション内に1つ生成する、すべてのアクターの実行環境のようなもので、ここから木構造の根となるトップレベルアクターを生成できます。トップレベルアクターは子アクターを生成していきますが、それぞれのアクターへの参照はActorRefによって保持され、そこにメッセージを送ります。

ちなみに、アクターに送られたメッセージは、メールボックスと呼ばれるキューに一時保存されます。メールボックスからディスパッチャーを経由して、アクターにプッシュされるという構造になっています。

それでは実際にAkkaを使ってみます。


Hello World

プロジェクトのひな型が公式サイトからダウンロードできますが、まずはまっさらな状態から試してみましょう。sbt newを使って、プロジェクトのひな型を作ります。

sbt new scala/scala-seed.g8

次にakkaライブラリを追加します。ちなみに、以前に存在したscala.actorsパッケージはすでに無くなったようです。


build.sbt

import Dependencies._

lazy val root = (project in file(".")).
settings(
inThisBuild(List(
organization := "com.example",
scalaVersion := "2.12.5",
version := "0.1.0-SNAPSHOT"
)),
name := "Hello",
libraryDependencies += scalaTest % Test,
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.14"
)


まずは以下のようなコードで、Actorの生成について確認します。


ActorHierarchyExperiments.scala

import akka.actor.{ Actor, Props, ActorSystem }

import akka.event.Logging
import scala.io.StdIn

class PrintMyActorRefActor extends Actor {

val log = Logging(context.system, this)

override def preStart(): Unit = log.info("actor started")
override def postStop(): Unit = log.info("actor stopped")

override def receive: Receive = {
case "hello"
log.info("Hello world")
case "child" =>
val secondRef = context.actorOf(Props[PrintMyActorRefActor], "second-actor")
println(s"$secondRef")
secondRef ! "hello"
}
}

object ActorHierarchyExperiments extends App {
val system = ActorSystem("testSystem")

val firstRef = system.actorOf(Props[PrintMyActorRefActor], "first-actor")
// アクターにメッセージを送る
firstRef ! "hello"
firstRef ! "child"

println(">>> Press ENTER to exit <<<")
try StdIn.readLine()
finally system.terminate()
}


Actorはakka.actor.Actorクラスを拡張して作ります。ここではPrintMyActorRefActorです。

testSystemというアクターシステムを最初に作成し、そこからfirst-actorという名前でアクターを生成し、各種メッセージを送っています。ちなみに「!」はアクターに非同期でメッセージの送信を行うメソッドでtell()と等価です。同様に「?」はask()と等価で同期的にメッセージの送受信ができます。

Actor内では、prestart、preStopというメソッドがActorの生成時、消滅時に呼ばれます。Actorにはそうしたライフサイクルがあり、それぞれのフェーズで対応したメソッド(lifecycle hook)が呼ばれます。

上記を実行すると以下のような出力となります。Actor内でオーバーライドされているreceiveメソッドが、メッセージに応じて動作を変えているのが見て取れると思います。akka.event.Loggingを使っているので、利用しているディスパッチャ、メッセージを出力しているActorのパスまで見えています。何も設定していないと、デフォルトのディスパッチャが選ばれるようですね。

[info] Running ActorHierarchyExperiments

>>> Press ENTER to exit <<<
Actor[akka://testSystem/user/first-actor/second-actor#-562396395]
[INFO] [08/02/2018 20:29:11.651] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/first-actor] actor started
[INFO] [08/02/2018 20:29:11.651] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/first-actor] Hello world
[INFO] [08/02/2018 20:29:11.652] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/first-actor/second-actor] actor started
[INFO] [08/02/2018 20:29:11.653] [testSystem-akka.actor.default-dispatcher-5] [akka://testSystem/user/first-actor] Receive stop
[INFO] [08/02/2018 20:29:11.653] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/first-actor/second-actor] Hello world
[INFO] [08/02/2018 20:29:11.671] [testSystem-akka.actor.default-dispatcher-4] [akka://testSystem/user/first-actor/second-actor] actor stopped
[INFO] [08/02/2018 20:29:11.677] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/first-actor] actor stopped

まずはここまで。