このドキュメントの内容
EventStoreDB
で簡単なイベントデータの読み書きを実装してみます。
EventStoreDB とは
EventStoreDB
は EventStore Ltd. がリリースしているオープンソースのイベントソーシングデータベースです。
サポートされているプラットフォーム
-
公式のダウンロードページでは次のバイナリが提供されています。
- Windows 64-bit (.NET Core 3.1)
- Linux
- Ubuntu 16.04 64-bit
- Ubuntu 18.04 64-bit
- Ubuntu 20.04 64-bit
-
今回は Windows のバージョン 20.10 を使用します。
サポートされている開発言語
-
公式のダウンロードページでは次のクライアントが提供されています。
- .NET
- Java
- Go
- Node.js
- Rust
- Hashell
-
バージョン 20.10 で gRPC を用いたクライアント API が追加されたようです。
-
今回は .NET クライアントの gRPC API を使用します。
サーバー実行環境の構築
-
公式のダウンロードページからバイナリをダウンロードします。Windows の場合は ZIP 圧縮ファイルです。
https://www.eventstore.com/downloads -
実行ディレクトリを作ります。おそらくディレクトリ構成に決まりはありません。
E:\ProgramFiles\EventStoreDB : ルートディレクトリ bin : 実行ファイルを配置するディレクトリ cert : 証明書ファイルを配置するディレクトリ ca : ルート証明書ファイルを配置するディレクトリ data : データファイルが出力されるディレクトリ index : インデックスファイルが出力されるディレクトリ log : ログファイルが出力されるディレクトリ
-
SSL を有効にする場合、サーバー証明書(とルート証明書)を準備します。gRPC で SSL を有効にするときと同じです。cert ディレクトリにサーバー証明書を配置します。cert\ca ディレクトリにルート証明書を配置します。
-
ZIP ファイルを解凍し、実行ファイルを bin ディレクトリに配置します。
-
構成ファイルを作成します。公式サイトにウィザードがあります。ウィザードに沿って値を設定していくと、構成ファイルの中身を作成してくれます。作成された内容をコピーしたファイルをルートディレクトリに配置します。
https://developers.eventstore.com/server/v20.10/docs/installation/SampleEventStore.conf--- # Paths Db: E:\ProgramFiles\EventStoreDB\Data Index: E:\ProgramFiles\EventStoreDB\Index Log: E:\ProgramFiles\EventStoreDB\Log # Certificates configuration CertificateFile: E:\ProgramFiles\EventStoreDB\cert\TestServer.crt CertificatePrivateKeyFile: E:\ProgramFiles\EventStoreDB\cert\TestServer.pem TrustedRootCertificatesPath: E:\ProgramFiles\EventStoreDB\cert\ca # Network configuration IntIp: 127.0.0.1 ExtIp: 127.0.0.1 HttpPort: 2113 IntTcpPort: 1112 ExtTcpPort: 1113 EnableExternalTcp: true EnableAtomPubOverHTTP: true # Projections configuration RunProjections: All
-
コマンドプロンプトで次のコマンドを実行すると、サーバーが起動します。
ルートディレクトリで実行するときの例.\bin\EventStore.ClusterNode.exe --config SampleEventStore.conf
SSLを無効にする場合(insecureを指定する).\bin\EventStore.ClusterNode.exe --config SampleEventStore.conf --insecure
-
["127.0.0.1:2113"] Sub System '"Projections"' initialized. のようなメッセージが表示されれば起動完了です。終了させる場合は CTRL+C キーを押します。
クライアントアプリケーションの実装
-
NuGet で次のパッケージをインストールします。
- EventStore.Client
- EventStore.Client.Grpc.Streams
-
必要に応じて JSON シリアライザをインストールします。今回は
Utf8Json
を使用することにしました。
イベントデータ
- 次のコマンドを定義しました。
- 特定のインターフェースを実装する必要はありませんが、プロダクトではロギングなどの共通処理を透過的に実装できるようにするためにインターフェースを実装することが多いと思います。
public interface IEventCommand
{
Guid ID { get; }
DateTimeOffset CreateAt { get; }
}
public sealed class SampleEventCommand : IEventCommand
{
private SampleEventCommand(Guid id, DateTimeOffset createAt, string message)
{
ID = id;
CreateAt = createAt;
Message = message;
}
/// <summary>
/// 新規インスタンスを生成します。
/// </summary>
/// <param name="message">メッセージ</param>
/// <returns></returns>
public static SampleEventCommand Create(string message)
{
return new SampleEventCommand(Guid.NewGuid(), DateTimeOffset.UtcNow, message);
}
public Guid ID { get; private set; }
public DateTimeOffset CreateAt { get; private set; }
public string Message { get; set; }
public override string ToString()
{
return $"{CreateAt}: {Message}";
}
}
シリアライザ
-
Utf8Json
を使用しています。 - 特定のシリアライザへの依存性を軽減させるため、インターフェースを実装しています。
using Utf8Json;
using Utf8Json.Resolvers;
public interface IEventDataSerializer
{
ReadOnlyMemory<byte> Serialize<T>(T obj);
T Deserialize<T>(ReadOnlyMemory<byte> data);
}
public class SampleSerializer : IEventDataSerializer
{
public readonly static SampleSerializer Default = new SampleSerializer();
public ReadOnlyMemory<byte> Serialize<T>(T obj)
{
return JsonSerializer.Serialize(obj, StandardResolver.AllowPrivate);
}
public T Deserialize<T>(ReadOnlyMemory<byte> data)
{
return JsonSerializer.Deserialize<T>(data.ToArray(), StandardResolver.AllowPrivate);
}
}
クライアントプロキシの生成
- EventStoreDB クライアントプロキシを生成するメソッドです。
using System.IO;
using Grpc.Core;
/// <summary>
/// EventStoreDB クライアントを生成します。
/// </summary>
/// <returns></returns>
private EventStoreClient CreateEventStoreClient()
{
// gRPC で接続するときの接続文字列
var settings = EventStoreClientSettings.Create(@"esdb://127.0.0.1:2113?tls=true");
// ルート証明書を明示的に設定する(自己証明書の場合のみ?)
// これを設定しない場合、gRPC のエラー "failed to connect to all addresses." が発生した
settings.ChannelCredentials = new SslCredentials(File.ReadAllText(@".\Certs\TestCA.crt"));
return new EventStoreClient(settings);
}
イベントデータの書き込み
- EventStoreDB にイベントデータを書き込むメソッドです。
- イベントタイプとストリーム名を指定する必要があります。
using EventStore.Client;
/// <summary>
/// 指定されたコマンドをイベントソースに書き込みます。
/// </summary>
/// <param name="command">コマンド</param>
/// <returns></returns>
private Task<IWriteResult> WriteCommandAsync(SampleEventCommand command)
{
// TODO: コマンドの型からイベントタイプとストリーム名を特定できるようにするのが望ましい
return WriteCommandAsync("sampleEvent", "sampleStream", command);
}
/// <summary>
/// 指定されたコマンドをイベントソースに書き込みます。
/// </summary>
/// <typeparam name="TCommand">コマンドの型</typeparam>
/// <param name="eventType">イベントタイプ</param>
/// <param name="streamName">ストリーム名</param>
/// <param name="command">コマンド</param>
/// <returns>書き込み結果</returns>
private Task<IWriteResult> WriteCommandAsync<TCommand>(string eventType, string streamName, TCommand command)
where TCommand : IEventCommand
{
return WriteCommandsAsync(eventType, streamName, new[] { command });
}
/// <summary>
/// 指定されたコマンドをイベントソースに書き込みます。
/// </summary>
/// <typeparam name="TCommand">コマンドの型</typeparam>
/// <param name="eventType">イベントタイプ</param>
/// <param name="streamName">ストリーム名</param>
/// <param name="commands">コマンド</param>
/// <returns>書き込み結果</returns>
private async Task<IWriteResult> WriteCommandsAsync<TCommand>(string eventType, string streamName, IEnumerable<TCommand> commands)
where TCommand : IEventCommand
{
// コマンドを格納したイベントデータを列挙するメソッド
static IEnumerable<EventData> ToEventData(string eventType, IEnumerable<TCommand> commands)
{
foreach (var obj in commands)
{
yield return new EventData(
Uuid.NewUuid()
, eventType
, m_Serializer.Serialize(obj)
);
}
}
// 頻繁に書き込みを行うアプリケーションの場合、クライアントの生成と破棄を繰り返さないほうがよいと思われる
await using EventStoreClient client = CreateEventStoreClient();
return await client.AppendToStreamAsync(
streamName
, StreamState.Any
, ToEventData(eventType, commands)
).ConfigureAwait(false);
}
イベントデータの読み込み
- gRPC クライアントではイベントデータを読み込む二つのメソッドが提供されています。
- 指定したストリームからイベントデータを読み込む ReadStreamAsync メソッド
- 全てのストリームから全てのイベントデータを読み込む ReadAllAsync メソッド
EventStoreClient.ReadStreamAsync メソッド
-
特定のイベントデータを読み込むときに使用します。
-
読み込み開始位置を指定する場合、シーケンシャルに付番されるイベント番号を指定します。
-
今回は読み込んだ際に取得された読み込み位置をアプリケーション内のフィールドに保存しています。プロダクトではデータベースなどに永続化することになります。
using EventStore.Client;
/// <summary>
/// ストリーム名と最後に読み込んだ位置の組み合わせ
/// </summary>
private Dictionary<string, StreamPosition> LastStreamPosition { get; } = new Dictionary<string, StreamPosition>();
/// <summary>
/// 指定されたストリームの次の読み込み位置を取得します。
/// </summary>
/// <param name="streamName">ストリーム名</param>
/// <returns>位置</returns>
private StreamPosition GetNextStreamPosition(string streamName)
{
if (LastStreamPosition.TryGetValue(streamName, out StreamPosition position))
{
return position.Next();
}
else
{
return StreamPosition.Start;
}
}
using EventStore.Client;
private static readonly IEventDataSerializer Serializer = SampleSerializer.Default;
/// <summary>
/// 指定されたストリームからコマンドを読み込みます。
/// </summary>
/// <typeparam name="TCommand">コマンドの型</typeparam>
/// <param name="streamName">ストリーム名</param>
/// <returns>非同期ストリーム</returns>
private IAsyncEnumerable<TCommand> ReadCommandsAsync<TCommand>(string streamName)
where TCommand : IEventCommand
{
// 次の読み込み位置を指定する
return ReadCommandsAsync<TCommand>(streamName, GetNextStreamPosition(streamName));
}
/// <summary>
/// 指定されたストリームからコマンドを読み込みます。
/// </summary>
/// <typeparam name="TCommand">コマンドの型</typeparam>
/// <param name="streamName">ストリーム名</param>
/// <param name="startPosition">読み込み開始位置</param>
/// <returns>非同期ストリーム</returns>
private async IAsyncEnumerable<TCommand> ReadCommandsAsync<TCommand>(string streamName, StreamPosition startPosition)
where TCommand : IEventCommand
{
// 頻繁に書き込みを行うアプリケーションの場合、クライアントの生成と破棄を繰り返さないほうがよいと思われる
await using var client = CreateEventStoreClient();
await using var stream = client.ReadStreamAsync(Direction.Forwards, streamName, startPosition);
if (await stream.ReadState == ReadState.Ok)
{
while (await stream.MoveNextAsync().ConfigureAwait(false))
{
yield return Serializer.Deserialize<TCommand>(stream.Current.Event.Data);
// 最後に読み込んだ位置を格納する
LastStreamPosition[streamName] = stream.Current.Event.EventNumber;
}
}
}
EventStoreClient.ReadAllAsync メソッド
-
全てのイベントデータを読み込むときに使用します。読み込まれたイベントタイプからイベントを特定してハンドリングすることになります。
-
読み込み開始位置を指定する場合、トランザクションログ上のバイト位置を指定します。次のバイト位置を取得する方法はわかりませんでした。最後に読み込んだ位置を開始位置に指定し、取得された位置が最後に読み込んだ位置と一致する場合はスキップしています。イベントデータの削除や再配置を行ったときにバイト位置が変わる可能性があり、この方法が正しいかどうかは検証が必要です。
-
今回は読み込んだ際に取得された読み込み位置をアプリケーション内のフィールドに保存しています。プロダクトではデータベースなどに永続化することになります。
-
読み込まれるイベントデータには管理イベントも含まれるため、管理者権限が要求されます。
using EventStore.Client;
/// <summary>
/// 最後に読み込んだ位置
/// </summary>
private Position? LastTransactionPosition = null;
/// <summary>
/// 最後に読み込んだ位置を取得します。
/// </summary>
/// <returns></returns>
private Position GetLastTransactionPosition()
{
return LastTransactionPosition ?? Position.Start;
}
using EventStore.Client;
private static readonly IEventDataSerializer Serializer = SampleSerializer.Default;
/// <summary>
/// トランザクションログからコマンドを読み込みます。
/// </summary>
/// <returns>非同期ストリーム</returns>
private IAsyncEnumerable<IEventCommand> ReadAllCommandsAsync()
{
// 最後の読み込み位置を指定する
return ReadAllCommandsAsync(GetLastTransactionPosition());
}
/// <summary>
/// トランザクションログからコマンドを読み込みます。
/// </summary>
/// <param name="lastPosition">最後の読み込み位置</param>
/// <returns>非同期ストリーム</returns>
private async IAsyncEnumerable<IEventCommand> ReadAllCommandsAsync(Position lastPosition)
{
// 頻繁に書き込みを行うアプリケーションの場合、クライアントの生成と破棄を繰り返さないほうがよいと思われる
await using EventStoreClient client = CreateEventStoreClient();
await foreach (var result in client.ReadAllAsync(
Direction.Forwards
, lastPosition
, userCredentials: new UserCredentials("admin", "changeit")))
{
// TODO: 最後に読み込んだときの位置と同じ場合はスキップする。位置が不変であるのか調査が必要。
if (lastPosition.CommitPosition >= result.Event.Position.CommitPosition)
{
continue;
}
// イベントからコマンドを取得して列挙する
if (TryGetCommand(@event, out IEventCommand cmd))
{
yield return cmd;
}
// 最後に読み込んだ位置を格納する
LastTransactionPosition = result.Event.Position;
}
}
/// <summary>
/// 指定されたイベントからコマンドを取得します。
/// </summary>
/// <param name="event">イベント</param>
/// <param name="command">コマンド</param>
/// <returns>取得できた場合、true を返します。</returns>
private bool TryGetCommand(ResolvedEvent @event, out IEventCommand command)
{
// TODO: イベントタイプからコマンドの型を特定できるようにすることが望ましい
if (@event.Event.EventType == "sampleEvent")
{
command = Serializer.Deserialize<SampleEventCommand>(@event.Event.Data);
return true;
}
command = null;
return false;
}
/// <summary>
/// 指定されたイベントからコマンドを取得します。
/// </summary>
/// <param name="event">イベント</param>
/// <param name="command">コマンド</param>
/// <returns>取得できた場合、true を返します。</returns>
private bool TryGetCommand<TCommand>(ResolvedEvent @event, out TCommand command)
where TCommand : IEventCommand
{
// TODO: イベントタイプからコマンドの型を特定できるようにすることが望ましい
if (@event.Event.EventType == "sampleEvent")
{
command = (TCommand)(IEventCommand)Serializer.Deserialize<SampleEventCommand>(@event.Event.Data);
return true;
}
command = default;
return false;
}
イベントデータの購読
-
Pub/Sub メッセージングモデルの購読側です。Reactive に処理を行う必要がある場合は、読み込みよりも購読が適しています。
-
イベントデータに対する処理自体は読み込みとほぼ同じです。
-
前述の読み込みと同様、gRPC クライアントではイベントデータを購読する二つのメソッドが提供されています。
- 指定したストリームからイベントデータを購読する ReadStreamAsync メソッド
- 全てのストリームから全てのイベントデータを購読する SubscribeToAllAsync メソッド
-
購読の状態を管理するオブジェクトを用いて、購読の終了処理をカプセル化しました。クライアントとキャンセルトークンは購読開始メソッド内のローカル変数としています。
EventStoreClient.SubscribeToAllAsync メソッド
// 購読の状態を管理するオブジェクト
private StreamSubscriptionState m_SubcribeSampleCommandState;
/// <summary>
/// コマンドの購読を開始します。
/// </summary>
/// <returns></returns>
private async Task StartSubcribeSampleCommandAsync()
{
// コマンドを受け取ったときの処理
Task onReceiveCommandAsync(SampleEventCommand command)
{
System.Diagnostics.Debug.WriteLine($"onReceiveCommandAsync: {command}");
return Task.CompletedTask;
}
m_SubcribeSampleCommandState = await SubcribeCommandAsync<SampleEventCommand>("sampleStream", onReceiveCommandAsync).ConfigureAwait(false);
}
/// <summary>
/// コマンドの購読を停止します。
/// </summary>
private void StopSubcribeSampleCommand()
{
m_SubcribeSampleCommandState?.Dispose();
}
/// <summary>
/// 指定されたコマンドの購読を開始します。
/// </summary>
/// <typeparam name="TCommand">コマンドの型</typeparam>
/// <param name="streamName">ストリーム名</param>
/// <param name="startPosition">読み込み開始位置</param>
/// <param name="onReceiveAsync">コマンドを受信したときに実行するメソッド</param>
/// <returns>購読の状態を管理するオブジェクト</returns>
private async Task<StreamSubscriptionState> SubcribeCommandAsync<TCommand>(string streamName, StreamPosition startPosition, Func<TCommand, Task> onReceiveAsync)
where TCommand : IEventCommand
{
// 購読をキャンセルするためのトークン
CancellationTokenSource cancellation = new CancellationTokenSource();
EventStoreClient client = CreateEventStoreClient();
// イベントを受け取ったときの処理
async Task OnEventAsync(StreamSubscription subscription, ResolvedEvent @event, CancellationToken cancellation)
{
// イベントからコマンドを取得する
// SubscribeToStreamAsync メソッドの引数にはイベントタイプがない
// 購読対象でない型のコマンドである可能性がある
if (TryGetCommand(@event, out TCommand cmd))
{
await onReceiveAsync(cmd).ConfigureAwait(false);
}
}
var subscription = await client.SubscribeToStreamAsync(
streamName
, startPosition
, OnEventAsync
, cancellationToken: cancellation.Token
).ConfigureAwait(false);
// 購読に関連するオブジェクトをまとめた状態オブジェクトを返す
return new StreamSubscriptionState(client, subscription, cancellation);
}
EventStoreClient.SubscribeToAllAsync メソッド
// 購読の状態を管理するオブジェクト
private StreamSubscriptionState m_SubcribeAllCommandState;
/// <summary>
/// 全てのコマンドの購読を開始します。
/// </summary>
/// <returns></returns>
private async Task StartSubcribeAllCommandAsync()
{
// コマンドを受け取ったときの処理
Task onReceiveAllCommandAsync(IEventCommand command)
{
System.Diagnostics.Debug.WriteLine($"onReceiveAllCommandAsync: {command}");
return Task.CompletedTask;
}
m_SubcribeAllCommandState = await SubcribeAllCommandAsync(onReceiveAllCommandAsync).ConfigureAwait(false);
}
/// <summary>
/// 全てのコマンドの購読を停止します。
/// </summary>
private void StopSubcribeAllCommand()
{
m_SubcribeAllCommandState?.Dispose();
}
/// <summary>
/// 全てのコマンドの購読を開始します。
/// </summary>
/// <param name="lastPosition">最後の読み込み位置</param>
/// <param name="onReceiveAsync">コマンドを受信したときに実行するメソッド</param>
/// <returns>購読の状態を管理するオブジェクト</returns>
private async Task<StreamSubscriptionState> SubcribeAllCommandAsync(Position lastPosition, Func<IEventCommand, Task> onReceiveAsync)
{
// 購読をキャンセルするためのトークン
CancellationTokenSource cancellation = new CancellationTokenSource();
EventStoreClient client = CreateEventStoreClient();
// イベントを受け取ったときの処理
async Task OnEventAsync(StreamSubscription subscription, ResolvedEvent @event, CancellationToken cancellation)
{
// TODO: 最後に読み込んだときの位置と同じ場合はスキップする。位置が不変であるのか調査が必要。
if (lastPosition.CommitPosition >= @event.Event.Position.CommitPosition)
{
return;
}
// イベントからコマンドを取得する
if (TryGetCommand(@event, out IEventCommand cmd))
{
await onReceiveAsync(cmd).ConfigureAwait(false);
}
}
var subscription = await client.SubscribeToAllAsync(
lastPosition
, OnEventAsync
, cancellationToken: cancellation.Token
, userCredentials: new EventStore.Client.UserCredentials("admin", "changeit")
).ConfigureAwait(false);
// 購読に関連するオブジェクトをまとめた状態オブジェクトを返す
return new StreamSubscriptionState(client, subscription, cancellation);
}
購読の状態を管理するオブジェクト
- IDisposable インターフェースを実装し、Dispose メソッドで購読に関連する各オブジェクトの破棄を行っています。
/// <summary>
/// 購読中の状態を管理します。
/// </summary>
public class StreamSubscriptionState : IDisposable
{
public StreamSubscriptionState(
EventStoreClient client
, StreamSubscription subscription
, CancellationTokenSource cancellation
, bool clientDisposable = true)
{
m_Client = client;
m_Subscription = subscription;
m_CancellationToken = cancellation;
m_ClientDisposable = clientDisposable;
}
private readonly EventStoreClient m_Client;
private readonly StreamSubscription m_Subscription;
private readonly CancellationTokenSource m_CancellationToken;
private readonly bool m_ClientDisposable;
public void Dispose()
{
if (!m_CancellationToken.IsCancellationRequested) { m_CancellationToken.Cancel(); }
m_Subscription.Dispose();
if (m_ClientDisposable) { m_Client.Dispose(); }
m_CancellationToken.Dispose();
}
}
イベントデータの内容
- 読み込みや購読で取得されるイベントデータには、EventRecord オブジェクトが格納されています。
プロパティ | 内容 |
---|---|
ContentType | 常に "application/json" ? |
Created | 生成された日時。UTC。 |
Data | コマンドなどをシリアライズしたバイト配列。 |
EventId | 書き込み時に指定したイベントID。UUID は時系列にソート可能なID。 |
EventNumber | シーケンシャルに設定されるイベント番号。Stream 系の読み込み/購読メソッドで開始位置を指定する場合はこの型の値で指定する。 |
EventStreamId | 書き込み時に指定したストリーム名。 |
EventType | 書き込み時に指定したイベントタイプ。 |
Metadata | メタデータをシリアライズしたバイト配列。 |
Position | トランザクションログ上の位置。おそらく何バイト目かを表している。All 系の読み込み/購読メソッドで開始位置を指定する場合はこの型の値で指定する。 |
参考:Kafka との比較
- 次のコラムによると
- 読み書きの機能は
EventStoreDB
が優れている。 - スループットとスケーラビリティは
Kafka
が優れている。 - 併用することによって両者のメリットを得られる。
- 読み書きの機能は