始めに
Azure Functionsでイベントソーシングのためにイベントストアのバインディングの実装についてご説明します。
注意: この記事の内容はコマンド側のみです。
基本
バインディングとは?
バインディングはAzure Functionが他のリソースに接続することです。例えば、データベース、メッセージミドルウェア、SlackやTwilioのような別のサービスなど。
イベントストアとは?
イベントストアはイミュータブルな変更のみデータベースでイベントを保存することです。
イベントストリームとは?
イベントストリームは一つのオブジェクトのイベントの並びです。各々のイベントを保存するためにバージョンの番号を付けます。
実装
イベントストア
イベントストアとしてAzure Storage Tableを使用します。一つのテーブルに全部のイベントを保存します。テーブルのスキーマは以下です。
Property | Description |
---|---|
Partition Key | オブジェクトの識別子です |
Row Key | ストリームの順序のためです |
Timestamp | イベントが発生した時間 |
Version | イベントが発生したとき、ストリームのバージョン番号 |
Data | シリアライズされたイベント |
Type | ディシリアライズのためにイベントのCLR名前 |
そして、テーブルレコードは以下の定義です。
public sealed class EventEntity : ITableEntity
{
public EventEntity()
{
}
public EventEntity(string streamId, int version, string type, byte[] data)
{
Data = data;
PartitionKey = streamId;
RowKey = version.ToString("0000000000");
Type = type;
Version = version;
}
public byte[]? Data { get; set; }
public string? Type { get; set; }
public int Version { get; set; }
public string PartitionKey { get; set; } = "";
public string RowKey { get; set; } = "";
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }
}
イベントストリーム
イベントストリームは二つのことができます。
- イベントから状態を取得すること
- ストリームの後ろに新しいイベントを追加すること
以下のインタフェースを実装します。
public interface IEventStream
{
string StreamId { get; }
int Version { get; }
Task AppendAsync(object @event, CancellationToken ct = default);
Task<T> LoadAsync<T>(Func<T, object, T> fold, CancellationToken ct = default) where T : new();
}
LoadAsync
LoadAsync
は全部のイベントからオブジェクトの状態を取得します。
まずは、オブジェクトはディフォルトコンストラクタで初期化します。これから、各々のイベントは関数にイベントをapplyしていくと、そのときの状態に対してイベントをapplyするかたちになるので、どんどん状態を作って更新していけます。全部のイベントがapplyされると状態は最新になります。
LoadAsync
を呼んだ後で、EventStream
は最新のバージョンの番号を保存します。AppendAsync
は楽観的並行性制御のために検証されます。
public async Task<T> LoadAsync<T>(Func<T, object, T> fold, CancellationToken ct = default) where T : new()
{
// read all events
var pages = table.QueryAsync<EventEntity>(o => o.PartitionKey == StreamId && o.RowKey != HeadRowKey, cancellationToken: ct).AsPages().ConfigureAwait(false);
var records = new List<EventEntity>();
await foreach (var item in pages)
{
records.AddRange(item.Values);
}
// rebuild
Version = 0;
var instance = new T();
foreach (var item in records)
{
var @event = serializer.Deserialize(item.Type!, item.Data!);
instance = fold(instance, @event);
Version += 1;
}
return instance;
}
AppendAsync
AppendAsync
は楽観的並行性制御の検証ができたらストリームの後ろにイベントを追加します。
まずは、LoadAsync
に読み込んだバージョンがストリームヘッドのバージョンと合わなければなりません。ストリームヘッドは現在のバージョンを保存しています。最初のイベントが保存される時、ストリームヘッドを作ります。その後はイベントを追加するとバージョンが増えて、さらに、もう一つの検証があり、ヘッドのETag
も確認できます。
両方の検証ができたら、イベントが保存されます。
public async Task AppendAsync(object @event, CancellationToken ct = default)
{
var head = table.Query<EventEntity>(o => o.PartitionKey == StreamId && o.RowKey == HeadRowKey, cancellationToken: ct).SingleOrDefault();
// optimistic concurrency check
if (head != null && head.Version != Version)
{
throw new InvalidOperationException($"Stream has updated since last load (Loaded Version = {Version}, Latest Version = {head.Version})");
}
var actions = new List<TableTransactionAction>();
// update the stream head
if (head == null)
{
head = new EventEntity { PartitionKey = StreamId, RowKey = HeadRowKey, Version = 1, };
actions.Add(new TableTransactionAction(Add, head));
}
else
{
head.Version += 1;
actions.Add(new TableTransactionAction(UpdateMerge, head, head.ETag));
}
// append the event
var type = @event.GetType().AssemblyQualifiedName!;
var data = serializer.Serialize(@event);
actions.Add(new TableTransactionAction(Add, new EventEntity(StreamId, head.Version, type, data)));
await table.SubmitTransactionAsync(actions, ct).ConfigureAwait(false);
Version = head.Version;
}
バインディング
最後はEventStream
がAzure Functionのパラメータと繋ぐバインディングを実装します。バインディングは属性として実装してIExtensionConfigProvider
を実装しているタイプに初期化します。
EventStore
属性の定義は以下です。
[Binding]
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class EventStoreAttribute : Attribute
{
public EventStoreAttribute(string streamId)
{
StreamId = streamId;
}
[AutoResolve]
public string StreamId { get; }
}
Functionは要求を受けるとStreamId
が要求のvalueになります。
EventStoreAttribute
はEventStoreBinding
を使用してEventStream
を初期化します。
public sealed class EventStoreBinding : IExtensionConfigProvider
{
private readonly ISerializer serializer;
private readonly TableClient tableClient;
public EventStoreBinding(IOptions<EventStoreOptions> options, ISerializer serializer)
{
this.serializer = serializer;
tableClient = new TableClient(options.Value.TableConnectionString, options.Value.TableName);
tableClient.CreateIfNotExists();
}
public void Initialize(ExtensionConfigContext context)
{
context.AddBindingRule<EventStoreAttribute>().BindToInput<IEventStream>(attribute => new EventStream(serializer, tableClient, attribute.StreamId));
}
}
使い方
イベントソーシングや、このバインディングはCQRSとよく使えます。そして、CQRSのようなコマンドハンドラをAzure Functionsで実装しやすいです。
- ClientはFunctionにコマンドを投げます
- Functionはコマンドを受けるとコマンドに入っている識別子でイベントストリームを受けます
- イベントストリームから状態を読み込んでこの状態によってコマンドを処理します
- 処理の結果によってイベントを生成して保存します
下記に、必要なタイプを示します。
- Commands
- Events
- States
コマンド
コマンドは簡単なデータクラスです。それで、C# 9のレコード型で実装したほうがいいと思います。シリアライズ化可能なのです。
public static class Commands {
public static class V1 {
public record LogProgress (string Id, int Progress);
}
}
イベント
イベントも簡単なデータクラスです。イベントストアに保存するためにシリアライズ化可能である必要があります。
イベントはコマンドとほとんどに似ていますが、どう違うか考え方は以下のようになっています。
コマンドは
- Clientの要求を示します
- ハンドラに拒否することができます
- 命名規則は命令形で定義します
イベントは
- すでに起こったことを示します
- 保存するもので無視することができません
- 命名規則は過去形で定義します
これら以外コマンドとイベントは同じ情報を持っているのでイベントはコマンドから変換されます。そこで、コマンドをイベントに変換するためのファクトリーメソッドを実装しました。
public static class Events
{
public static class V1
{
public sealed record LoggedProgress(string Id, int Progress, DateTime Timestamp)
{
public static LoggedProgress From(Commands.V1.LogProgress command) =>
new(command.Id, command.Progress, DateTime.UtcNow);
}
public sealed record Completed(string Id, DateTime Timestamp)
{
public static Completed From(Commands.V1.LogProgress command) =>
new(command.Id, DateTime.UtcNow);
}
}
}
ステート
イベントが起こったときオブジェクトの状態が変わります。状態を変更するためにオブジェクトの現在の状態とイベントをApply
に渡してイベントによって変わります。
オブジェクトの状態を取得するために、コマンドハンドラはApply
関数をEventStream.LoadAsync
のfold
パラメータに渡します。
public enum Status : int { Backlogged = 0, Doing = 1, Done = 2, }
public sealed record TodoState
{
public Status Status { get; init; } = Status.Backlogged;
public int Progress { get; init; }
public static TodoState Apply (TodoState state, object @event) =>
@event switch
{
Events.V1.Created e => state with { Status = Status.Backlogged, Progress = 0 },
Events.V1.LoggedProgress e => state with { Status = Status.Doing, Progress = (state.Progress + e.Progress) },
Events.V1.Completed e => state with { Status = Status.Done, Progress = 100 },
_ => throw new ArgumentException ($"Cannot apply '{@event.GetType().Name}'", nameof (@event))
};
}
ハンドラ
最後に、下記はコマンドハンドラとして用いるfunctionです。
public static class LogProgressFunction
{
[FunctionName(nameof(LogProgressFunction))]
public static async Task<IActionResult> Run
(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "update")] Commands.V1.LogProgress command,
[EventStore("{Id}")] IEventStream events,
CancellationToken ct
)
{
// validate command
if (string.IsNullOrWhiteSpace(command.Id) || command.Progress < 0 || command.Progress > 100)
{
return new BadRequestResult();
}
// load state to make decisions on how to process command
var instance = await events.LoadAsync<TodoState>(TodoState.Apply, ct);
if (instance.Status == Status.Done)
{
return new BadRequestResult();
}
else if (instance.Progress + command.Progress >= 100)
{
await events.AppendAsync(Events.V1.Completed.From(command), ct);
}
else
{
await events.AppendAsync(Events.V1.LoggedProgress.From(command), ct);
}
return new OkResult();
}
}
まとめ
ここではAzure Functionsでイベントソーシングとコマンド側を考察しました。クエリー側はまだです。
ソースとサンプルは以下です。記事に関してコミットは「2022-advent-calendar」タグです。