1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DynamoDBトリガーLambdaの「障害時の送信先 (SQS/SNS) 」における出力内容

Last updated at Posted at 2022-02-08

何の記事?

業務でDynamoDB Streamsをトリガー実行とするLambdaを開発することになりました。
DynamoDB Streamsをトリガーにする場合、トリガーには関数でエラーが起きた際の「再試行回数」と「レコードの有効期間」を設定することができます。
同一レコードがこの設定値を超過した場合、該当レコードはスキップされ次の取得可能なレコードに処理が移ります。
したがって、レコードが捨てられる際にリカバリのための「障害時の送信先」としてSQSまたはSNSを設定することができます。
この際、SQSやSNSにはどんな内容が出力されるのか説明した記事が見当たらなかったので、記事に書き起こしておきます。

出力内容

SQSでもSNSでも内容は変わらず、次の形式でJsonが出力されます。
DDBStreamBatchInfoに今回処理対象のStream情報が記載されており、これらの値を使ってリカバリすることができます。

{
  "requestContext": {
    "requestId": "XXX-XXX-XXX-XXX-XXX",
  	"functionArn": "arn:aws:lambda:apnortheast-1:XXX:function:lambda_func",
  	"condition": "RetryAttemptsExhausted",
  	"approximateInvokeCount": 4
  },
  "responseContext": {
  	"statusCode": 200,
  	"executedVersion": "$LATEST",
  	"functionError": "Unhandled"
  },
  "version": "1.0",
  "timestamp": "2022-02-08T03:36:53.132Z",
  "DDBStreamBatchInfo": {
  	"shardId":"shardId-00000000000000000000-XXXXXXXX",
  	"startSequenceNumber": "00000000000000000000001",
  	"endSequenceNumber": "00000000000000000000002",
  	"approximateArrivalOfFirstRecord": "2022-02-08T03:35:51Z",
  	"approximateArrivalOfLastRecord": "2022-02-08T03:35:51Z",
  	"batchSize": 2,
  	"streamArn": "arn:aws:dynamodb:ap-northeast-1:XXX:table/dynamodb-table/stream/2022-02-08T03:17:07.920"
  }
}

リカバリについて

方法1: DynamoDB Streamsを再度読み込む

SNS/SQSなどをトリガーにして再処理するLambdaを実装するという方法が一番簡単かと思います。
DynamoDB StreamsのGetShardIteratorというAPIに次の形式でリクエストすると、欲しいShardIdのShardIteratorを取得できます。
この方法では、通常のStreamを処理するのと同じ方法で処理をできます。

{
   "SequenceNumber": "${DDBStreamBatchInfo.startSequenceNumber}",
   "ShardId": "${DDBStreamBatchInfo.shardId}",
   "ShardIteratorType": "AT_SEQUENCE_NUMBER",
   "StreamArn": "${DDBStreamBatchInfo.streamArn}"
}

方法2: DynamoDBを直接参照する

approximateArrivalOfFirstRecordapproximateArrivalOfLastRecordあたりを使えば、テーブルの形式によっては、直接DynamoDBを参照することで該当レコードに近い値を割り出すことができるかもしれません。
ちゃんと調べていませんが、approximateArrivalOfFirstRecordstartSequenceNumberのレコードがStreamsに該当レコードが追加された時間です。
処理の重複が許されるのであれば、グローバルセカンダリインデックスを使ってQueryなどでレコードが作成された時間帯である程度絞り込むことも可能だと思います(テーブル形式による)。

おまけ

リカバリの方法1「DynamoDB Streamsを再度読み込む」のJavaサンプルレコードを貼っておきます(SNSトリガー)。

build.gradle
dependencies {
    implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.11.833'
    implementation 'com.amazonaws:aws-lambda-java-core:1.2.1'
    implementation 'com.amazonaws:aws-lambda-java-events:3.11.0'
}
RecoverStreams.java
package example;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamRecord;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

public class RecoverStreams implements RequestHandler<SNSEvent, Object> {

    @Override
    public Object handleRequest(SNSEvent input, Context context) {
        for (SNSRecord snsRecord : input.getRecords()) {
            var sns = snsRecord.getSNS();
            var mapper = new ObjectMapper();
            try {
                var node = mapper.readValue(sns.getMessage(), JsonNode.class);
                var ddbStreamBatchInfo = node.get("DDBStreamBatchInfo");
                processRecords(ddbStreamBatchInfo.get("streamArn").asText(),
                        ddbStreamBatchInfo.get("shardId").asText(),
                        ddbStreamBatchInfo.get("startSequenceNumber").asText(),
                        ddbStreamBatchInfo.get("endSequenceNumber").asText());
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException("Failed");
            }
        }

        return null;
    }

    private void processRecords(String streamArn, String shardId, String startSequenceNumber,
            String endSequenceNumber) {
        var streamsClient = AmazonDynamoDBStreamsClientBuilder
                .standard()
                .withRegion(Regions.AP_NORTHEAST_1)
                .withCredentials(new DefaultAWSCredentialsProviderChain())
                .build();

        var getShardIteratorRequest = new GetShardIteratorRequest()
                .withStreamArn(streamArn)
                .withShardId(shardId)
                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
                .withSequenceNumber(startSequenceNumber);
        var getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest);

        var shardIterator = getShardIteratorResult.getShardIterator();
        var targetSequenceNumber = startSequenceNumber;
        do {
            var getRecordsResult = streamsClient.getRecords(
                    new GetRecordsRequest().withShardIterator(shardIterator));
            var records = getRecordsResult.getRecords();
            for (Record record : records) {
                targetSequenceNumber = extract(record.getDynamodb());
                if (targetSequenceNumber.equals(endSequenceNumber)) {
                    break;
                }
            }
            shardIterator = getRecordsResult.getNextShardIterator();
        } while (!targetSequenceNumber.equals(endSequenceNumber));
    }

    private String extract(StreamRecord record) {
        // TODO 処理
        System.out.println(record.getNewImage().toString());
        return record.getSequenceNumber();
    }
}
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?