とりあえず使えるようにする
Akka-PersistenceでKryoをとりあえず使えるようにする設定については以前書いた。
Akka-Persistenceコトハジメ#serializerとしてkryoを使う
serializer選択について
Akka-PersistenceのPersistentActor#persist
で永続化するメッセージはserializeしてから永続化している。
その際に使用できるSerializerを公式が提示している。
External Akka Serializers
以下のページでもserializer選択は大事だと強調しており、こちらではGoogle Protobuf/Apache Thrift/Apache Avroが提示されている。
Picking the right serialization format
Protobufは何となく使うのがめんどくさそうなイメージがあったので楽そうなKryoを使う。
他にもJSONフォーマットなSerializer実装としてscalapenos/staminaがある。
serializerのカスタマイズ方法
serializerのカスタマイズ方法は2パターンありそうだった。
- akkaのやり方に自分で従うパターン
-
romix/akka-kryo-serializationをがっつり使うパターン
- kryo以外についてはここでは触れない
それぞれやってみる。
serialize(persist)する型
以下のサンプルで使う型。
Todoアプリにおいて新しくTask
を追加するRegister
メッセージをAkka-Persistenceでpersist
する対象とする。
sealed trait CommandEvent
case class Register(task: Task) extends CommandEvent
Task
の定義はこんな感じでid
とtitle
とstate
を持つ。
若干DDDっぽくそれぞれにちゃんと型を用意した。
case class Task(id: TaskId, title: TaskTitle, state: TaskState = TaskState.Todo)
case class TaskId(value: String) extends AnyVal
case class TaskTitle(value: String) extends AnyVal
// enumlike
sealed abstract class TaskState(val value: Int) extends Serializable
object TaskState {
case object Completed extends TaskState(1)
case object Todo extends TaskState(0)
private val values = Completed :: Todo :: Nil
// [[TaskState]] factory
val from: Int => TaskState = n => values.find(_.value == n).get
}
akkaのやり方に従って独自serializerを実装する
Customizationを参照。
akka.serialization.Serializer
を実装してやれば良い。
twitter/chillを使う
Kryoを直接使うのではなぜかうまくいかなかったので、twitter/chillを利用する。
参考:takezoux2のブログ: chillを使ってクラス定義変更に強いシリアライズを実現
build.sbtに依存を追加。
libraryDependencies += "com.twitter" %% "chill-bijection" % "0.8.0"
twitter/chillを使ってシンプルにRegister
のSerializer
を実装する。
class RegisterKryoSerializer extends akka.serialization.Serializer {
private val CLAZZ = classOf[Register]
override def identifier: Int = 1000
override def includeManifest: Boolean = true
override def toBinary(o: AnyRef): Array[Byte] = {
// これではうまくいかなかった
// val kryo = new Kryo()
// val baos = new ByteArrayOutputStream()
// kryo.writeObject(new Output(baos), o)
// baos.toByteArray
KryoInjection.apply(o)
}
override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
manifest match {
case Some(CLAZZ) =>
// これではうまくいかなかった
// val kryo = new Kryo()
// val input = new Input(new ByteArrayInputStream(bytes))
// kryo.readObject(input, CLAZZ)
KryoInjection.invert(bytes).get
case _ => sys.error(s"unknown manifest: $manifest")
}
}
}
このようにKryoInjection#apply
とKryoInjection#invert
を使ったらSerializer
はお手軽に実装できる。
しかしこれではKryoInjection
にserialize/deserializeを任せっきりになってしまい、そもそもやりたかったカスタマイズが出来ない。
なので、次のセクションでうまくいったもう一つの方法について書く。
コメントにも書いたように自分なりにArray[Byte]
とRegister
のserialize/deserializeを実装することがうまく出来なかった。
その際のStackTraceには以下のようなものが出力される。
[ERROR] swallowing exception during message send
com.esotericsoftware.kryo.KryoException: Buffer underflow.
at com.esotericsoftware.kryo.io.Input.require(Input.java:199)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:470)
...
これが解決出来たら教えて下さい...。
romix/akka-kryo-serializationを使う
こちらの場合はcom.esotericsoftware.kryo.Serializer
を実装すればよい。
// (`accesptsNull` = false, `immutable` = true)
abstract class KryoSerializerBase[T] extends com.esotericsoftware.kryo.Serializer[T](false, true)
class RegisterKryoSerializer extends KryoSerializerBase[Register] {
override def write(kryo: Kryo, output: Output, `object`: Register): Unit = {
output.writeString(`object`.task.id.value)
output.writeString(`object`.task.name.value)
output.writeInt(`object`.task.state.value)
}
override def read(kryo: Kryo, input: Input, `type`: Class[Register]): Register = {
val taskId = TaskId(input.readString)
val taskTitle = TaskTitle(input.readString)
val taskState = TaskState.from(input.readInt)
Register(Task(taskId, taskTitle, taskState))
}
}
ここで実装したRegisterKryoSerializer
を使うためには以下の様に実行できればいいが、
kryo
インスタンスをどうやって手に入れるかが難しい。
kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
ここにあるようにKryoのinitializerを独自に実装して初期化処理に追加すればよい。
How to create a custom initializer for Kryo
initializerを用意する
extendsするtraitなどはなく、customize(kryo: Kryo)
なメソッドを持つclassを実装するだけ。
リフレクションを使って実現されていてこの辺りにその実装がある。
kryo
インスタンスのaddDefaultSerializer
を実行するだけのclassを用意した。
class CustomKryoSerializerInitializer {
def customize(kryo: Kryo) = {
kryo.setDefaultSerializer(classOf[CompatibleFieldSerializer[Any]])
kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
println(s"after - ${kryo.getSerializer(classOf[Register])}")
}
}
initializerをconfに追加する
application.confに追加する。
該当箇所はこんな感じ。
extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]
akka {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
warn-about-java-serializer-usage = false
serialization-bindings {
"java.io.Serializable" = java
"net.petitviolet.ex.persistence.task.actor.Register" = kryo
}
kryo {
type = "graph"
idstrategy = "default"
kryo-trace = true
// ここ!
kryo-custom-serializer-init = "net.petitviolet.ex.persistence.task.model.support.CustomKryoSerializerInitializer"
}
}
}
これでアプリケーションを起動すると以下のようなログが出力される。
[info] 00:00 TRACE: [kryo] Registration required: false
[info] 00:00 TRACE: [kryo] Register class name: net.petitviolet.ex.persistence.task.actor.Register (net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer)
[info] after - net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer@550c6251
kryo-custom-serializer-init
に用意したinitializerを設定してやればcustomize
が実行されて、Register
に対するserializerを登録する事が出来た。
感想
Akka-Persistenceを使うにあたって恐らく重要な意思決定をしなければならないSerializerの選択。
プロダクト開発においてはpersist
の対象となるイベントもリファクタリングしていくことが想定されるため、
先まで見据えてSerializerは導入段階から精査してきっちり実装していくことが必要になりそう。
ちなみにイベントのリファクタリングについてのAkka公式な記事。
Persistence - Schema Evolution
本当はここまでやりたかったが、Akka-PersistenceのSerializerとしてKryoは向いてないんじゃないかというところで挫折。
イベントのバージョニングに対応したKryoなSerializer実装はあるんだろうか...?