1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Akka Persistence (kotlin)

Posted at

依存関係

def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation platform("com.typesafe.akka:akka-bom_${versions.ScalaBinary}:2.6.19")

  implementation "com.typesafe.akka:akka-persistence-typed_${versions.ScalaBinary}"
  testImplementation "com.typesafe.akka:akka-persistence-testkit_${versions.ScalaBinary}"
}


イントロダクション

AkkaPersistenceはアクターの状態を保持し、JVMのクラッシュ・スーパーバイザによる再起動・クラスタ内でノード障害時などにアクターを回復させるためのものです。

AkkaPersistenceの根底となる考え方にイベントソーシングがあります。
これはアクターに行われた変更(この場合イベントと言う)だけを積みかせて保存していき、アクター内の実際の状態は保存しないというものです。
一般的に後者はステートソーシングと呼ばれ一般的なCRUDなシステムのデータ保存方法です。

なんでステートソーシングではなくイベントソーシングなんて面倒くさい手法を使うかというと、イベントソーシングはデータストアにイベントを追加しかしないので、
データストアのトランザクションロックなどが不要になりスケーラビリティを確保しやすいという利点があり、マイクロサービスなどの分散システムととても相性が良いです。

AkkaPersistenceを理解するにはこのイベントソーシングの考えが解っていないとかなりきついと思うので、イベントソーシングがよくわからない人は
以下の資料を読んでからまた見たほうが幸せになれると思います。

イベントソーシングについてはこの2つがとても解りやすかったです!



とりあえずコードを見てみる

説明だけ聞いても意味不明だと思うので実際にコードで見ていきましょう。
これはEventSourcedBehaviorを継承させたエラーが出ない最小構成のアクターコードです。

import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.javadsl.CommandHandler
import akka.persistence.typed.javadsl.EventHandler
import akka.persistence.typed.javadsl.EventSourcedBehavior

class MyPersistentBehavior private constructor(
    persistenceId: PersistenceId
) : EventSourcedBehavior<MyPersistentBehavior.Command, MyPersistentBehavior.Event, MyPersistentBehavior.State>(persistenceId) {
    interface Command
    interface Event
    class State

    override fun emptyState(): State {
        return State()
    }

    override fun commandHandler(): CommandHandler<Command, Event, State> {
        return CommandHandler<Command, Event, State> { state, command ->
            throw RuntimeException(
                "TODO: process the command & return an Effect"
            )
        }
    }

    override fun eventHandler(): EventHandler<State, Event> {
        return EventHandler<State, Event> { state, event ->
            throw RuntimeException(
                "TODO: process the event return the next state"
            )
        }
    }

    companion object {
        fun create(persistenceId: PersistenceId): Behavior<Command> {
            return MyPersistentBehavior(persistenceId)
        }
    }
}

EventSourcedBehaviorを構成するメンバ

persistenceId 永続したアクターを示すユニークな識別子のことです。
emptyState エンティティが最初に作成されたときの状態を定義します。
(例えばCounterというアクターだったら0にするみたいな)
commandHandler コマンドを受け付け、そのEffect(効果)を定義します。
eventHandler イベントが永続化されたときに、現在の状態に新しい状態を加えた状態を返します。

ここで注意していただきたいのが、EventSourceBehaviorを永続的なアクターには状態を含めていは行けないということです!
すべての状態はEventSourceBehaviorのジェネリクスで指定したStateに持っていなくてはいけません。
さもなければアクターが停止、または再起動したさいに状態が保持できず消えてしまいます。
また、Stateへの更新はeventhandlerを経由して変更することにも注意してください。
(ここらへんオブジェクト指向的な考えじゃないので詰まりやすいですよね)

じゃぁ次は更に詳しくこの構成要素を見ていきましょうか。


PersistenceId

PersistenceIdはイベントジャーナルおよびスナップショットストアにある永続的なアクターのユニークな識別子のことです。

ClusterShardingは通常、PersistenceIdに対してアクティブなエンティティが1つだけ存在することを保証するために使用されます。
PersistenceId.ofUniqueIdでカスタムなIDを作成することが可能です。


CommandHandler

コマンドハンドラは現在の状態と入力されるコマンドを持ったメソッドです。

コマンドハンドラはコマンドによる効果(Effect)を返します。
例えばコマンドが成功してEventが発行されたとか、逆に失敗して発行されなかったとかのことをEffectといいます。
これはEffect() ファクトリメソッドにより返します。

一般的最も使用されるEffectは以下の2種類です

  • persist 単一 or 複数のイベントを永続化します。
  • none 永続化するイベントを指定しません(コマンドが失敗した時 or 読み取り専用のコマンドだった時など...)

EventHandler

コマンドハンドラー内で無事にイベントが永続化されるとeventhandlerを用いて現在の状態にイベントを適応し、新しい状態が作成されます。
ステート(永続的なアクターで管理されてる状態)は通常イミュータブルに設計しなくてはならず、イベントハンドラでは新たに作成したステート
を返しインスタンスを更新します。


例の残りのコード

これまでの解説でEventSourcedBehaviorの概要は理解できたと思いますので実際にコードベースで更に理解を深めていきましょう。

object Commands {
    sealed interface Command

    //追加コマンド
    data class Add(
        val data: String
    ) : Command

    //クリアコマンド
    enum class Clear : Command {
        INSTANCE
    }
}

object Events {
    sealed interface Event

    //追加されたイベント
    class Added(
        val data: String
    ) : Event

    //クリアされたイベント
    enum class Cleared : Event {
        INSTANCE
    }
}
data class State(
    private val items: List<String> = listOf()
) {
    fun addItem(data: String): State {
        var newItems = ArrayList(items)
        newItems.add(0, data)

        //直近5件のデータのみ常に保持し続ける的なビジネスロジックだった場合
        var latest = items.subList(0, Math.min(5,newItems.size))

        return copy(items = latest)
    }
}
class MyPersistentBehavior private constructor(
    persistenceId: PersistenceId
) : EventSourcedBehavior<Commands.Command, Events.Event, State>(persistenceId) {
    companion object {
        fun create(persistenceId: PersistenceId): Behavior<Commands.Command> {
            return MyPersistentBehavior(persistenceId)
        }
    }

    override fun emptyState(): State {
        return State()
    }

    override fun commandHandler(): CommandHandler<Commands.Command, Events.Event, State> {
        return newCommandHandlerBuilder()
            .forAnyState()
            .onCommand(Commands.Add::class.java) { command ->
                Effect().persist(Events.Added(command.data))
            }
            .onCommand(Commands.Clear::class.java) { _ ->
                Effect().persist(Events.Cleared.INSTANCE)
            }
            .build()
    }

    override fun eventHandler(): EventHandler<State, Events.Event> {
        return newEventHandlerBuilder()
            .forAnyState()
            .onEvent(Events.Added::class.java){ state, event ->
                state.addItem(event.data)
            }
            .onEvent(Events.Cleared::class.java){ _, _ ->
                State()
            }
            .build()
    }
}


Effect(効果)とSubEffect(副作用)

コマンドハンドラは受け取ったコマンドの効果を示すEffectをファクトリメソッドで返すと以前説明しましたが
このファクトリメソッドを使用して制作される効果は以下のものがあります。


コマンドの効果

persist 一つのイベントもしくは複数のイベントを永続化
none 読み取り専用のコマンドなどで保存するイベントがない場合に使用します
unhandled 現在の状態ではコマンドは処理されないことを表します
stop このアクターを停止します
stash 現在のコマンドをstashします
unstashAll stashされたコマンドをすべて処理します
reply ActorRefに返信メッセージを送信します

コマンドの副作用

コマンドが成功した後に継続して実行できる副作用(追加処理的な感じ)を定義することができます。
副作用の定義方法は簡単で↑のコマンドのEffectにメソッドチェーンするだけです。
noneとかunhandledとかは具体的な処理が無いので即座に副作用が実行されます。

例:

Effect().persist().thenRun()
  • thenRun コマンドの結果のあとの副作用を表す
  • thenStop コマンドの結果のあとにアクターを停止します
  • thenUnstashAll stashされたコマンドを一斉に処理します
  • thenReply 与えられたActorRefに返信メッセージを送信する


ClusterShardingとEventSourcedBehaviorの関係性

クラスタシャーディングは永続的なアクター(EventSourcedBehavior継承させた奴のことね)をクラスタ上に分散させ、ユニークなIDを元に各ノードにアクターを配置します。
このおかげで1つのノードのメモリに収まるよりも多くのアクターをメモリ上に展開することが可能になります。
クラスタシャーディングと永続的なアクターが組み合わさることにより障害耐性を向上させることができます。
例えばノードがもしクラッシュしてアクターが全部消えたとしても別のノードで速やかに消えた永続的なアクターを再開することが可能です。

EventSourcedBehaviorを継承した永続的なアクターは通常のアクターと同様に実行することができますが基本的にはクラスターシャーディングとセットで使われることが前提で
設計されていることを頭の隅で覚えておいてください。
基本的に永続的なアクターは2つ存在してはいけません、これは同じ状態が2つ存在しているとイベントが被ってしまう可能性があり、状態の整合性が取れなくなってしまうからです。

ということで基本的にEventSourcedBehaviorはクラスタ上に一つしか存在しないことを保証するクラスターシャーディングとセットで使われるのです。



ActorContextへのアクセス

あまりないことですが、EventSourcedBehaviorでActorContextを使用したい場合があります。
その場合は、通常のアクター通りBehaviors.setupを用いてインスタンスを作成します。
(子アクターを更に作成したい時とか、ログを出したいときとかに使います)

class ContextBehavior private constructor(
    persistenceId: PersistenceId,
    // EventSourcedBehaviorにはcontextフィールドが無いので自分で持ってないといけない
    val context: ActorContext<Commands.Command>
) : EventSourcedBehavior<Commands.Command, Events.Event, State>(persistenceId) {
    companion object {
        fun create(persistenceId: PersistenceId): Behavior<Commands.Command> {
            return Behaviors.setup{ context ->
                ContextBehavior(persistenceId, context)
            }
        }
    }

    //ここから下はめんどくさいから省略contextを使ったlogとか新しいアクター作ったりとか...
}


永続的なアクターからの応答

Request-Responceパターンは永続的アクターでは非常に一般的です。
(リクエストコマンドの中にReply用のActorRef入れとくやり方のことです。)

なぜならコマンドが正常に受け付けたのか、または失敗して拒否されたのかをコマンドを依頼した側は知りたいといった要件の場合がほとんどだからです。
(akkaはメッセージ駆動なので例外出して上位レイヤーで捕まえるみたいな芸当ができないので、このパターンはめちゃくちゃよく使います。)

object Protocol {
    sealed interface Command
    sealed interface CommandResponse

    //何かを追加するイベント
    data class Add(
        val content: String,
        // なにか追加したあとに結果を返すためのActor参照
        val replyTo: ActorRef<CommandResponse>
    ) : Command
}
.onCommand(Protocol.Add::class.java){command, state -> 
    Effect()
        .persist(Event())
        .thenRun{command.replyto.tell(Protocol.CommandResponce())}
}

こんな感じで先程紹介したSubEffectthenRunを使うとイベントが保存されたあとに発行されるのでスッキリ書けます。



シリアライゼーション

基本的にイベント(イベントジャーナル用)とステート(スナップショット用)はシリアライズされて保存または取得されます。
公式ではJacksonを使用したシリアライズを推奨しています

殆どの場合イベントとステートのクラスもしくはインタフェースにマーカーインタフェースをつけておくことになると思います。

interface Event : JacksonSerializable
data class State(val value: Double) : JacksonSerializable


リカバリー

イベントソースされたアクタは、ジャーナルされたイベントを再生することにより、起動時および再起動時に自動的にリカバリされます。
回復中にアクタに送信された新しいメッセージは、再生されたイベントに干渉しません。これらのメッセージは格納され、回復フェーズが完了した後に EventSourcedBehavior によって受信されます。

同時に進行可能なリカバリーの数は、システムとバックエンドのデータストアに過負荷をかけないように制限されています。
(超でかいシステムでノードの中に1000万アクターとかあった時一気に1000万のアクターをイベントapplyで作ろうとするので制限がないと普通に死ぬ)
制限を超えると、アクターは他のリカバリーが完了するまで待機します。これは、以下の方法で設定します。

akka.persistence.max-concurrent-recoveries = 50
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?