外部システムに対する直列処理パターン実装ガイド
1. 概要
外部システムの制約(同時実行数制限、レート制限など)に対応するため、SQS FIFOを用いて処理を直列化するパターンについて解説します。
1.1 想定されるユースケース
- 外部システムのAPI制限(同時実行数、レート制限)への対応
- データの整合性を保つために必要な順序制御
- システム負荷を制御するための意図的な直列化
1.2 メリット
- 外部システムへの負荷制御が容易
- 処理の順序性が保証される
- エラー時のリトライ制御が容易
- 処理状況の可視化が容易
2. アーキテクチャ設計
2.1 基本構成
2.2 キー設定ポイント
-
単一MessageGroupIdの利用
const sendParams = { QueueUrl: queueUrl, MessageBody: JSON.stringify(message), MessageGroupId: 'EXTERNAL_API_CALL', // 全メッセージで同一のGroupId MessageDeduplicationId: `${timestamp}-${uniqueId}` };
-
Lambda設定
const consumerFunction = new lambda.Function(this, 'ConsumerFunction', { runtime: lambda.Runtime.NODEJS_20_X, handler: 'index.handler', timeout: Duration.seconds(25), // 同時実行数を1に制限 reservedConcurrentExecutions: 1, });
-
イベントソース設定
const eventSource = new lambdaEventSources.SqsEventSource(queue, { batchSize: 1, // 1メッセージずつ処理 maxBatchingWindow: Duration.seconds(0), // 即時処理 reportBatchItemFailures: true });
3. 実装パターン
3.1 基本的な直列処理パターン
// キュー定義
const sequentialQueue = new sqs.Queue(this, 'SequentialQueue', {
queueName: 'sequential-processing.fifo',
fifo: true,
contentBasedDeduplication: false, // MessageDeduplicationIdを明示的に指定
visibilityTimeout: Duration.seconds(30),
deadLetterQueue: {
queue: dlqQueue,
maxReceiveCount: 3
}
});
// メッセージ構造
interface SequentialMessage {
operationType: string;
targetId: string;
payload: any;
metadata: {
sequenceNumber: number;
totalInSequence: number;
timestamp: string;
};
}
// 送信例
const sendSequentialMessages = async (messages: SequentialMessage[]) => {
for (const [index, message] of messages.entries()) {
await sqs.sendMessage({
QueueUrl: queueUrl,
MessageBody: JSON.stringify(message),
MessageGroupId: 'SEQUENTIAL',
MessageDeduplicationId: `${message.targetId}-${index}`,
// 順序制御のためのメッセージ属性
MessageAttributes: {
SequenceNumber: {
DataType: 'Number',
StringValue: index.toString()
}
}
}).promise();
}
};
3.2 待機時間制御パターン
// 外部APIコール間の待機時間を制御
const processWithDelay = async (message: SequentialMessage) => {
// 前回の実行時刻を取得
const lastExecutionTime = await getLastExecutionTime();
const now = Date.now();
const timeSinceLastExecution = now - lastExecutionTime;
// 最小待機時間を確保
if (timeSinceLastExecution < MINIMUM_INTERVAL_MS) {
await new Promise(resolve =>
setTimeout(resolve, MINIMUM_INTERVAL_MS - timeSinceLastExecution)
);
}
// 外部API呼び出し
const result = await callExternalApi(message);
// 実行時刻を記録
await updateLastExecutionTime(now);
return result;
};
3.3 エラーハンドリングパターン
export const handler = async (event: SQSEvent) => {
const record = event.Records[0]; // バッチサイズ1なので単一レコード
try {
const message = JSON.parse(record.body);
const result = await processWithDelay(message);
// 処理結果をDynamoDBに記録
await saveProcessingResult({
messageId: record.messageId,
result,
status: 'SUCCESS',
processedAt: new Date().toISOString()
});
} catch (error) {
// エラー情報を記録
await saveProcessingResult({
messageId: record.messageId,
error: error.message,
status: 'ERROR',
processedAt: new Date().toISOString()
});
// リトライ可能なエラーの場合
if (isRetryableError(error)) {
return {
batchItemFailures: [{
itemIdentifier: record.messageId
}]
};
}
// 致命的なエラーの場合はDLQへ
throw error;
}
};
4. モニタリングと運用
4.1 処理状況の監視
// CloudWatch メトリクスのセットアップ
new cloudwatch.Metric({
namespace: 'SequentialProcessing',
metricName: 'ProcessingLatency',
dimensions: { QueueName: queue.queueName },
unit: cloudwatch.Unit.MILLISECONDS,
statistic: 'Average'
});
// 処理遅延のアラート設定
new cloudwatch.Alarm(this, 'ProcessingLatencyAlarm', {
metric: processingLatencyMetric,
threshold: 60, // 60秒以上の遅延でアラート
evaluationPeriods: 3,
alarmDescription: 'Sequential processing is taking too long'
});
4.2 デッドレターキューの監視
// DLQモニタリング
const dlqMessagesMetric = dlqQueue.metricApproximateNumberOfMessagesVisible();
new cloudwatch.Alarm(this, 'DLQNotEmptyAlarm', {
metric: dlqMessagesMetric,
threshold: 1,
evaluationPeriods: 1,
alarmDescription: 'Messages detected in DLQ'
});
5. スケーリングとパフォーマンスの考慮点
5.1 スループットの制御
- FIFOキューの制限(1秒あたり300メッセージ)を考慮
- 外部システムのレート制限に合わせた処理間隔の設定
- バッファリングとバッチ処理の検討
5.2 パフォーマンス最適化
-
メッセージサイズの最適化
- 必要最小限のデータのみを含める
- 大きなデータはS3経由で受け渡し
-
タイムアウト設定
- Lambda関数のタイムアウトはSQSの可視性タイムアウトより短く設定
- 外部APIの応答時間を考慮した設定
-
エラー処理の最適化
- 一時的なエラーと永続的なエラーの区別
- 適切なリトライ戦略の実装
6. ベストプラクティス
-
順序制御
- 単一のMessageGroupIdを使用
- シーケンス番号による順序の追跡
- 処理結果の確認と整合性の維持
-
エラーハンドリング
- リトライ可能なエラーの識別
- バックオフ戦略の実装
- エラーログの詳細な記録
-
運用管理
- 処理状況の可視化
- アラート閾値の適切な設定
- 定期的なパフォーマンス評価
-
コスト最適化
- メッセージ保持期間の適切な設定
- Lambda実行時間の最適化
- モニタリングコストの管理