何の記事?
業務でDynamoDB Streamsをトリガー実行とするLambdaを開発することになりました。
DynamoDB Streamsをトリガーにする場合、トリガーには関数でエラーが起きた際の「再試行回数」と「レコードの有効期間」を設定することができます。
同一レコードがこの設定値を超過した場合、該当レコードはスキップされ次の取得可能なレコードに処理が移ります。
したがって、レコードが捨てられる際にリカバリのための「障害時の送信先」としてSQSまたはSNSを設定することができます。
この際、SQSやSNSにはどんな内容が出力されるのか説明した記事が見当たらなかったので、記事に書き起こしておきます。
出力内容
SQSでもSNSでも内容は変わらず、次の形式でJsonが出力されます。
DDBStreamBatchInfo
に今回処理対象のStream情報が記載されており、これらの値を使ってリカバリすることができます。
{
"requestContext": {
"requestId": "XXX-XXX-XXX-XXX-XXX",
"functionArn": "arn:aws:lambda:apnortheast-1:XXX:function:lambda_func",
"condition": "RetryAttemptsExhausted",
"approximateInvokeCount": 4
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2022-02-08T03:36:53.132Z",
"DDBStreamBatchInfo": {
"shardId":"shardId-00000000000000000000-XXXXXXXX",
"startSequenceNumber": "00000000000000000000001",
"endSequenceNumber": "00000000000000000000002",
"approximateArrivalOfFirstRecord": "2022-02-08T03:35:51Z",
"approximateArrivalOfLastRecord": "2022-02-08T03:35:51Z",
"batchSize": 2,
"streamArn": "arn:aws:dynamodb:ap-northeast-1:XXX:table/dynamodb-table/stream/2022-02-08T03:17:07.920"
}
}
リカバリについて
方法1: DynamoDB Streamsを再度読み込む
SNS/SQSなどをトリガーにして再処理するLambdaを実装するという方法が一番簡単かと思います。
DynamoDB StreamsのGetShardIteratorというAPIに次の形式でリクエストすると、欲しいShardIdのShardIteratorを取得できます。
この方法では、通常のStreamを処理するのと同じ方法で処理をできます。
{
"SequenceNumber": "${DDBStreamBatchInfo.startSequenceNumber}",
"ShardId": "${DDBStreamBatchInfo.shardId}",
"ShardIteratorType": "AT_SEQUENCE_NUMBER",
"StreamArn": "${DDBStreamBatchInfo.streamArn}"
}
方法2: DynamoDBを直接参照する
approximateArrivalOfFirstRecord
とapproximateArrivalOfLastRecord
あたりを使えば、テーブルの形式によっては、直接DynamoDBを参照することで該当レコードに近い値を割り出すことができるかもしれません。
ちゃんと調べていませんが、approximateArrivalOfFirstRecord
はstartSequenceNumber
のレコードがStreamsに該当レコードが追加された時間です。
処理の重複が許されるのであれば、グローバルセカンダリインデックスを使ってQueryなどでレコードが作成された時間帯である程度絞り込むことも可能だと思います(テーブル形式による)。
おまけ
リカバリの方法1「DynamoDB Streamsを再度読み込む」のJavaサンプルレコードを貼っておきます(SNSトリガー)。
dependencies {
implementation 'com.amazonaws:aws-java-sdk-dynamodb:1.11.833'
implementation 'com.amazonaws:aws-lambda-java-core:1.2.1'
implementation 'com.amazonaws:aws-lambda-java-events:3.11.0'
}
package example;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamRecord;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
public class RecoverStreams implements RequestHandler<SNSEvent, Object> {
@Override
public Object handleRequest(SNSEvent input, Context context) {
for (SNSRecord snsRecord : input.getRecords()) {
var sns = snsRecord.getSNS();
var mapper = new ObjectMapper();
try {
var node = mapper.readValue(sns.getMessage(), JsonNode.class);
var ddbStreamBatchInfo = node.get("DDBStreamBatchInfo");
processRecords(ddbStreamBatchInfo.get("streamArn").asText(),
ddbStreamBatchInfo.get("shardId").asText(),
ddbStreamBatchInfo.get("startSequenceNumber").asText(),
ddbStreamBatchInfo.get("endSequenceNumber").asText());
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Failed");
}
}
return null;
}
private void processRecords(String streamArn, String shardId, String startSequenceNumber,
String endSequenceNumber) {
var streamsClient = AmazonDynamoDBStreamsClientBuilder
.standard()
.withRegion(Regions.AP_NORTHEAST_1)
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
var getShardIteratorRequest = new GetShardIteratorRequest()
.withStreamArn(streamArn)
.withShardId(shardId)
.withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
.withSequenceNumber(startSequenceNumber);
var getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest);
var shardIterator = getShardIteratorResult.getShardIterator();
var targetSequenceNumber = startSequenceNumber;
do {
var getRecordsResult = streamsClient.getRecords(
new GetRecordsRequest().withShardIterator(shardIterator));
var records = getRecordsResult.getRecords();
for (Record record : records) {
targetSequenceNumber = extract(record.getDynamodb());
if (targetSequenceNumber.equals(endSequenceNumber)) {
break;
}
}
shardIterator = getRecordsResult.getNextShardIterator();
} while (!targetSequenceNumber.equals(endSequenceNumber));
}
private String extract(StreamRecord record) {
// TODO 処理
System.out.println(record.getNewImage().toString());
return record.getSequenceNumber();
}
}