Akka-Persistenceはstatefulなactorの内部状態を永続化することによって、生成時、再開時、migration時などに状態を復旧することを可能とするもの。
内部状態に至るまでのイベント(メッセージ)を永続化しておくことで、そのイベントを再生すれば全く同じ状態を再現させることが出来るため、CQRS/ESの文脈で最近はよく出てきている(気がする)。
build.sbt
まずはbuild.sbtに依存を追加する。
バージョンは適宜変更すること。(Release Versions)
サンプルでの永続化先としてLevelDBを利用する。
val akkaVersion = "2.4.9-RC2"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"org.iq80.leveldb" % "leveldb" % "0.7",
"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
)
fork := true
Akka-Persistenceをとりあえず使う
基本的にはakka.actor.Actor
と同様な使い方が出来て、akka.persistence.PersistentActor
をextendsしていくつかメソッドを実装すれば良い。
状態とメッセージを定義する
まずActorに送るためのメッセージと、Actorの内部状態となるクラス群を用意する。
今回は内部状態としてSeq[Int]
を持つこととする。
// メッセージ
private sealed trait MyCommand
private case object MySnapshot extends MyCommand
private case object MyPrint extends MyCommand
private case class AppendCommand(data: Int) extends MyCommand
// 内部状態
private case class MyState(events: Seq[Int] = Seq.empty) {
def updated(evt: AppendCommand) = copy(evt.data +: events)
def state = toString
override def toString: String = events.reverse.mkString(" :: ")
}
PersistentActorを実装する
定義した内部状態を保持し、メッセージを受け付けるActorをPersistentActor
をextendsしつつ実装する。
private class ExamplePersistentActor extends PersistentActor {
// メッセージを永続化する際のID
override def persistenceId = "example-id"
// 内部状態
private var state = MyState()
// 状態を復元する際に実行される
override def receiveRecover: Receive = {
case command: AppendCommand =>
// メッセージからの復元
state = state.updated(command)
case SnapshotOffer(_, snapshot: MyState) =>
// Snapshotからの復元
state = snapshot
}
// Actorのreceiveにあたるもの。何かしらのcommandに対する処理
override def receiveCommand: Receive = {
case command: AppendCommand =>
// メッセージを永続化している
persist(command) { _command =>
state = state.updated(_command)
}
case MySnapshot => saveSnapshot(state)
case MyPrint => println(state.state)
}
}
実装としてはおおよそこれだけで良い。
実行してみる
とりあえずActorを実行するためのAppを用意する。
object PersistentActorExample extends App {
val system = ActorSystem("PersistentActorExample")
val persistentActor = system.actorOf(Props[ExamplePersistentActor], "my-example")
// send messages to target actor
persistentActor ! AppendCommand(-1)
persistentActor ! MySnapshot
persistentActor ! AppendCommand(3)
persistentActor ! MyPrint
Thread.sleep(1000)
system.terminate()
}
これだけで実行すると以下のようにakka/reference.confを見ろ、というエラーメッセージが吐かれて永続化されない。
confを追加する
必要っぽいconfを適当にapplication.confに記述する。
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
// 永続化用ファイルの置き場所
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
// build.sbtに`fork := true`を書かないなら必要
akka.persistence.journal.leveldb.native = off
これで繰り返し実行してみると[info] -1 :: 3 :: -1 :: 3 :: -1 :: 3
のように表示されて、永続化に成功していることが分かる。
Serializerについて
ドキュメントはこちら
Serialization — Akka Documentation
先ほどのAppをsbt run
してみると以下の様なwarnが出力される。
Using the default Java serializer for class [net.petitviolet.ex.persistence.practice.AppendCommand] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
デフォルトで使用されるJavaのserializerはパフォーマンス上おすすめ出来ないとのこと。
とりあえずwarnを抑制したければ、メッセージにならってapplication.confを修正すればよい。
akka.actor.warn-about-java-serializer-usage = false
serializerとしてkryoを使う
Javaのserializer以外の選択肢としてprotocol bufferやkryoが候補として挙げられる。(自作も可)
protocol bufferは設定が面倒らしいので、kryoを使ってみる。
まずbuild.sbtのlibraryDependencies
にkryoなserializerを追加する。
libraryDependencies ++= Seq(
...
"com.github.romix.akka" %% "akka-kryo-serialization" % "0.4.1",
...
)
次にapplication.confにserializer周りの設定を追加。
akka {
actor {
warn-about-java-serializer-usage = true
serializers {
// `kryo`という名前で`KryoSerializer`を指定
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
serialization-bindings {
// kryoでserializeする対象となるクラスを指定する
"net.petitviolet.ex.persistence.practice.AppendCommand" = kryo
"net.petitviolet.ex.persistence.practice.MyState" = kryo
}
kryo {
type = "graph"
idstrategy = "default"
}
}
}
これでkryoを使ったserializeが出来る。
akka.actor.kryo
の設定については以下を参照。
Configuration of akka-kryo-serialization