2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

bitFlyerAdvent Calendar 2022

Day 5

Azure Functionsでイベントソーシング(コマンド側)

Posted at

始めに

Azure Functionsでイベントソーシングのためにイベントストアのバインディングの実装についてご説明します。

注意: この記事の内容はコマンド側のみです。

基本

バインディングとは?

バインディングはAzure Functionが他のリソースに接続することです。例えば、データベース、メッセージミドルウェア、SlackやTwilioのような別のサービスなど。

イベントストアとは?

イベントストアはイミュータブルな変更のみデータベースでイベントを保存することです。

イベントストリームとは?

イベントストリームは一つのオブジェクトのイベントの並びです。各々のイベントを保存するためにバージョンの番号を付けます。

実装

イベントストア

イベントストアとしてAzure Storage Tableを使用します。一つのテーブルに全部のイベントを保存します。テーブルのスキーマは以下です。

Property Description
Partition Key オブジェクトの識別子です
Row Key ストリームの順序のためです
Timestamp イベントが発生した時間
Version イベントが発生したとき、ストリームのバージョン番号
Data シリアライズされたイベント
Type ディシリアライズのためにイベントのCLR名前

そして、テーブルレコードは以下の定義です。

public sealed class EventEntity : ITableEntity
{
    public EventEntity()
    {
    }

    public EventEntity(string streamId, int version, string type, byte[] data)
    {
        Data = data;
        PartitionKey = streamId;
        RowKey = version.ToString("0000000000");
        Type = type;
        Version = version;
    }

    public byte[]? Data { get; set; }
    public string? Type { get; set; }
    public int Version { get; set; }

    public string PartitionKey { get; set; } = "";
    public string RowKey { get; set; } = "";
    public DateTimeOffset? Timestamp { get; set; }
    public ETag ETag { get; set; }
}

イベントストリーム

イベントストリームは二つのことができます。

  1. イベントから状態を取得すること
  2. ストリームの後ろに新しいイベントを追加すること

以下のインタフェースを実装します。

public interface IEventStream
{
    string StreamId { get; }

    int Version { get; }

    Task AppendAsync(object @event, CancellationToken ct = default);

    Task<T> LoadAsync<T>(Func<T, object, T> fold, CancellationToken ct = default) where T : new();
}

LoadAsync

LoadAsyncは全部のイベントからオブジェクトの状態を取得します。

まずは、オブジェクトはディフォルトコンストラクタで初期化します。これから、各々のイベントは関数にイベントをapplyしていくと、そのときの状態に対してイベントをapplyするかたちになるので、どんどん状態を作って更新していけます。全部のイベントがapplyされると状態は最新になります。

LoadAsyncを呼んだ後で、EventStreamは最新のバージョンの番号を保存します。AppendAsyncは楽観的並行性制御のために検証されます。

public async Task<T> LoadAsync<T>(Func<T, object, T> fold, CancellationToken ct = default) where T : new()
{
    // read all events
    var pages = table.QueryAsync<EventEntity>(o => o.PartitionKey == StreamId && o.RowKey != HeadRowKey, cancellationToken: ct).AsPages().ConfigureAwait(false);
    var records = new List<EventEntity>();
    await foreach (var item in pages)
    {
        records.AddRange(item.Values);
    }

    // rebuild
    Version = 0;
    var instance = new T();
    foreach (var item in records)
    {
        var @event = serializer.Deserialize(item.Type!, item.Data!);
        instance = fold(instance, @event);
        Version += 1;
    }

    return instance;
}

AppendAsync

AppendAsyncは楽観的並行性制御の検証ができたらストリームの後ろにイベントを追加します。

まずは、LoadAsyncに読み込んだバージョンがストリームヘッドのバージョンと合わなければなりません。ストリームヘッドは現在のバージョンを保存しています。最初のイベントが保存される時、ストリームヘッドを作ります。その後はイベントを追加するとバージョンが増えて、さらに、もう一つの検証があり、ヘッドのETagも確認できます。

両方の検証ができたら、イベントが保存されます。

public async Task AppendAsync(object @event, CancellationToken ct = default)
{
    var head = table.Query<EventEntity>(o => o.PartitionKey == StreamId && o.RowKey == HeadRowKey, cancellationToken: ct).SingleOrDefault();

    // optimistic concurrency check
    if (head != null && head.Version != Version)
    {
        throw new InvalidOperationException($"Stream has updated since last load (Loaded Version = {Version}, Latest Version = {head.Version})");
    }

    var actions = new List<TableTransactionAction>();

    // update the stream head
    if (head == null)
    {
        head = new EventEntity { PartitionKey = StreamId, RowKey = HeadRowKey, Version = 1, };

        actions.Add(new TableTransactionAction(Add, head));
    }
    else
    {
        head.Version += 1;

        actions.Add(new TableTransactionAction(UpdateMerge, head, head.ETag));
    }

    // append the event
    var type = @event.GetType().AssemblyQualifiedName!;
    var data = serializer.Serialize(@event);
    actions.Add(new TableTransactionAction(Add, new EventEntity(StreamId, head.Version, type, data)));

    await table.SubmitTransactionAsync(actions, ct).ConfigureAwait(false);

    Version = head.Version;
}

バインディング

最後はEventStreamがAzure Functionのパラメータと繋ぐバインディングを実装します。バインディングは属性として実装してIExtensionConfigProviderを実装しているタイプに初期化します。

EventStore属性の定義は以下です。

[Binding]
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class EventStoreAttribute : Attribute
{
    public EventStoreAttribute(string streamId)
    {
        StreamId = streamId;
    }

    [AutoResolve]
    public string StreamId { get; }
}

Functionは要求を受けるとStreamIdが要求のvalueになります。

EventStoreAttributeEventStoreBindingを使用してEventStreamを初期化します。

public sealed class EventStoreBinding : IExtensionConfigProvider
{
    private readonly ISerializer serializer;
    private readonly TableClient tableClient;

    public EventStoreBinding(IOptions<EventStoreOptions> options, ISerializer serializer)
    {
        this.serializer = serializer;

        tableClient = new TableClient(options.Value.TableConnectionString, options.Value.TableName);
        tableClient.CreateIfNotExists();
    }

    public void Initialize(ExtensionConfigContext context)
    {
        context.AddBindingRule<EventStoreAttribute>().BindToInput<IEventStream>(attribute => new EventStream(serializer, tableClient, attribute.StreamId));
    }
}

使い方

イベントソーシングや、このバインディングはCQRSとよく使えます。そして、CQRSのようなコマンドハンドラをAzure Functionsで実装しやすいです。

コマンドのフローは以下のようです。
event sourcing on azure functions (command side).drawio.png

  1. ClientはFunctionにコマンドを投げます
  2. Functionはコマンドを受けるとコマンドに入っている識別子でイベントストリームを受けます
  3. イベントストリームから状態を読み込んでこの状態によってコマンドを処理します
  4. 処理の結果によってイベントを生成して保存します

下記に、必要なタイプを示します。

  • Commands
  • Events
  • States

コマンド

コマンドは簡単なデータクラスです。それで、C# 9のレコード型で実装したほうがいいと思います。シリアライズ化可能なのです。

public static class Commands {
    public static class V1 {
        public record LogProgress (string Id, int Progress);
    }
}

イベント

イベントも簡単なデータクラスです。イベントストアに保存するためにシリアライズ化可能である必要があります。

イベントはコマンドとほとんどに似ていますが、どう違うか考え方は以下のようになっています。

コマンドは

  • Clientの要求を示します
  • ハンドラに拒否することができます
  • 命名規則は命令形で定義します

イベントは

  • すでに起こったことを示します
  • 保存するもので無視することができません
  • 命名規則は過去形で定義します

これら以外コマンドとイベントは同じ情報を持っているのでイベントはコマンドから変換されます。そこで、コマンドをイベントに変換するためのファクトリーメソッドを実装しました。

public static class Events
{
    public static class V1
    {
        public sealed record LoggedProgress(string Id, int Progress, DateTime Timestamp)
        {
            public static LoggedProgress From(Commands.V1.LogProgress command) =>
                new(command.Id, command.Progress, DateTime.UtcNow);
        }

        public sealed record Completed(string Id, DateTime Timestamp)
        {
            public static Completed From(Commands.V1.LogProgress command) =>
                new(command.Id, DateTime.UtcNow);
        }
    }
}

ステート

イベントが起こったときオブジェクトの状態が変わります。状態を変更するためにオブジェクトの現在の状態とイベントをApplyに渡してイベントによって変わります。

オブジェクトの状態を取得するために、コマンドハンドラはApply関数をEventStream.LoadAsyncfoldパラメータに渡します。

public enum Status : int { Backlogged = 0, Doing = 1, Done = 2, }

public sealed record TodoState
{
    public Status Status { get; init; } = Status.Backlogged;
    public int Progress { get; init; }

    public static TodoState Apply (TodoState state, object @event) =>
        @event switch
        {
            Events.V1.Created e => state with { Status = Status.Backlogged, Progress = 0 },
            Events.V1.LoggedProgress e => state with { Status = Status.Doing, Progress = (state.Progress + e.Progress) },
            Events.V1.Completed e => state with { Status = Status.Done, Progress = 100 },
            _ => throw new ArgumentException ($"Cannot apply '{@event.GetType().Name}'", nameof (@event))
        };
}

ハンドラ

最後に、下記はコマンドハンドラとして用いるfunctionです。

public static class LogProgressFunction
{
    [FunctionName(nameof(LogProgressFunction))]
    public static async Task<IActionResult> Run
    (
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "update")] Commands.V1.LogProgress command,
        [EventStore("{Id}")] IEventStream events,
        CancellationToken ct
    )
    {
        // validate command
        if (string.IsNullOrWhiteSpace(command.Id) || command.Progress < 0 || command.Progress > 100)
        {
            return new BadRequestResult();
        }

        // load state to make decisions on how to process command
        var instance = await events.LoadAsync<TodoState>(TodoState.Apply, ct);

        if (instance.Status == Status.Done)
        {
            return new BadRequestResult();
        }
        else if (instance.Progress + command.Progress >= 100)
        {
            await events.AppendAsync(Events.V1.Completed.From(command), ct);
        }
        else
        {
            await events.AppendAsync(Events.V1.LoggedProgress.From(command), ct);
        }

        return new OkResult();
    }
}

まとめ

ここではAzure Functionsでイベントソーシングとコマンド側を考察しました。クエリー側はまだです。

ソースとサンプルは以下です。記事に関してコミットは「2022-advent-calendar」タグです。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?