1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

NestJSで実装するイベントドリブンアーキテクチャ 実践ガイド 🚀

1
Last updated at Posted at 2025-08-07

はじめに

「このクラス、なんでこんなに責務が多いんだ...」

コードレビューでこんな感想を持ったことはありませんか?決済処理のクラスがメール送信もキャッシュ削除もログ記録も全部やってる。まるで全部入りラーメンみたいな状態。

今回は、そんな密結合地獄から脱出するための イベントドリブンアーキテクチャを、NestJSで実装する方法を紹介します。実際のプロダクションで動いているコードを元に、すぐ使える実装パターンをお見せします!

なぜイベントドリブンなのか? 🤔

従来の実装だとこんな感じになりがち

// ❌ 全部入りサービス
class PaymentService {
  async completePayment(orderId: string) {
    // 決済処理
    await this.processPayment(orderId);

    // あれもこれも...
    await this.sendEmail(orderId);        // メール送信も!
    await this.clearCache(orderId);       // キャッシュ削除も!
    await this.notifyExternal(orderId);   // 外部連携も!
    await this.updateAnalytics(orderId);  // 分析データも!

    // もう何のサービスかわからない...
  }
}

これをイベントドリブンにすると

// ✅ 単一責務!
class PaymentService {
  async completePayment(orderId: string) {
    // 決済処理だけに集中
    await this.processPayment(orderId);

    // あとはイベントを投げるだけ
    this.eventBus.emit(new PaymentCompletedEvent(orderId));
  }
}

シンプル!関心事が分離されて、各処理は独立したリスナーが担当します。

アーキテクチャの全体像 📐

こんな構成で実装していきます。

  [ビジネスロジック]
      ↓ 「決済完了したよ〜」とイベント発行
  [EventBus]
      ↓ イベントを配信
  [リスナーたち]
      ├── 📧 NotificationListener(メール送信)
      ├── 🗑️ CacheListener(キャッシュ削除)
      └── 🌐 ExternalListener(外部API連携)

各リスナーは独立して動作。一つがコケても他は影響を受けません。

Step 1: 基底イベントクラスを作る🏗️

まずは全イベントの親となる基底クラスから

  // base.event.ts
  export abstract class DomainEvent {
    readonly occurredOn: Date;
    readonly aggregateId: string;
    readonly eventVersion: number = 1;

    constructor(aggregateId: string) {
      this.aggregateId = aggregateId;
      this.occurredOn = new Date();
    }

    // クラス名がそのままイベント名になる便利技!
    get eventName(): string {
      return this.constructor.name;
    }
  }

ポイントは3つ

  • 📝 readonly で不変性を保証(イベントは事実の記録なので変更不可)
  • ⏰ occurredOn で発生時刻を自動記録
  • 🏷️ クラス名 = イベント名 でtypoとおさらば

Step 2: 具体的なイベントを定義📦

  // payment-completed.event.ts
  export class PaymentCompletedEvent extends DomainEvent {
    constructor(
      public readonly orderId: string,
      public readonly userId: string,
      public readonly amount: number,
    ) {
      super(orderId); // orderIdを集約IDとして使用
    }
  }

シンプルですね。必要な情報だけを持たせます。

Step 3: EventBusサービスの実装 🚌

イベントの配信を担当する中核サービス

  // event-bus.service.ts
  import { Injectable, Logger } from '@nestjs/common';
  import { EventEmitter2 } from '@nestjs/event-emitter';

  @Injectable()
  export class EventBusService {
    private readonly logger = new Logger(EventBusService.name);

    constructor(private readonly eventEmitter: EventEmitter2) {}

    emit<T extends DomainEvent>(event: T): void {
      const eventName = event.eventName;

      // 構造化ログでデバッグが楽に!
      this.logger.log(`🎯 Emitting event: ${eventName}`, {
        aggregateId: event.aggregateId,
        occurredOn: event.occurredOn,
      });

      this.eventEmitter.emit(eventName, event);
    }

    // 複数イベントを一気に発行
    emitAll(events: DomainEvent[]): void {
      events.forEach(event => this.emit(event));
    }
  }

Step 4: モジュールの設定 ⚙️

  // events.module.ts
  import { Module, Global } from '@nestjs/common';
  import { EventEmitterModule } from '@nestjs/event-emitter';

  @Global() // どこからでも使えるように
  @Module({
    imports: [
      EventEmitterModule.forRoot({
        wildcard: true,       // payment.* みたいなパターンマッチ可能
        delimiter: '.',       // ネームスペースの区切り文字
        maxListeners: 20,     // メモリリーク防止
        ignoreErrors: false,  // エラーは隠さない!
      }),
    ],
    providers: [EventBusService],
    exports: [EventBusService, EventEmitterModule],
  })
  export class EventsModule {}

Step 5: リスナーの実装 👂

ここが一番楽しいところ!各リスナーが独立して仕事をします。

通知リスナー 📧

  @Injectable()
  export class NotificationListener {
    private readonly logger = new Logger(NotificationListener.name);

    // async: true で非同期実行!
    @OnEvent(PaymentCompletedEvent.name, { async: true })
    async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
      this.logger.log(`📧 Sending email for order ${event.orderId}`);

      try {
        // メール送信処理
        await this.emailService.send({
          to: event.userId,
          subject: '決済完了のお知らせ',
          orderId: event.orderId,
        });

        this.logger.log(`✅ Email sent successfully`);
      } catch (error) {
        // エラーでも他のリスナーを止めない!
        this.logger.error(`❌ Failed to send email`, error);
        // ここでリトライキューに入れるとか、アラート送るとか
      }
    }
  }

キャッシュ管理リスナー 🗑️

  @Injectable()
  export class CacheListener {
    private readonly logger = new Logger(CacheListener.name);

    @OnEvent(PaymentCompletedEvent.name, { async: true })
    async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
      this.logger.log(`🗑️ Invalidating cache for order ${event.orderId}`);

      try {
        // 関連するキャッシュをクリア
        await this.cacheService.invalidate([
          `order:${event.orderId}`,
          `user-orders:${event.userId}`,
        ]);

        this.logger.log(`✅ Cache invalidated`);
      } catch (error) {
        this.logger.error(`❌ Cache invalidation failed`, error);
        // キャッシュ削除失敗は致命的じゃないので続行
      }
    }
  }

外部連携リスナー 🌐

  @Injectable()
  export class ExternalApiListener {
    private readonly logger = new Logger(ExternalApiListener.name);

    @OnEvent(PaymentCompletedEvent.name, { async: true })
    async handlePaymentCompleted(event: PaymentCompletedEvent): Promise<void> {
      this.logger.log(`🌐 Notifying external system`);

      try {
        // 外部APIに通知
        await this.externalApi.notify({
          type: 'PAYMENT_COMPLETED',
          data: {
            orderId: event.orderId,
            amount: event.amount,
            timestamp: event.occurredOn,
          },
        });

        this.logger.log(`✅ External API notified`);
      } catch (error) {
        this.logger.error(`❌ External API notification failed`, error);
        // ここは重要なので再送キューに入れる
        await this.retryQueue.add(event);
      }
    }
  }

Step 6: ビジネスロジックから使う💼

実際の使い方はこんな感じ

@Injectable()
export class OrderService {
  constructor(
    private readonly eventBus: EventBusService,
  ) {}

  async completeOrder(orderId: string): Promise<void> {
    // ビジネスロジックの処理
    const order = await this.processOrder(orderId);

    // イベント発行するだけ!
    this.eventBus.emit(
      new PaymentCompletedEvent(
        order.id,
        order.userId,
        order.amount,
      )
    );

    // これで終わり。あとは各リスナーがよしなにやってくれる
  }
}

テストの書き方 🧪

イベントドリブンはテストも書きやすい!

describe('OrderService', () => {
  let service: OrderService;
  let eventBus: EventBusService;

  beforeEach(() => {
    const module = Test.createTestingModule({
      providers: [
        OrderService,
        {
          provide: EventBusService,
          useValue: { emit: jest.fn() },
        },
      ],
    }).compile();

    service = module.get(OrderService);
    eventBus = module.get(EventBusService);
  });

  it('注文完了時にイベントが発行される', async () => {
    await service.completeOrder('order-123');

    expect(eventBus.emit).toHaveBeenCalledWith(
      expect.objectContaining({
        eventName: 'PaymentCompletedEvent',
        aggregateId: 'order-123',
      })
    );
  });
});

リスナーも独立してテスト可能

describe('NotificationListener', () => {
  it('決済完了イベントでメールが送信される', async () => {
    const event = new PaymentCompletedEvent('order-123', 'user-456', 1000);

    await listener.handlePaymentCompleted(event);

    expect(emailService.send).toHaveBeenCalledWith(
      expect.objectContaining({
        to: 'user-456',
        orderId: 'order-123',
      })
    );
  });
});

実装のベストプラクティス 📚

✅ DO

1. イベントは不変にする
// readonlyで変更を防ぐ
readonly orderId: string;
2. エラーでも処理を継続
try {
  await this.process();
} catch (error) {
  this.logger.error(error);
  // throwしない!
}
3. 冪等性を保つ
  - 同じイベントが2回来ても問題ない設計に
4. 構造化ログを活用
this.logger.log('Event processed', {
  eventId: event.id,
  duration: Date.now() - startTime,
});

❌ DON'T

  1. イベントの中で副作用を起こす
  // ❌ コンストラクタでAPI呼ぶとか絶対ダメ
  constructor() {
    super();
    this.callApi(); // NO!
  }
  2. 同期的な処理が必要な場合に使う
    - トランザクション内の処理には向かない
  3. イベントの連鎖を深くする
    - A  B  C  D... みたいな長い連鎖は追いづらい

運用での考慮点 🔧

デバッグのコツ

  • 構造化ログでイベントの流れを追跡
  // correlation IDを使った追跡
  this.logger.log('Event flow', {
    correlationId: event.aggregateId,
    step: 'notification_sent',
    timestamp: new Date(),
  });
  • パフォーマンス監視
  // 各リスナーの実行時間を計測
  const startTime = Date.now();
  await this.process(event);
  const duration = Date.now() - startTime;

  if (duration > 1000) {
    this.logger.warn(`Slow listener detected: ${duration}ms`);
  }
  • 将来の拡張性
  今はインメモリのEventEmitterだけど将来的にはメッセージキューに移行も視野に

  // 将来的にRabbitMQやKafkaに差し替え可能な設計
  interface EventPublisher {
    publish<T extends DomainEvent>(event: T): Promise<void>;
  }

まとめ 🎉

イベントドリブンアーキテクチャを導入することで

  • 🎯 関心事が分離されて各クラスの責務が明確に
  • 🔌 疎結合で新機能追加が簡単
  • 🧪 テスタブルでユニットテストが書きやすい
  • 🚀 スケーラブルで将来の拡張も楽々

最初は「イベント多すぎない?」と思うかもしれませんが、慣れると「なんでもっと早く導入しなかったんだ」となるはず。

実際のプロダクションで1年以上運用していますが、新機能追加時に既存コードをほとんど触らずに済む
のは本当に楽です。「決済完了時にSlack通知も追加して」と言われても、新しいリスナー追加するだけ。

みなさんもぜひ試してみてください。質問があればコメント欄でお待ちしています!


参考リンク

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?