0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

外部システムに対する直列処理パターン実装ガイド

Posted at

外部システムに対する直列処理パターン実装ガイド

1. 概要

外部システムの制約(同時実行数制限、レート制限など)に対応するため、SQS FIFOを用いて処理を直列化するパターンについて解説します。

1.1 想定されるユースケース

  • 外部システムのAPI制限(同時実行数、レート制限)への対応
  • データの整合性を保つために必要な順序制御
  • システム負荷を制御するための意図的な直列化

1.2 メリット

  • 外部システムへの負荷制御が容易
  • 処理の順序性が保証される
  • エラー時のリトライ制御が容易
  • 処理状況の可視化が容易

2. アーキテクチャ設計

2.1 基本構成

2.2 キー設定ポイント

  1. 単一MessageGroupIdの利用

    const sendParams = {
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(message),
      MessageGroupId: 'EXTERNAL_API_CALL',  // 全メッセージで同一のGroupId
      MessageDeduplicationId: `${timestamp}-${uniqueId}`
    };
    
  2. Lambda設定

    const consumerFunction = new lambda.Function(this, 'ConsumerFunction', {
      runtime: lambda.Runtime.NODEJS_20_X,
      handler: 'index.handler',
      timeout: Duration.seconds(25),
      // 同時実行数を1に制限
      reservedConcurrentExecutions: 1,
    });
    
  3. イベントソース設定

    const eventSource = new lambdaEventSources.SqsEventSource(queue, {
      batchSize: 1,  // 1メッセージずつ処理
      maxBatchingWindow: Duration.seconds(0),  // 即時処理
      reportBatchItemFailures: true
    });
    

3. 実装パターン

3.1 基本的な直列処理パターン

// キュー定義
const sequentialQueue = new sqs.Queue(this, 'SequentialQueue', {
  queueName: 'sequential-processing.fifo',
  fifo: true,
  contentBasedDeduplication: false,  // MessageDeduplicationIdを明示的に指定
  visibilityTimeout: Duration.seconds(30),
  deadLetterQueue: {
    queue: dlqQueue,
    maxReceiveCount: 3
  }
});

// メッセージ構造
interface SequentialMessage {
  operationType: string;
  targetId: string;
  payload: any;
  metadata: {
    sequenceNumber: number;
    totalInSequence: number;
    timestamp: string;
  };
}

// 送信例
const sendSequentialMessages = async (messages: SequentialMessage[]) => {
  for (const [index, message] of messages.entries()) {
    await sqs.sendMessage({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify(message),
      MessageGroupId: 'SEQUENTIAL',
      MessageDeduplicationId: `${message.targetId}-${index}`,
      // 順序制御のためのメッセージ属性
      MessageAttributes: {
        SequenceNumber: {
          DataType: 'Number',
          StringValue: index.toString()
        }
      }
    }).promise();
  }
};

3.2 待機時間制御パターン

// 外部APIコール間の待機時間を制御
const processWithDelay = async (message: SequentialMessage) => {
  // 前回の実行時刻を取得
  const lastExecutionTime = await getLastExecutionTime();
  const now = Date.now();
  const timeSinceLastExecution = now - lastExecutionTime;
  
  // 最小待機時間を確保
  if (timeSinceLastExecution < MINIMUM_INTERVAL_MS) {
    await new Promise(resolve => 
      setTimeout(resolve, MINIMUM_INTERVAL_MS - timeSinceLastExecution)
    );
  }
  
  // 外部API呼び出し
  const result = await callExternalApi(message);
  
  // 実行時刻を記録
  await updateLastExecutionTime(now);
  
  return result;
};

3.3 エラーハンドリングパターン

export const handler = async (event: SQSEvent) => {
  const record = event.Records[0];  // バッチサイズ1なので単一レコード
  
  try {
    const message = JSON.parse(record.body);
    const result = await processWithDelay(message);
    
    // 処理結果をDynamoDBに記録
    await saveProcessingResult({
      messageId: record.messageId,
      result,
      status: 'SUCCESS',
      processedAt: new Date().toISOString()
    });
    
  } catch (error) {
    // エラー情報を記録
    await saveProcessingResult({
      messageId: record.messageId,
      error: error.message,
      status: 'ERROR',
      processedAt: new Date().toISOString()
    });
    
    // リトライ可能なエラーの場合
    if (isRetryableError(error)) {
      return {
        batchItemFailures: [{
          itemIdentifier: record.messageId
        }]
      };
    }
    
    // 致命的なエラーの場合はDLQへ
    throw error;
  }
};

4. モニタリングと運用

4.1 処理状況の監視

// CloudWatch メトリクスのセットアップ
new cloudwatch.Metric({
  namespace: 'SequentialProcessing',
  metricName: 'ProcessingLatency',
  dimensions: { QueueName: queue.queueName },
  unit: cloudwatch.Unit.MILLISECONDS,
  statistic: 'Average'
});

// 処理遅延のアラート設定
new cloudwatch.Alarm(this, 'ProcessingLatencyAlarm', {
  metric: processingLatencyMetric,
  threshold: 60,  // 60秒以上の遅延でアラート
  evaluationPeriods: 3,
  alarmDescription: 'Sequential processing is taking too long'
});

4.2 デッドレターキューの監視

// DLQモニタリング
const dlqMessagesMetric = dlqQueue.metricApproximateNumberOfMessagesVisible();

new cloudwatch.Alarm(this, 'DLQNotEmptyAlarm', {
  metric: dlqMessagesMetric,
  threshold: 1,
  evaluationPeriods: 1,
  alarmDescription: 'Messages detected in DLQ'
});

5. スケーリングとパフォーマンスの考慮点

5.1 スループットの制御

  • FIFOキューの制限(1秒あたり300メッセージ)を考慮
  • 外部システムのレート制限に合わせた処理間隔の設定
  • バッファリングとバッチ処理の検討

5.2 パフォーマンス最適化

  1. メッセージサイズの最適化

    • 必要最小限のデータのみを含める
    • 大きなデータはS3経由で受け渡し
  2. タイムアウト設定

    • Lambda関数のタイムアウトはSQSの可視性タイムアウトより短く設定
    • 外部APIの応答時間を考慮した設定
  3. エラー処理の最適化

    • 一時的なエラーと永続的なエラーの区別
    • 適切なリトライ戦略の実装

6. ベストプラクティス

  1. 順序制御

    • 単一のMessageGroupIdを使用
    • シーケンス番号による順序の追跡
    • 処理結果の確認と整合性の維持
  2. エラーハンドリング

    • リトライ可能なエラーの識別
    • バックオフ戦略の実装
    • エラーログの詳細な記録
  3. 運用管理

    • 処理状況の可視化
    • アラート閾値の適切な設定
    • 定期的なパフォーマンス評価
  4. コスト最適化

    • メッセージ保持期間の適切な設定
    • Lambda実行時間の最適化
    • モニタリングコストの管理
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?