Scala
Akka
actor

Akka-Persistenceコトハジメ

More than 1 year has passed since last update.

Akka-Persistenceはstatefulなactorの内部状態を永続化することによって、生成時、再開時、migration時などに状態を復旧することを可能とするもの。

内部状態に至るまでのイベント(メッセージ)を永続化しておくことで、そのイベントを再生すれば全く同じ状態を再現させることが出来るため、CQRS/ESの文脈で最近はよく出てきている(気がする)。

build.sbt

まずはbuild.sbtに依存を追加する。
バージョンは適宜変更すること。(Release Versions)
サンプルでの永続化先としてLevelDBを利用する

build.sbt
val akkaVersion = "2.4.9-RC2"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,

  "org.iq80.leveldb"  % "leveldb"          % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
)

fork := true

Akka-Persistenceをとりあえず使う

基本的にはakka.actor.Actorと同様な使い方が出来て、akka.persistence.PersistentActorをextendsしていくつかメソッドを実装すれば良い。

状態とメッセージを定義する

まずActorに送るためのメッセージと、Actorの内部状態となるクラス群を用意する。
今回は内部状態としてSeq[Int]を持つこととする。

// メッセージ
private sealed trait MyCommand
private case object MySnapshot extends MyCommand
private case object MyPrint extends MyCommand
private case class AppendCommand(data: Int) extends MyCommand

// 内部状態
private case class MyState(events: Seq[Int] = Seq.empty) {
  def updated(evt: AppendCommand) = copy(evt.data +: events)
  def state = toString
  override def toString: String = events.reverse.mkString(" :: ")
}

PersistentActorを実装する

定義した内部状態を保持し、メッセージを受け付けるActorをPersistentActorをextendsしつつ実装する。

private class ExamplePersistentActor extends PersistentActor {
  // メッセージを永続化する際のID
  override def persistenceId = "example-id"

  // 内部状態
  private var state = MyState()

  // 状態を復元する際に実行される
  override def receiveRecover: Receive = {
    case command: AppendCommand =>
      // メッセージからの復元
      state = state.updated(command)
    case SnapshotOffer(_, snapshot: MyState) =>
      // Snapshotからの復元
      state = snapshot
  }

  // Actorのreceiveにあたるもの。何かしらのcommandに対する処理
  override def receiveCommand: Receive = {
    case command: AppendCommand  =>
      // メッセージを永続化している
      persist(command) { _command =>
        state = state.updated(_command)
      }
    case MySnapshot => saveSnapshot(state)
    case MyPrint    => println(state.state)
  }

}

実装としてはおおよそこれだけで良い。

実行してみる

とりあえずActorを実行するためのAppを用意する。

object PersistentActorExample extends App {

  val system = ActorSystem("PersistentActorExample")
  val persistentActor = system.actorOf(Props[ExamplePersistentActor], "my-example")

  // send messages to target actor
  persistentActor ! AppendCommand(-1)
  persistentActor ! MySnapshot
  persistentActor ! AppendCommand(3)
  persistentActor ! MyPrint

  Thread.sleep(1000)
  system.terminate()
}

これだけで実行すると以下のようにakka/reference.confを見ろ、というエラーメッセージが吐かれて永続化されない。

confを追加する

必要っぽいconfを適当にapplication.confに記述する。

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

// 永続化用ファイルの置き場所
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

// build.sbtに`fork := true`を書かないなら必要
akka.persistence.journal.leveldb.native = off

これで繰り返し実行してみると[info] -1 :: 3 :: -1 :: 3 :: -1 :: 3のように表示されて、永続化に成功していることが分かる。

Serializerについて

ドキュメントはこちら
Serialization — Akka Documentation

先ほどのAppをsbt runしてみると以下の様なwarnが出力される。

Using the default Java serializer for class [net.petitviolet.ex.persistence.practice.AppendCommand] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'

デフォルトで使用されるJavaのserializerはパフォーマンス上おすすめ出来ないとのこと。
とりあえずwarnを抑制したければ、メッセージにならってapplication.confを修正すればよい。

akka.actor.warn-about-java-serializer-usage = false

serializerとしてkryoを使う

Javaのserializer以外の選択肢としてprotocol bufferやkryoが候補として挙げられる。(自作も可)
protocol bufferは設定が面倒らしいので、kryoを使ってみる。

まずbuild.sbtのlibraryDependenciesにkryoなserializerを追加する。

libraryDependencies ++= Seq(
  ...
  "com.github.romix.akka" %% "akka-kryo-serialization" % "0.4.1",
  ...
)

次にapplication.confにserializer周りの設定を追加。

akka {
  actor {
    warn-about-java-serializer-usage = true

    serializers {
      // `kryo`という名前で`KryoSerializer`を指定
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }

    serialization-bindings {
      // kryoでserializeする対象となるクラスを指定する
      "net.petitviolet.ex.persistence.practice.AppendCommand" = kryo
      "net.petitviolet.ex.persistence.practice.MyState" = kryo
    }

    kryo {
      type = "graph"
      idstrategy = "default"
    }
  }
}

これでkryoを使ったserializeが出来る。
akka.actor.kryoの設定については以下を参照。
Configuration of akka-kryo-serialization