Posted at

Akka-PersistenceのserializerとしてKryoを使う

More than 1 year has passed since last update.


とりあえず使えるようにする

Akka-PersistenceでKryoをとりあえず使えるようにする設定については以前書いた。

Akka-Persistenceコトハジメ#serializerとしてkryoを使う


serializer選択について

Akka-PersistenceのPersistentActor#persistで永続化するメッセージはserializeしてから永続化している。

その際に使用できるSerializerを公式が提示している。

External Akka Serializers

以下のページでもserializer選択は大事だと強調しており、こちらではGoogle Protobuf/Apache Thrift/Apache Avroが提示されている。

Picking the right serialization format

Protobufは何となく使うのがめんどくさそうなイメージがあったので楽そうなKryoを使う。

他にもJSONフォーマットなSerializer実装としてscalapenos/staminaがある。


serializerのカスタマイズ方法

serializerのカスタマイズ方法は2パターンありそうだった。


  • akkaのやり方に自分で従うパターン


  • romix/akka-kryo-serializationをがっつり使うパターン


    • kryo以外についてはここでは触れない



それぞれやってみる。


serialize(persist)する型

以下のサンプルで使う型。

Todoアプリにおいて新しくTaskを追加するRegisterメッセージをAkka-Persistenceでpersistする対象とする。

sealed trait CommandEvent

case class Register(task: Task) extends CommandEvent

Taskの定義はこんな感じでidtitlestateを持つ。

若干DDDっぽくそれぞれにちゃんと型を用意した。

case class Task(id: TaskId, title: TaskTitle, state: TaskState = TaskState.Todo)

case class TaskId(value: String) extends AnyVal
case class TaskTitle(value: String) extends AnyVal

// enumlike
sealed abstract class TaskState(val value: Int) extends Serializable
object TaskState {
case object Completed extends TaskState(1)
case object Todo extends TaskState(0)

private val values = Completed :: Todo :: Nil
// [[TaskState]] factory
val from: Int => TaskState = n => values.find(_.value == n).get
}


akkaのやり方に従って独自serializerを実装する

Customizationを参照。

akka.serialization.Serializerを実装してやれば良い。


twitter/chillを使う

Kryoを直接使うのではなぜかうまくいかなかったので、twitter/chillを利用する。

参考:takezoux2のブログ: chillを使ってクラス定義変更に強いシリアライズを実現

build.sbtに依存を追加。

libraryDependencies += "com.twitter" %% "chill-bijection" % "0.8.0"

twitter/chillを使ってシンプルにRegisterSerializerを実装する。

class RegisterKryoSerializer extends akka.serialization.Serializer {

private val CLAZZ = classOf[Register]

override def identifier: Int = 1000

override def includeManifest: Boolean = true

override def toBinary(o: AnyRef): Array[Byte] = {
// これではうまくいかなかった
// val kryo = new Kryo()
// val baos = new ByteArrayOutputStream()
// kryo.writeObject(new Output(baos), o)
// baos.toByteArray

KryoInjection.apply(o)
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
manifest match {
case Some(CLAZZ) =>
// これではうまくいかなかった
// val kryo = new Kryo()
// val input = new Input(new ByteArrayInputStream(bytes))
// kryo.readObject(input, CLAZZ)
KryoInjection.invert(bytes).get
case _ => sys.error(s"unknown manifest: $manifest")
}
}
}

このようにKryoInjection#applyKryoInjection#invertを使ったらSerializerはお手軽に実装できる。

しかしこれではKryoInjectionにserialize/deserializeを任せっきりになってしまい、そもそもやりたかったカスタマイズが出来ない。

なので、次のセクションでうまくいったもう一つの方法について書く。

コメントにも書いたように自分なりにArray[Byte]Registerのserialize/deserializeを実装することがうまく出来なかった。

その際のStackTraceには以下のようなものが出力される。

[ERROR] swallowing exception during message send

com.esotericsoftware.kryo.KryoException: Buffer underflow.
at com.esotericsoftware.kryo.io.Input.require(Input.java:199)
at com.esotericsoftware.kryo.io.Input.readString(Input.java:470)
...

これが解決出来たら教えて下さい...。


romix/akka-kryo-serializationを使う

こちらの場合はcom.esotericsoftware.kryo.Serializerを実装すればよい。

// (`accesptsNull` = false, `immutable` = true)

abstract class KryoSerializerBase[T] extends com.esotericsoftware.kryo.Serializer[T](false, true)

class RegisterKryoSerializer extends KryoSerializerBase[Register] {
override def write(kryo: Kryo, output: Output, `object`: Register): Unit = {
output.writeString(`object`.task.id.value)
output.writeString(`object`.task.name.value)
output.writeInt(`object`.task.state.value)
}

override def read(kryo: Kryo, input: Input, `type`: Class[Register]): Register = {
val taskId = TaskId(input.readString)
val taskTitle = TaskTitle(input.readString)
val taskState = TaskState.from(input.readInt)
Register(Task(taskId, taskTitle, taskState))
}
}

ここで実装したRegisterKryoSerializerを使うためには以下の様に実行できればいいが、

kryoインスタンスをどうやって手に入れるかが難しい。

kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])

ここにあるようにKryoのinitializerを独自に実装して初期化処理に追加すればよい。

How to create a custom initializer for Kryo


initializerを用意する

extendsするtraitなどはなく、customize(kryo: Kryo)なメソッドを持つclassを実装するだけ。

リフレクションを使って実現されていてこの辺りにその実装がある

kryoインスタンスのaddDefaultSerializerを実行するだけのclassを用意した。

class CustomKryoSerializerInitializer {

def customize(kryo: Kryo) = {
kryo.setDefaultSerializer(classOf[CompatibleFieldSerializer[Any]])
kryo.addDefaultSerializer(classOf[Register], classOf[RegisterKryoSerializer])
println(s"after - ${kryo.getSerializer(classOf[Register])}")
}
}


initializerをconfに追加する

application.confに追加する。

該当箇所はこんな感じ。

extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]

akka {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
}
warn-about-java-serializer-usage = false
serialization-bindings {
"java.io.Serializable" = java
"net.petitviolet.ex.persistence.task.actor.Register" = kryo
}
kryo {
type = "graph"
idstrategy = "default"
kryo-trace = true
// ここ!
kryo-custom-serializer-init = "net.petitviolet.ex.persistence.task.model.support.CustomKryoSerializerInitializer"
}
}
}

これでアプリケーションを起動すると以下のようなログが出力される。


[info] 00:00 TRACE: [kryo] Registration required: false

[info] 00:00 TRACE: [kryo] Register class name: net.petitviolet.ex.persistence.task.actor.Register (net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer)

[info] after - net.petitviolet.ex.persistence.task.model.support.RegisterKryoSerializer@550c6251


kryo-custom-serializer-initに用意したinitializerを設定してやればcustomizeが実行されて、Registerに対するserializerを登録する事が出来た。


感想

Akka-Persistenceを使うにあたって恐らく重要な意思決定をしなければならないSerializerの選択。

プロダクト開発においてはpersistの対象となるイベントもリファクタリングしていくことが想定されるため、

先まで見据えてSerializerは導入段階から精査してきっちり実装していくことが必要になりそう。

ちなみにイベントのリファクタリングについてのAkka公式な記事。

Persistence - Schema Evolution

本当はここまでやりたかったが、Akka-PersistenceのSerializerとしてKryoは向いてないんじゃないかというところで挫折。

イベントのバージョニングに対応したKryoなSerializer実装はあるんだろうか...?