概要
DynamoDB StreamsのレコードをKCLを使用して処理する方法です。
AWSの公式ドキュメントに実装例が示されています。
この実装例は、
1. DynamoDBにテーブルを生成する
2. DynamoDBのレコードを作成(更新、削除)する
3. レコードの更新を受け、DynamoDB Streamsにレコードが流れる
4. DynamoDB Streamsに流れたレコードを処理する
というものですが、実際には一連の処理を1つのアプリケーションで行うことはないかと思い、DynamoDB Streamsに流れたレコードを処理する部分(4.)にフォーカスを当てる形で作り替えました。
使用した技術
- Java 17
- localstack
- Kinesis Client Library (KCL)
- aws-sdk-java
実装
※ 一部、以下のクラス内のimport等は省略しています。
build.gradle
// Dependencies for the DynamoDB Streams consumer sample.
// NOTE(review): the Java classes in this article import com.amazonaws.* types
// (AWS SDK for Java v1 / KCL 1.x style packages). Those are presumably resolved
// transitively through dynamodb-streams-kinesis-adapter; confirm whether the
// software.amazon.* (SDK v2 / KCL 2.x) entries below are actually required.
dependencies {
// BOM that pins the versions of the software.amazon.awssdk modules listed below.
implementation platform("software.amazon.awssdk:bom:2.17.214")
implementation "software.amazon.awssdk:cloudwatch"
implementation "software.amazon.awssdk:dynamodb"
implementation "software.amazon.awssdk:kinesis"
implementation "software.amazon.kinesis:amazon-kinesis-client:2.4.1"
// Adapter that exposes DynamoDB Streams through the Kinesis client interface used by KCL.
implementation "com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.3"
:
}
DynamoDBStreamsAdapter.java
/**
 * Application entry point that starts a {@link DynamoDBStreamsConsumer} and registers a JVM
 * shutdown hook so the consumer is asked to shut down when the process exits.
 */
public class DynamoDBStreamsAdapter {
  /** Consumer driven by this launcher; assigned once in the constructor. */
  private final DynamoDBStreamsConsumer consumer;

  /**
   * Launches the sample with the default {@link DynamoDBStreamsConsumerImpl}.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    new DynamoDBStreamsAdapter(new DynamoDBStreamsConsumerImpl()).run();
  }

  /**
   * Creates a launcher for the given consumer.
   *
   * <p>The parameter is typed as the interface so any implementation can be supplied; existing
   * callers that pass a {@link DynamoDBStreamsConsumerImpl} keep compiling unchanged.
   *
   * @param dynamoDBStreamsConsumer consumer to run; must not be {@code null}
   * @throws NullPointerException if {@code dynamoDBStreamsConsumer} is {@code null}
   */
  public DynamoDBStreamsAdapter(DynamoDBStreamsConsumer dynamoDBStreamsConsumer) {
    // Fail fast here instead of with an anonymous NullPointerException later in run().
    this.consumer =
        java.util.Objects.requireNonNull(dynamoDBStreamsConsumer, "dynamoDBStreamsConsumer");
  }

  /** Registers the shutdown hook, then hands control to the consumer. */
  private void run() {
    // Register the hook before starting the consumer so that shutdown is requested even if the
    // process is terminated while consumer.run() is still executing.
    Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
    consumer.run();
  }

  /** Shutdown-hook callback: delegates to the consumer's shutdown. */
  private void shutdown() {
    consumer.shutdown();
  }
}
DynamoDBStreamsConsumer.java
/**
 * Lifecycle contract for a component that consumes DynamoDB Streams records: it can be started
 * with {@link #run()} and asked to stop with {@link #shutdown()}.
 */
public interface DynamoDBStreamsConsumer {

  /** Starts consuming records. */
  void run();

  /** Requests that consumption stop. */
  void shutdown();
}
DynamoDBStreamsConsumerImpl.java
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * {@link DynamoDBStreamsConsumer} that processes the stream of the {@code sample-table} DynamoDB
 * table with a KCL {@link Worker}, using {@link AmazonDynamoDBStreamsAdapterClient} as the
 * worker's Kinesis client.
 *
 * <p>The DynamoDB, DynamoDB Streams and CloudWatch clients are all configured against the
 * hard-coded endpoint {@code http://localhost:24566} in the {@code ap-northeast-1} region.
 */
public class DynamoDBStreamsConsumerImpl implements DynamoDBStreamsConsumer {
  private static final String DYNAMO_DB_TABLE_NAME = "sample-table";
  private static final Regions AWS_REGION = Regions.AP_NORTHEAST_1;
  private static final String DYNAMODB_ENDPOINT = "http://localhost:24566";
  private static final String DYNAMODB_STREAMS_ENDPOINT = "http://localhost:24566";
  private static final String CLOUDWATCH_ENDPOINT = "http://localhost:24566";
  private static final AWSCredentialsProvider credentialsProvider =
      new AWSStaticCredentialsProvider(...);

  /**
   * Worker created by {@link #run()}; {@code null} until then.
   *
   * <p>Declared {@code volatile} because {@link #run()} and {@link #shutdown()} are expected to be
   * called from different threads (for example, {@code shutdown()} from a JVM shutdown hook), and
   * the reading thread must observe the reference written by {@code run()}.
   */
  private volatile Worker worker;

  /**
   * Builds the AWS clients, looks up the table's latest stream ARN, creates the KCL worker and
   * runs it on the calling thread.
   *
   * @throws IllegalStateException if the table description contains no stream ARN
   */
  @Override
  public void run() {
    AmazonDynamoDB dynamoDBClient =
        AmazonDynamoDBClientBuilder.standard()
            .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration(
                    DYNAMODB_ENDPOINT, AWS_REGION.getName()))
            .withCredentials(credentialsProvider)
            .build();
    AmazonCloudWatch cloudWatchClient =
        AmazonCloudWatchClientBuilder.standard()
            .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration(
                    CLOUDWATCH_ENDPOINT, AWS_REGION.getName()))
            .withCredentials(credentialsProvider)
            .build();
    AmazonDynamoDBStreamsAdapterClient adapterClient =
        new AmazonDynamoDBStreamsAdapterClient(
            AmazonDynamoDBStreamsClientBuilder.standard()
                .withEndpointConfiguration(
                    new AwsClientBuilder.EndpointConfiguration(
                        DYNAMODB_STREAMS_ENDPOINT, AWS_REGION.getName()))
                .withCredentials(credentialsProvider)
                .build());
    IRecordProcessorFactory recordProcessorFactory =
        new DynamoDBStreamsRecordProcessorFactory();

    DescribeTableResult describeTableResult = dynamoDBClient.describeTable(DYNAMO_DB_TABLE_NAME);
    String streamArn = describeTableResult.getTable().getLatestStreamArn();
    if (streamArn == null) {
      // Without a stream ARN there is nothing for the worker to read; fail with a clear message
      // instead of handing a null stream name to the KCL configuration.
      throw new IllegalStateException(
          "No stream ARN found for table '" + DYNAMO_DB_TABLE_NAME
              + "'; is DynamoDB Streams enabled on it?");
    }
    KinesisClientLibConfiguration workerConfig = createKinesisClientLibConfiguration(streamArn);

    Worker newWorker =
        new Worker.Builder()
            .recordProcessorFactory(recordProcessorFactory)
            .config(workerConfig)
            .kinesisClient(adapterClient)
            .dynamoDBClient(dynamoDBClient)
            .cloudWatchClient(cloudWatchClient)
            .build();
    // Publish the worker before running it so a concurrent shutdown() can reach it.
    worker = newWorker;
    newWorker.run();
  }

  /**
   * Starts a graceful shutdown of the worker and waits up to 20 seconds for it to finish. Does
   * nothing if {@link #run()} has not created a worker yet.
   */
  @Override
  public void shutdown() {
    // Read the volatile field once so the null check and the call use the same reference.
    Worker currentWorker = worker;
    if (currentWorker == null) {
      return;
    }
    Future<Boolean> gracefulShutdownFuture = currentWorker.startGracefulShutdown();
    try {
      gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
    } catch (ExecutionException | TimeoutException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
      // Restore the interrupt flag for whoever owns this thread.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Creates the KCL configuration for the given stream.
   *
   * <p>The application name ({@code "sampleApp"}) and worker id ({@code "workerId"}) are fixed.
   * NOTE(review): a fixed worker id presumably only suits a single running instance; confirm
   * before running several workers against the same application name.
   *
   * @param streamArn ARN of the DynamoDB stream, passed to KCL as the stream name
   * @return configuration reading from {@code TRIM_HORIZON}
   */
  public static KinesisClientLibConfiguration createKinesisClientLibConfiguration(
      String streamArn) {
    return new KinesisClientLibConfiguration(
            "sampleApp",
            streamArn,
            credentialsProvider,
            "workerId")
        .withMaxRecords(100) // maximum number of records fetched per read
        .withIdleTimeBetweenReadsInMillis(500) // interval between reads (milliseconds)
        // Invoke processRecords even when a read returns no records.
        .withCallProcessRecordsEvenForEmptyRecordList(true)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
  }
}
withCallProcessRecordsEvenForEmptyRecordList(true)とすることで、対象レコードがない場合でも DynamoDBStreamsRecordProcessor.processRecordsを実行します。
レコードがない時に実行する必要がない場合は指定する必要はありません。
DynamoDBStreamsRecordProcessorFactory.java
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
/**
 * {@link IRecordProcessorFactory} that supplies {@link DynamoDBStreamsRecordProcessor}
 * instances.
 */
public class DynamoDBStreamsRecordProcessorFactory implements IRecordProcessorFactory {

  /**
   * Creates a record processor.
   *
   * @return a newly constructed {@link DynamoDBStreamsRecordProcessor}
   */
  @Override
  public IRecordProcessor createProcessor() {
    IRecordProcessor processor = new DynamoDBStreamsRecordProcessor();
    return processor;
  }
}
DynamoDBStreamsRecordProcessor.java
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
/**
 * {@link IRecordProcessor} that prints the DynamoDB payload of each stream record and checkpoints
 * after every batch.
 */
public class DynamoDBStreamsRecordProcessor implements IRecordProcessor {

  /**
   * Prints a marker line when the processor is initialized.
   *
   * @param initializationInput initialization details (unused)
   */
  @Override
  public void initialize(InitializationInput initializationInput) {
    System.out.println("initialize");
  }

  /**
   * Handles one batch: prints the payload of every record that is a {@link RecordAdapter}, then
   * checkpoints. Checkpoint failures of type {@link InvalidStateException} or
   * {@link ShutdownException} are printed and not rethrown.
   *
   * @param processRecordsInput batch of records together with its checkpointer
   */
  @Override
  public void processRecords(ProcessRecordsInput processRecordsInput) {
    for (Record kinesisRecord : processRecordsInput.getRecords()) {
      // Only adapter-wrapped records carry a DynamoDB Streams record; skip anything else.
      if (!(kinesisRecord instanceof RecordAdapter adapter)) {
        continue;
      }
      com.amazonaws.services.dynamodbv2.model.Record streamRecord = adapter.getInternalObject();
      // Per-record processing goes here.
      System.out.println(streamRecord.getDynamodb());
    }

    try {
      processRecordsInput.getCheckpointer().checkpoint();
    } catch (InvalidStateException | ShutdownException e) {
      e.printStackTrace();
    }
  }

  /**
   * Checkpoints when the shutdown reason is {@link ShutdownReason#TERMINATE}; other reasons are
   * ignored. Any exception raised while checkpointing is printed and not rethrown.
   *
   * @param shutdownInput shutdown reason and checkpointer
   */
  @Override
  public void shutdown(ShutdownInput shutdownInput) {
    if (shutdownInput.getShutdownReason() != ShutdownReason.TERMINATE) {
      return;
    }
    try {
      shutdownInput.getCheckpointer().checkpoint();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
DynamoDBStreamsRecordProcessor.javaのfor文の中で1レコードずつ処理する形になっています。
DynamoDBにputItem等でレコードを追加することで、以下のレコードが取得できます。(putItemの場合)
{ApproximateCreationDateTime: Fri Sep 09 00:00:00 JST 2022,Keys: {key={N: 123456,}},NewImage: {...,key={N: 123456,}},SequenceNumber: 111111111111,SizeBytes: 1111,StreamViewType: NEW_IMAGE}
上記の場合、以下の方法で 123456 を取得できます。
dynamoDBStreamsRecord.getDynamodb().getNewImage().get("key").getN();