


1. 概要

外部システムの制約(同時実行数制限、レート制限など)に対応するため、SQS FIFOを用いて処理を直列化するパターンについて解説します。

1.1 想定されるユースケース

  • 外部システムのAPI制限(同時実行数、レート制限)への対応
  • データの整合性を保つために必要な順序制御
  • システム負荷を制御するための意図的な直列化

1.2 メリット

  • 外部システムへの負荷制御が容易
  • 処理の順序性が保証される
  • エラー時のリトライ制御が容易
  • 処理状況の可視化が容易

2. アーキテクチャ設計

2.1 基本構成

2.2 キー設定ポイント

  1. 単一MessageGroupIdの利用

    const sendParams = {
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(message),
      MessageGroupId: 'EXTERNAL_API_CALL',  // 全メッセージで同一のGroupId
      MessageDeduplicationId: `${timestamp}-${uniqueId}`
  2. Lambda設定

    const consumerFunction = new lambda.Function(this, 'ConsumerFunction', {
      runtime: lambda.Runtime.NODEJS_20_X,
      handler: 'index.handler',
      timeout: Duration.seconds(25),
      // 同時実行数を1に制限
      reservedConcurrentExecutions: 1,
  3. イベントソース設定

    const eventSource = new lambdaEventSources.SqsEventSource(queue, {
      batchSize: 1,  // 1メッセージずつ処理
      maxBatchingWindow: Duration.seconds(0),  // 即時処理
      reportBatchItemFailures: true

3. 実装パターン

3.1 基本的な直列処理パターン

// キュー定義
const sequentialQueue = new sqs.Queue(this, 'SequentialQueue', {
  queueName: 'sequential-processing.fifo',
  fifo: true,
  contentBasedDeduplication: false,  // MessageDeduplicationIdを明示的に指定
  visibilityTimeout: Duration.seconds(30),
  deadLetterQueue: {
    queue: dlqQueue,
    maxReceiveCount: 3

// メッセージ構造
interface SequentialMessage {
  operationType: string;
  targetId: string;
  payload: any;
  metadata: {
    sequenceNumber: number;
    totalInSequence: number;
    timestamp: string;

// 送信例
const sendSequentialMessages = async (messages: SequentialMessage[]) => {
  for (const [index, message] of messages.entries()) {
    await sqs.sendMessage({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(message),
      MessageGroupId: 'SEQUENTIAL',
      MessageDeduplicationId: `${message.targetId}-${index}`,
      // 順序制御のためのメッセージ属性
      MessageAttributes: {
        SequenceNumber: {
          DataType: 'Number',
          StringValue: index.toString()

3.2 待機時間制御パターン

// 外部APIコール間の待機時間を制御
const processWithDelay = async (message: SequentialMessage) => {
  // 前回の実行時刻を取得
  const lastExecutionTime = await getLastExecutionTime();
  const now = Date.now();
  const timeSinceLastExecution = now - lastExecutionTime;
  // 最小待機時間を確保
  if (timeSinceLastExecution < MINIMUM_INTERVAL_MS) {
    await new Promise(resolve => 
      setTimeout(resolve, MINIMUM_INTERVAL_MS - timeSinceLastExecution)
  // 外部API呼び出し
  const result = await callExternalApi(message);
  // 実行時刻を記録
  await updateLastExecutionTime(now);
  return result;

3.3 エラーハンドリングパターン

export const handler = async (event: SQSEvent) => {
  const record = event.Records[0];  // バッチサイズ1なので単一レコード
  try {
    const message = JSON.parse(record.body);
    const result = await processWithDelay(message);
    // 処理結果をDynamoDBに記録
    await saveProcessingResult({
      messageId: record.messageId,
      status: 'SUCCESS',
      processedAt: new Date().toISOString()
  } catch (error) {
    // エラー情報を記録
    await saveProcessingResult({
      messageId: record.messageId,
      error: error.message,
      status: 'ERROR',
      processedAt: new Date().toISOString()
    // リトライ可能なエラーの場合
    if (isRetryableError(error)) {
      return {
        batchItemFailures: [{
          itemIdentifier: record.messageId
    // 致命的なエラーの場合はDLQへ
    throw error;

4. モニタリングと運用

4.1 処理状況の監視

// CloudWatch メトリクスのセットアップ
new cloudwatch.Metric({
  namespace: 'SequentialProcessing',
  metricName: 'ProcessingLatency',
  dimensions: { QueueName: queue.queueName },
  unit: cloudwatch.Unit.MILLISECONDS,
  statistic: 'Average'

// 処理遅延のアラート設定
new cloudwatch.Alarm(this, 'ProcessingLatencyAlarm', {
  metric: processingLatencyMetric,
  threshold: 60,  // 60秒以上の遅延でアラート
  evaluationPeriods: 3,
  alarmDescription: 'Sequential processing is taking too long'

4.2 デッドレターキューの監視

// DLQモニタリング
const dlqMessagesMetric = dlqQueue.metricApproximateNumberOfMessagesVisible();

new cloudwatch.Alarm(this, 'DLQNotEmptyAlarm', {
  metric: dlqMessagesMetric,
  threshold: 1,
  evaluationPeriods: 1,
  alarmDescription: 'Messages detected in DLQ'

5. スケーリングとパフォーマンスの考慮点

5.1 スループットの制御

  • FIFOキューの制限(1秒あたり300メッセージ)を考慮
  • 外部システムのレート制限に合わせた処理間隔の設定
  • バッファリングとバッチ処理の検討

5.2 パフォーマンス最適化

  1. メッセージサイズの最適化

    • 必要最小限のデータのみを含める
    • 大きなデータはS3経由で受け渡し
  2. タイムアウト設定

    • Lambda関数のタイムアウトはSQSの可視性タイムアウトより短く設定
    • 外部APIの応答時間を考慮した設定
  3. エラー処理の最適化

    • 一時的なエラーと永続的なエラーの区別
    • 適切なリトライ戦略の実装

6. ベストプラクティス

  1. 順序制御

    • 単一のMessageGroupIdを使用
    • シーケンス番号による順序の追跡
    • 処理結果の確認と整合性の維持
  2. エラーハンドリング

    • リトライ可能なエラーの識別
    • バックオフ戦略の実装
    • エラーログの詳細な記録
  3. 運用管理

    • 処理状況の可視化
    • アラート閾値の適切な設定
    • 定期的なパフォーマンス評価
  4. コスト最適化

    • メッセージ保持期間の適切な設定
    • Lambda実行時間の最適化
    • モニタリングコストの管理

