はじめに
CQRS(Command Query Responsibility Segregation)とイベントソーシング(Event Sourcing)は、複雑なドメインロジックや監査要件を持つシステムで威力を発揮するアーキテクチャパターンです。
今回は学習目的で、シンプルなユーザー登録機能を題材に、CQRS/ESをどのように設計・実装してみました。
注意: 本記事の実装はあくまで学習目的です。単純なユーザー登録機能に対してCQRS/ESは過剰な選択であり、実際のプロジェクトではROIを慎重に検討する必要があります。
CQRS/ESの基本概念
CQRS(Command Query Responsibility Segregation)
CQRSは、データの「書き込み」と「読み取り」を異なるモデルで処理するパターンです。
従来のCRUDアプローチでは、同一のモデルで読み書きを行いますが、CQRSでは:
- Command(書き込み): ビジネスロジックに焦点を当てた処理
- Query(読み取り): 表示に最適化されたデータ構造
という構造になります。
イベントソーシング
イベントソーシングは、状態の変更を「イベント」として記録するパターンです。
メリット
- 監査証跡: データの変更履歴を保存するのでデバッグがしやすく、復元も容易
- コンプライアンス: 不正な操作やデータ改ざんを検知することもでき、コンプライアンス的にもよい
- 分析の柔軟性: データを状態ではなくストーリーとしてとらえるアプローチなので、いろんな切り口で分析可能
- AI連携: AIとの相性も良い
実用例
- 金融取引の履歴
- 医療記録の変更履歴
- コンプライアンス要件の充足
システム構成
図が汚くてごめんなさい(Domain層の処理をしている部分は省いています)
使用した技術スタック
- Hono: 軽量で高速なWebフレームワーク(Express/Fastifyからの乗り換え)
- Drizzle ORM: TypeScript対応の型安全なORM
- PostgreSQL: イベントストアとして使用
- Redis Streams: イベントの非同期配信
- Auth0 + GitHub: 認証とプロフィール取得
技術選定の背景
なぜHono?
// Expressと比較して型安全で軽量
app.post('/commands/auth/register',
validator('json', RegisterUserSchema),
async (c) => {
const command = c.req.valid('json'); // 型安全!
// ...
}
);
Edge環境でも動作し、Zodによるバリデーションとの相性も抜群でした。
なぜDrizzle?
// 型安全なスキーマ定義
export const eventsTable = pgTable('events', {
id: uuid('id').primaryKey(),
aggregateId: uuid('aggregate_id').notNull(),
eventType: varchar('event_type', { length: 100 }).notNull(),
eventData: jsonb('event_data').notNull(),
// ...
});
// 型安全なクエリ
const events = await db
.select()
.from(eventsTable)
.where(eq(eventsTable.aggregateId, aggregateId))
.orderBy(asc(eventsTable.eventVersion));
Prismaと比較して、より低レベルな制御が可能で、イベントストアのような特殊な要件に適していました。
なぜRedis Streams?
イベントの非同期配信には以下の選択肢がありました:
- PostgreSQL LISTEN/NOTIFY: シンプルだが永続性なし
- RabbitMQ/Kafka: 高機能だが学習目的にはオーバースペック
- Redis Streams: 永続性あり、コンシューマーグループ対応、適度にシンプル
// Redis Streamsの利点: At-least-once配信を簡単に実現
await redis.xadd(
'events:UserEventHandler',
'*',
'eventType', event.eventType,
'payload', JSON.stringify(event)
);
アーキテクチャ設計
今回実装したのは、レイヤードアーキテクチャにCQRS/ESパターンを適用した構成です。
レイヤー構造
backend/src/
├── presentation/ # APIレイヤー
│ └── api/ # Honoルーティング
├── application/ # アプリケーションレイヤー
│ ├── commands/ # コマンドハンドラー
│ └── event-handlers/ # イベントハンドラー
├── domain/ # ドメインレイヤー
│ ├── aggregates/ # 集約ルート
│ └── services/ # ドメインサービス
└── infrastructure/ # インフラストラクチャレイヤー
├── eventStore/ # イベント永続化
└── external/ # 外部サービス連携
実装編①: コマンド側(Write)
処理フロー
ユーザー登録の処理フローは以下の通りです:
- クライアントからAuth0トークンを受信
- トークン検証とGitHubプロフィール取得
- 重複チェック
- ドメインイベント生成
- イベントストアへの永続化
- Redis Streamsへの非同期配信
- リードモデルの更新(Worker)
APIエンドポイント(Hono)
// presentation/api/commands/auth.ts
export const authCommandRoutes = new Hono<{ Bindings: Env }>();
authCommandRoutes.post(
'/register',
validator('json', RegisterUserSchema),
async (c) => {
const requestId = c.get('requestId');
const { auth0Token } = c.req.valid('json');
const container = c.get('container');
const commandBus = container.resolve<CommandBus>('commandBus');
const command = new RegisterUserCommand(auth0Token);
const result = await commandBus.execute(command);
if (!result.isSuccess) {
return c.json(
{ error: result.error?.message || 'Registration failed' },
result.error?.code === 'USER_ALREADY_EXISTS' ? 409 : 500
);
}
return c.json({ message: 'User registration initiated' }, 202);
}
);
コマンドハンドラー
// application/commands/handlers/user/RegisterUserCommandHandler.ts
export class RegisterUserCommandHandler implements CommandHandler<RegisterUserCommand> {
async handle(command: RegisterUserCommand): Promise<CommandResult> {
try {
// 1. Auth0トークン検証
const auth0User = await this.auth0Service.verifyToken(command.auth0Token);
// 2. GitHubプロフィール取得
const githubProfile = await this.githubService.getProfile(
auth0User.nickname || auth0User.sub
);
// 3. 重複チェック
const isDuplicated = await this.userRegistrationService.checkDuplication(
auth0Id,
githubProfile.id.toString()
);
if (isDuplicated.exists) {
// 監査ログ用の失敗イベント
const failureEvent = UserRegistrationFailedEvent.create({
auth0Id: auth0Id.value,
githubId: githubProfile.id.toString(),
reason: isDuplicated.reason!,
attemptedAt: new Date().toISOString(),
duplicateUserId: isDuplicated.duplicateUserId,
});
await this.eventProcessor.processEvents([failureEvent], 'UserRegistrationAudit');
return CommandResult.failure(
new UserAlreadyExistsError(
`User already exists: ${isDuplicated.reason}`
)
);
}
// 4. ユーザー登録
const userId = UserId.generate();
const userRegisteredEvent = User.register(
userId,
auth0Id,
GitHubProfile.create(githubProfile)
);
// 5. イベント永続化と配信
await this.eventProcessor.processEvents([userRegisteredEvent], 'User');
return CommandResult.success({ userId: userId.value });
} catch (error) {
return CommandResult.failure(error as Error);
}
}
}
ドメインモデルとイベント
// domain/aggregates/user/User.ts
export class User extends AggregateRoot {
private constructor(
id: UserId,
private auth0Id: Auth0Id,
private githubProfile: GitHubProfile,
version: number = 0
) {
super(id, version);
}
static register(
userId: UserId,
auth0Id: Auth0Id,
githubProfile: GitHubProfile
): UserRegisteredEvent {
// ドメインルールのチェック
if (!githubProfile.username || githubProfile.username.length === 0) {
throw new Error('GitHub username is required');
}
return UserRegisteredEvent.create({
aggregateId: userId.value,
auth0Id: auth0Id.value,
githubId: githubProfile.id,
githubUsername: githubProfile.username,
displayName: githubProfile.name || githubProfile.username,
avatarUrl: githubProfile.avatarUrl,
registeredAt: new Date().toISOString(),
});
}
}
// domain/aggregates/user/events/UserRegisteredEvent.ts
export class UserRegisteredEvent extends DomainEvent<{
auth0Id: string;
githubId: string;
githubUsername: string;
displayName: string;
avatarUrl?: string;
registeredAt: string;
}> {
static readonly eventType = 'UserRegistered';
}
実装編②: イベントストア
PostgreSQLでのイベント永続化
// infrastructure/eventStore/PostgresEventStore.ts
export class PostgresEventStore implements EventStore {
async save(event: DomainEvent, aggregateType: string): Promise<void> {
const eventId = ulid();
await this.db.transaction(async (tx) => {
// 楽観的排他制御: 最新バージョンを取得
const lastEvent = await tx
.select({ eventVersion: eventsTable.eventVersion })
.from(eventsTable)
.where(eq(eventsTable.aggregateId, event.aggregateId))
.orderBy(desc(eventsTable.eventVersion))
.limit(1);
const nextVersion = lastEvent[0]?.eventVersion ?? 0 + 1;
// イベントを保存
await tx.insert(eventsTable).values({
id: eventId,
aggregateId: event.aggregateId,
aggregateType,
eventType: event.eventType,
eventVersion: nextVersion,
eventData: event.data,
metadata: event.metadata,
occurredAt: new Date(event.occurredAt),
});
});
}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
const events = await this.db
.select()
.from(eventsTable)
.where(eq(eventsTable.aggregateId, aggregateId))
.orderBy(asc(eventsTable.eventVersion));
return events.map(this.eventSerializer.deserialize);
}
}
なぜULID?
UUIDv4ではなくULIDを採用した理由:
- 時系列ソート可能: イベントの順序が重要
- ランダム性も確保: 衝突回避
- インデックス効率: B-Treeインデックスとの相性
-- eventsテーブルのスキーマ
CREATE TABLE events (
id VARCHAR(26) PRIMARY KEY, -- ULID
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_version INTEGER NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
occurred_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(aggregate_id, event_version) -- 楽観的排他制御
);
実装編③: 非同期処理とリードモデル
イベントプロセッサー(Redis Streams発行)
// infrastructure/events/EventProcessor.ts
export class EventProcessor {
async processEvents(
events: DomainEvent[],
aggregateType: string
): Promise<void> {
// 1. イベントストアに保存
for (const event of events) {
await this.eventStore.save(event, aggregateType);
}
// 2. Redis Streamsに発行
for (const event of events) {
const streamKey = this.getStreamKey(event.eventType);
await this.redis.xadd(
streamKey,
'*', // 自動ID生成
'eventId', event.id,
'eventType', event.eventType,
'aggregateId', event.aggregateId,
'payload', JSON.stringify({
data: event.data,
metadata: event.metadata,
occurredAt: event.occurredAt,
})
);
}
}
private getStreamKey(eventType: string): string {
// UserRegistered → events:UserEventHandler
const handlerName = eventType.replace('Registered', 'EventHandler');
return `events:${handlerName}`;
}
}
ワーカー(リードモデル更新)
// workers/EventStreamWorker.ts
export class EventStreamWorker {
async start(): Promise<void> {
await this.ensureConsumerGroup();
while (this.isRunning) {
try {
await this.consumeMessages();
} catch (error) {
console.error('Error consuming messages:', error);
await this.sleep(5000); // エラー時は5秒待機
}
}
}
private async consumeMessages(): Promise<void> {
const messages = await this.redis.xreadgroup(
'GROUP', this.consumerGroup, this.consumerId,
'COUNT', 10,
'BLOCK', 1000, // 1秒ブロッキング
'STREAMS', this.streamKey, '>'
);
if (!messages || messages.length === 0) return;
for (const [stream, streamMessages] of messages) {
for (const [messageId, fields] of streamMessages) {
await this.processMessage(messageId, fields);
}
}
}
private async processMessage(
messageId: string,
fields: string[]
): Promise<void> {
try {
const event = this.parseEvent(fields);
// イベントハンドラーに処理を委譲
await this.eventHandler.handle(event);
// 処理成功をACK
await this.redis.xack(
this.streamKey,
this.consumerGroup,
messageId
);
} catch (error) {
console.error(`Failed to process message ${messageId}:`, error);
// ACKしないことで再試行可能に
}
}
}
リードモデルへの反映
// application/event-handlers/UserEventHandler.ts
export class UserEventHandler {
async handle(event: DomainEvent): Promise<void> {
switch (event.eventType) {
case 'UserRegistered':
await this.handleUserRegistered(event as UserRegisteredEvent);
break;
// 他のイベントタイプ...
}
}
private async handleUserRegistered(event: UserRegisteredEvent): Promise<void> {
// リードモデル(users_read)に挿入
await this.db.insert(usersReadTable).values({
id: event.aggregateId,
auth0Id: event.data.auth0Id,
githubId: event.data.githubId,
githubUsername: event.data.githubUsername,
displayName: event.data.displayName,
avatarUrl: event.data.avatarUrl,
status: 'active',
createdAt: new Date(event.data.registeredAt),
updatedAt: new Date(event.data.registeredAt),
version: 1,
});
}
}
実装上の考慮点
1. 失敗イベントの扱い
ユーザー登録が失敗した場合(重複など)、そのイベントをどこに保存するかという設計上の課題があります。
アプローチ: 監査専用の集約を分離
// 成功イベント: User集約に紐付け
await this.eventProcessor.processEvents([userRegisteredEvent], 'User');
// 失敗イベント: 監査集約に紐付け
await this.eventProcessor.processEvents([failureEvent], 'UserRegistrationAudit');
この設計により、失敗の履歴も完全に追跡できます。
2. 非同期処理の信頼性
Redis Streamsを使用した非同期イベント配信では、メッセージの重複や欠落を防ぐ必要があります。
アプローチ: コンシューマーグループの活用
// コンシューマーグループによる配信保証
await this.redis.xreadgroup(
'GROUP', this.consumerGroup, this.consumerId,
'COUNT', 10,
'BLOCK', 1000,
'STREAMS', this.streamKey, '>'
);
// 処理完了後のACK
await this.redis.xack(this.streamKey, this.consumerGroup, messageId);
3. イベントの順序保証
分散環境では、イベントの順序が保証されない可能性があります。
アプローチ: 複数の仕組みを組み合わせた順序保証
実装してわかったメリット・デメリット
メリット
完全な監査証跡
- すべての変更履歴が残る
- 「いつ誰が何を変更したか」が明確
- デバッグが容易
読み取りパフォーマンスの最適化
- リードモデルは検索に最適化された形で保存
- 複雑な集計もリードモデル側で事前計算
デメリット
実装の複雑さ
- 単純なCRUDと比べてコード量が3-4倍
- 概念の理解に時間がかかる
結果整合性
- コマンド実行とリードモデル更新にタイムラグ
- UIでの考慮が必要
運用コスト
- イベントストアのデータ量が増大
- スナップショット戦略が必要
まとめ
今回は、CQRS/イベントソーシングパターンを使ってユーザー登録機能を実装する方法を体系的に解説しました。
実装時のポイント
- 段階的な導入: すべてをCQRS/ESにする必要はない
- 適切な粒度: イベントは細かすぎず粗すぎず
- 運用の考慮: イベントストアの成長に対する戦略(スナップショット等)
今回の実装から得られた知見
- レイヤードアーキテクチャとの相性: 各レイヤーの責務が明確になる
- 非同期処理の重要性: イベント駆動により疎結合を実現
- テストの書きやすさ: イベントベースでテストシナリオが明確
再度の注意: 本記事で示した実装は、学習目的としては価値がありますが、実際のプロジェクトでは必要性を慎重に検討してください。多くの場合、シンプルなCRUDで十分です。
参考資料
書籍・記事
- Event Sourcing by Martin Fowler
- CQRS by Martin Fowler
- 「実践ドメイン駆動設計」(ヴァーン・ヴァーノン 著)
- 「ドメイン駆動設計を始めよう」(Vlad Khononov 著、増田 亨、綿引 琢磨 訳)
使用技術
関連パターン
- Event Sourcing
- Saga Pattern
- Outbox Pattern
今後の展望
次回は、より複雑な要件を持つ「プロジェクト管理機能」を題材に、以下の要素を含めたCQRS/ES実装を紹介予定です(やる気が出れば):
- 複数集約間のトランザクション
- Sagaパターンによる分散トランザクション
- プロジェクション(Read Model)の最適化
- イベントのバージョニング戦略