はじめに
「このクラス、なんでこんなに責務が多いんだ...」
コードレビューでこんな感想を持ったことはありませんか?決済処理のクラスがメール送信もキャッシュ削除もログ記録も全部やってる。まるで全部入りラーメンみたいな状態。
今回は、そんな密結合地獄から脱出するための イベントドリブンアーキテクチャを、NestJSで実装する方法を紹介します。実際のプロダクションで動いているコードを元に、すぐ使える実装パターンをお見せします!
なぜイベントドリブンなのか? 🤔
従来の実装だとこんな感じになりがち
// ❌ 全部入りサービス
class PaymentService {
async completePayment(orderId: string) {
// 決済処理
await this.processPayment(orderId);
// あれもこれも...
await this.sendEmail(orderId); // メール送信も!
await this.clearCache(orderId); // キャッシュ削除も!
await this.notifyExternal(orderId); // 外部連携も!
await this.updateAnalytics(orderId); // 分析データも!
// もう何のサービスかわからない...
}
}
これをイベントドリブンにすると
// ✅ 単一責務!
class PaymentService {
async completePayment(orderId: string) {
// 決済処理だけに集中
await this.processPayment(orderId);
// あとはイベントを投げるだけ
this.eventBus.emit(new PaymentCompletedEvent(orderId));
}
}
シンプル!関心事が分離されて、各処理は独立したリスナーが担当します。
アーキテクチャの全体像 📐
こんな構成で実装していきます。
[ビジネスロジック]
↓ 「決済完了したよ〜」とイベント発行
[EventBus]
↓ イベントを配信
[リスナーたち]
├── 📧 NotificationListener(メール送信)
├── 🗑️ CacheListener(キャッシュ削除)
└── 🌐 ExternalListener(外部API連携)
各リスナーは独立して動作。一つがコケても他は影響を受けません。
Step 1: 基底イベントクラスを作る🏗️
まずは全イベントの親となる基底クラスから
// base.event.ts
export abstract class DomainEvent {
readonly occurredOn: Date;
readonly aggregateId: string;
readonly eventVersion: number = 1;
constructor(aggregateId: string) {
this.aggregateId = aggregateId;
this.occurredOn = new Date();
}
// クラス名がそのままイベント名になる便利技!
get eventName(): string {
return this.constructor.name;
}
}
ポイントは3つ
- 📝 readonly で不変性を保証(イベントは事実の記録なので変更不可)
- ⏰ occurredOn で発生時刻を自動記録
- 🏷️ クラス名 = イベント名 でtypoとおさらば
Step 2: 具体的なイベントを定義📦
// payment-completed.event.ts
export class PaymentCompletedEvent extends DomainEvent {
constructor(
public readonly orderId: string,
public readonly userId: string,
public readonly amount: number,
) {
super(orderId); // orderIdを集約IDとして使用
}
}
シンプルですね。必要な情報だけを持たせます。
Step 3: EventBusサービスの実装 🚌
イベントの配信を担当する中核サービス
// event-bus.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
@Injectable()
export class EventBusService {
private readonly logger = new Logger(EventBusService.name);
constructor(private readonly eventEmitter: EventEmitter2) {}
emit<T extends DomainEvent>(event: T): void {
const eventName = event.eventName;
// 構造化ログでデバッグが楽に!
this.logger.log(`🎯 Emitting event: ${eventName}`, {
aggregateId: event.aggregateId,
occurredOn: event.occurredOn,
});
this.eventEmitter.emit(eventName, event);
}
// 複数イベントを一気に発行
emitAll(events: DomainEvent[]): void {
events.forEach(event => this.emit(event));
}
}
Step 4: モジュールの設定 ⚙️
// events.module.ts
import { Module, Global } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
@Global() // どこからでも使えるように
@Module({
imports: [
EventEmitterModule.forRoot({
wildcard: true, // payment.* みたいなパターンマッチ可能
delimiter: '.', // ネームスペースの区切り文字
maxListeners: 20, // メモリリーク防止
ignoreErrors: false, // エラーは隠さない!
}),
],
providers: [EventBusService],
exports: [EventBusService, EventEmitterModule],
})
export class EventsModule {}
Step 5: リスナーの実装 👂
ここが一番楽しいところ!各リスナーが独立して仕事をします。
通知リスナー 📧
@Injectable()
export class NotificationListener {
private readonly logger = new Logger(NotificationListener.name);
// async: true で非同期実行!
@OnEvent(PaymentCompletedEvent.name, { async: true })
async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
this.logger.log(`📧 Sending email for order ${event.orderId}`);
try {
// メール送信処理
await this.emailService.send({
to: event.userId,
subject: '決済完了のお知らせ',
orderId: event.orderId,
});
this.logger.log(`✅ Email sent successfully`);
} catch (error) {
// エラーでも他のリスナーを止めない!
this.logger.error(`❌ Failed to send email`, error);
// ここでリトライキューに入れるとか、アラート送るとか
}
}
}
キャッシュ管理リスナー 🗑️
@Injectable()
export class CacheListener {
private readonly logger = new Logger(CacheListener.name);
@OnEvent(PaymentCompletedEvent.name, { async: true })
async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
this.logger.log(`🗑️ Invalidating cache for order ${event.orderId}`);
try {
// 関連するキャッシュをクリア
await this.cacheService.invalidate([
`order:${event.orderId}`,
`user-orders:${event.userId}`,
]);
this.logger.log(`✅ Cache invalidated`);
} catch (error) {
this.logger.error(`❌ Cache invalidation failed`, error);
// キャッシュ削除失敗は致命的じゃないので続行
}
}
}
外部連携リスナー 🌐
@Injectable()
export class ExternalApiListener {
private readonly logger = new Logger(ExternalApiListener.name);
@OnEvent(PaymentCompletedEvent.name, { async: true })
async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
this.logger.log(`🌐 Notifying external system`);
try {
// 外部APIに通知
await this.externalApi.notify({
type: 'PAYMENT_COMPLETED',
data: {
orderId: event.orderId,
amount: event.amount,
timestamp: event.occurredOn,
},
});
this.logger.log(`✅ External API notified`);
} catch (error) {
this.logger.error(`❌ External API notification failed`, error);
// ここは重要なので再送キューに入れる
await this.retryQueue.add(event);
}
}
}
Step 6: ビジネスロジックから使う💼
実際の使い方はこんな感じ
@Injectable()
export class OrderService {
constructor(
private readonly eventBus: EventBusService,
) {}
async completeOrder(orderId: string): Promise<void> {
// ビジネスロジックの処理
const order = await this.processOrder(orderId);
// イベント発行するだけ!
this.eventBus.emit(
new PaymentCompletedEvent(
order.id,
order.userId,
order.amount,
)
);
// これで終わり。あとは各リスナーがよしなにやってくれる
}
}
テストの書き方 🧪
イベントドリブンはテストも書きやすい!
describe('OrderService', () => {
let service: OrderService;
let eventBus: EventBusService;
beforeEach(() => {
const module = Test.createTestingModule({
providers: [
OrderService,
{
provide: EventBusService,
useValue: { emit: jest.fn() },
},
],
}).compile();
service = module.get(OrderService);
eventBus = module.get(EventBusService);
});
it('注文完了時にイベントが発行される', async () => {
await service.completeOrder('order-123');
expect(eventBus.emit).toHaveBeenCalledWith(
expect.objectContaining({
eventName: 'PaymentCompletedEvent',
aggregateId: 'order-123',
})
);
});
});
リスナーも独立してテスト可能
describe('NotificationListener', () => {
it('決済完了イベントでメールが送信される', async () => {
const event = new PaymentCompletedEvent('order-123', 'user-456', 1000);
await listener.handlePaymentCompleted(event);
expect(emailService.send).toHaveBeenCalledWith(
expect.objectContaining({
to: 'user-456',
orderId: 'order-123',
})
);
});
});
実装のベストプラクティス 📚
✅ DO
1. イベントは不変にする
// readonlyで変更を防ぐ
readonly orderId: string;
2. エラーでも処理を継続
try {
await this.process();
} catch (error) {
this.logger.error(error);
// throwしない!
}
3. 冪等性を保つ
- 同じイベントが2回来ても問題ない設計に
4. 構造化ログを活用
this.logger.log('Event processed', {
eventId: event.id,
duration: Date.now() - startTime,
});
❌ DON'T
1. イベントの中で副作用を起こす
// ❌ コンストラクタでAPI呼ぶとか絶対ダメ
constructor() {
super();
this.callApi(); // NO!
}
2. 同期的な処理が必要な場合に使う
- トランザクション内の処理には向かない
3. イベントの連鎖を深くする
- A → B → C → D... みたいな長い連鎖は追いづらい
運用での考慮点 🔧
デバッグのコツ
- 構造化ログでイベントの流れを追跡
// correlation IDを使った追跡
this.logger.log('Event flow', {
correlationId: event.aggregateId,
step: 'notification_sent',
timestamp: new Date(),
});
- パフォーマンス監視
// 各リスナーの実行時間を計測
const startTime = Date.now();
await this.process(event);
const duration = Date.now() - startTime;
if (duration > 1000) {
this.logger.warn(`Slow listener detected: ${duration}ms`);
}
- 将来の拡張性
今はインメモリのEventEmitterだけど、将来的にはメッセージキューに移行も視野に:
// 将来的にRabbitMQやKafkaに差し替え可能な設計
interface EventPublisher {
publish<T extends DomainEvent>(event: T): Promise<void>;
}
まとめ 🎉
イベントドリブンアーキテクチャを導入することで
- 🎯 関心事が分離されて各クラスの責務が明確に
- 🔌 疎結合で新機能追加が簡単
- 🧪 テスタブルでユニットテストが書きやすい
- 🚀 スケーラブルで将来の拡張も楽々
最初は「イベント多すぎない?」と思うかもしれませんが、慣れると「なんでもっと早く導入しなかったんだ」となるはず。
実際のプロダクションで1年以上運用していますが、新機能追加時に既存コードをほとんど触らずに済む
のは本当に楽です。「決済完了時にSlack通知も追加して」と言われても、新しいリスナー追加するだけ。
みなさんもぜひ試してみてください。質問があればコメント欄でお待ちしています!
参考リンク