Amazon MSKのKafkaクラスターにJavaプログラムでProduce/Consume
はじめに
Amazon Managed Streaming for Apache Kafka(以下、Amazon MSK)のKafkaクラスターに対してProduce/Consumeするプログラムの実装方法を確認した記録を紹介します。
ここでは、シンプルな文字列形式のメッセージだけでなく、Avro形式のメッセージのProduce/Consume方法も確認しています。
Avroスキーマを保存するスキーマレジストリはAWS Glue Schema Registryを採用し、プログラムから使用するシリアライザー/デシリアライザーもAWS Glue Schema Registry専用のものを使用しています。
参考記事:Amazon MSKでオンプレから接続可能なKafkaクラスターの構築方法(できるだけ普通のKafkaっぽく使えるように)
前提
- Apache Kafkaのバージョン:3.5.1
- Zookeeper構成(KRaft構成ではない)
- トライした時期:2024/9/13-17
- AWS環境は個人利用のもの(自費)
- Amazon MSKクラスターはパブリックアクセス可能(構築方法は後述の「実施した概要」①参照)
実施した概要
- ① Amazon MSKのクラスターを構築する(→Amazon MSKでオンプレから接続可能なKafkaクラスターの構築方法(できるだけ普通のKafkaっぽく使えるように))
- ② AWS外からJavaプログラムを使用して以下を実施(本稿で紹介)
- シンプルな文字列形式のメッセージのProduce/Consume
- AWS Glue Schema Registryの設定
- Avro形式のメッセージのProduce/Consume
- ③ AWS外からIBM Infosphere Replication CDCを使用して以下を実施(別途紹介)
- JSON形式でキャプチャー対象テーブルのデータをAmazon MSKに送信
- Avro形式でキャプチャー対象テーブルのデータをAmazon MSKに送信
シンプルな文字列形式のメッセージのProduce/Consume
Javaのプログラムでシンプルな文字列形式をKafkaとやりとりする場合、Kafka ClientにあるProducer APIおよびConsumer APIを使用することで実現できます。
以下は実装内容のポイントと実行結果です。
実装内容のポイント
pom.xml
Amazon MSK側のKafkaクラスターと同じバージョンのKafka Clientを利用できるようにします。
<dependencies>
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
:
</dependencies>
import
Produce/Consumeを実装するクラスでは下記のものあたりをimportすれば良いです。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Producer/ConsumerのConfig
Producer/Consumer実装の基本として、以下の設定を記述する必要があります。
- Producerとしての設定(Producer Config)
- Consumerとしての設定(Consumer Config)
- Kafka Brokerにアクセスするための認証系の設定(Producer/Consumerのどちらも必要)
以下は、上記の設定を記述した実装例です。
private Producer<String, String> createKafkaProducer() {
Properties props = new Properties();
Date now = new Date();
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-test-" + now.getTime());
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configureSecurityIfPresent(props);
Producer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
private KafkaConsumer<String, String> createKafkaConsumer() {
Properties props = new Properties();
Date now = new Date();
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-test-" + now.getTime());
props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configureSecurityIfPresent(props);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
// Kafka Brokerのアクセスコントロール設定によって設定するプロパティの種類は異なります。
// これはSASL/SCRAM認証のみが設定されたAmazon MSKクラスターへのアクセスのための実装です。
// SCRAMの部分のmy-user、my-password の部分は環境によって異なります。
private void configureSecurityIfPresent(Properties props) {
props.putIfAbsent("security.protocol", "SASL_SSL");
props.putIfAbsent("sasl.mechanism", "SCRAM-SHA-512");
props.putIfAbsent("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"my-user\" password=\"my-password\";");
}
Producer
Kafka Broker内に既にあるmy-test-topicというトピックに5つのメッセージを書き込みます。
final String topicName = "my-test-topic";
// 前述のProducer作成メソッドを実行
Producer<String, String> producer = createKafkaProducer();
int producedMessages = 0;
try {
for (int idx = 0; idx < 5; idx++) {
String message = "Hello (" + producedMessages++ + ")!";
ProducerRecord<String, String> producedRecord = new ProducerRecord<>(topicName, message);
producer.send(producedRecord);
Thread.sleep(100);
}
} finally {
producer.flush();
producer.close();
}
Consumer
Kafka Broker内に既にあるmy-test-topicというトピックからメッセージを読み込みます。
final String topicName = "my-test-topic";
// 前述のConsumer作成メソッドを実行
KafkaConsumer<String, String> consumer = createKafkaConsumer();
consumer.subscribe(Collections.singletonList(topicName));
try {
int messageCount = 0;
while (messageCount < 50) {
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
messageCount += records.count();
if (records.count() == 0) {
logger.info("No messages waiting...");
} else records.forEach(record -> {
logger.info("Key: " + record.key() + " Value: " + record.value());
});
}
} finally {
consumer.close();
}
実行結果
実行ログ
2024-09-16T22:13:42.783+09:00 INFO 81544 --- [mykafka] [ restartedMain] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-Consumer-test-1726492416835-1, groupId=Consumer-test-1726492416835] Resetting offset for partition my-test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196 (id: 1 rack: apne1-az1)], epoch=0}}.
2024-09-16T22:13:42.855+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: sasasasasasa
2024-09-16T22:13:42.855+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: kikikikikiki
2024-09-16T22:13:42.855+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: yssk111111YSSK
2024-09-16T22:13:42.855+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: Hello (0)!
2024-09-16T22:13:42.855+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: Hello (1)!
2024-09-16T22:13:42.855+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: Hello (2)!
2024-09-16T22:13:42.855+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: Hello (3)!
2024-09-16T22:13:42.856+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: null Value: Hello (4)!
2024-09-16T22:13:52.857+09:00 INFO 81544 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : No messages waiting...
Hello(0)!からHello(4)!が実装したProducerによって書き込まれたメッセージです。
(sasasasasasa、kikikikikiki、yssk111111YSSKはすでに書き込まれていたメッセージです)
kafka-cosole-consumserでも確認
./kafka-console-consumer.sh \
--bootstrap-server b-2-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196 \
--topic my-test-topic \
--consumer.config /mskconfig-ssk.properties \
--from-beginning
AWS Glue Schema Registryの設定
Avro形式のメッセージをKafka ClientとKafka Brokerの間でやりとりする場合は、メッセージ(実体はバイナリ)とAvroスキーマをKafka Clientが取り扱える必要があります。
メッセージの書き手であるProducerとメッセージの読み手であるConsumerが同じAvroスキーマをもとにメッセージを解釈する必要があるからです。
Avroスキーマはプログラム内で直書きすることもできますが、スキーマレジストリを使用するのが実用的です。
スキーマレジストリにも色々ありますが、今回はAmazon MSKでKakfaを構成していることもあり、AWS Glue Schema Registryを使用します。
以下は、AWS Glue Schema Registryの設定をしたときの記録です。
Registryの作成
- AWSマネジメントコンソールより、AWS GlueサービスのナビゲーターにあるStream Schema Registriesを選択し、ここでAdd Registryを選択する
- Add a new schema registryの画面で名称を指定する
- 作成されたことを確認
IAMロール作成
AWS Glue Schema RegistryにアクセスできるIAMロールを作成します。
試行のため、あまり難しいことは考えずにポリシー設定をしています。
- IAMサービスの画面でロールの作成を選択
- 今回は信頼されたエンティティタイプにはAWSアカウントを選択
- IAMロールの名称を指定
- 許可を追加(ここでは便宜的にAWSGlueSchemaRegistryFullAccessを指定)
- AWS Glue Schema Registryへのアクセス用のIAMロールが作成できたことを確認
ここで作成したIAMロールのARNは後で必要になります。
arn:aws:iam::XXXXXXXXXXXX:role/MyGlueSchemaRegistryAccessRole
Avro形式のメッセージのProduce/Consume
実装内容のポイント
pom.xml
Amazon MSK側のKafkaクラスターと同じバージョンのKafka Clientを利用できるようにします。
Avroメッセージを取り扱えるようにするライブラリと、AWS Glue Schema Registry用のserde(シリアライザーとデシリアライザー)ライブラリも利用できるようにする必要があります。
<dependencies>
:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.1</version>
</dependency>
:
<dependency>
<groupId>software.amazon.glue</groupId>
<artifactId>schema-registry-serde</artifactId>
<version>1.1.20</version>
</dependency>
:
</dependencies>
import
シンプルな文字列をProduce/Consumeするためにimportするものに加えて、Avroメッセージの取り扱いとAWS Glue Schema Registry用のものをimportします。
STSを使用して先の手順で作成したAWS Glue Schema Registry用のIAMロールをAssumeできるようにAWSSDKのモジュールもimportします。
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import software.amazon.awssdk.services.glue.model.DataFormat;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.services.sts.model.Credentials;
import software.amazon.awssdk.services.sts.model.StsException;
Producer/ConsumerのConfig(Avro、AWS Glue Schema Registry用)
Producer/Consumerの設定を記述する必要があるのはシンプルな文字列形式でトライしたときと同様です。
Avro、AWS Glue Schema Registry用に必要な設定を記述する必要があります。
private Producer<String, GenericRecord> createKafkaProducer() {
Properties props = new Properties();
Date now = new Date();
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-avrotest-" + now.getTime());
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
props.put(AWSSchemaRegistryConstants.AWS_REGION, "ap-northeast-1");
props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema-greeting");
props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");
configureSecurityIfPresent(props);
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
return producer;
}
private KafkaConsumer<String, GenericRecord> createKafkaConsumer() {
Properties props = new Properties();
Date now = new Date();
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-avrotest-" + now.getTime());
props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
props.put(AWSSchemaRegistryConstants.AWS_REGION, "ap-northeast-1");
props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
configureSecurityIfPresent(props);
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
return consumer;
}
// Kafka Brokerのアクセスコントロール設定によって設定するプロパティの種類は異なります。
// これはSASL/SCRAM認証のみが設定されたAmazon MSKクラスターへのアクセスのための実装です。
// SCRAMの部分のmy-user、my-password の部分は環境によって異なります。
private void configureSecurityIfPresent(Properties props) {
props.putIfAbsent("security.protocol", "SASL_SSL");
props.putIfAbsent("sasl.mechanism", "SCRAM-SHA-512");
props.putIfAbsent("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"my-user\" password=\"my-password\";");
}
AWS Glue Schema RegistryにアクセスするためのAssumeRole実装
前述の手順で作成したAWS Glue Schema Registryのアクセス権限をもつIAMロールをAssumeRoleする実装です。
(他にもやり方ありそうですが例として)
private void assumeGlueSchemaRegistryRole() {
try {
System.setProperty("aws.accessKeyId", "XXXXXXXXXXXXXXXXXXXX");
System.setProperty("aws.secretAccessKey", "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
Region region = Region.of("ap-northeast-1");
StsClient stsClient = StsClient.builder().region(region).build();
AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
.roleArn("arn:aws:iam::XXXXXXXXXXXX:role/MyGlueSchemaRegistryAccessRole")
.roleSessionName("msk-session")
.build();
AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
Credentials myCreds = roleResponse.credentials();
System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
System.setProperty("aws.sessionToken", myCreds.sessionToken());
stsClient.close();
} catch (StsException e) {
logger.error(e.getMessage());
System.exit(1);
}
}
Producer
Kafka Broker内に既にあるmy-test-avro-topicというトピックに5つのメッセージを書き込みます。
AWS Glue Schema Registryに書き込むスキーマはソースコード内に直書きしています。
final String topicName = "my-test-avro-topic";
// AWS Glue Schema RegistryにアクセスするためのAssumeRole用メソッドを実行
assumeGlueSchemaRegistryRole();
// 前述のProducer作成メソッドを実行
Producer<String, GenericRecord> producer = createKafkaProducer();
// 手書きのスキーマ
final String SCHEMA = "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[{\"name\":\"Message\",\"type\":\"string\"},{\"name\":\"Time\",\"type\":\"long\"}]}";
int producedMessages = 0;
try {
Schema schema = new Schema.Parser().parse(SCHEMA);
for (int idx = 0; idx < 5; idx++) {
GenericRecord record = new GenericData.Record(schema);
Date now = new Date();
String message = "Hello (" + producedMessages++ + ")!";
record.put("Message", message);
record.put("Time", now.getTime());
ProducerRecord<String, GenericRecord> producedRecord = new ProducerRecord<>(topicName, Integer.toString(idx), record);
producer.send(producedRecord);
Thread.sleep(100);
}
} finally {
producer.flush();
producer.close();
}
Consumer
Kafka Broker内に既にあるmy-test-avro-topicというトピックからメッセージを読み込みます。
final String topicName = "my-test-avro-topic";
// 前述のConsumer作成メソッドを実行
KafkaConsumer<String, GenericRecord> consumer = createKafkaConsumer();
consumer.subscribe(Collections.singletonList(topicName));
try {
int messageCount = 0;
while (messageCount < 50) {
final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(10));
messageCount += records.count();
if (records.count() == 0) {
logger.info("No messages waiting...");
} else records.forEach(record -> {
GenericRecord value = record.value();
logger.info("Key: " + record.key() + " Value: " + value.toString());
});
}
} finally {
consumer.close();
}
実行結果
実行ログ
2024-09-16T23:33:51.533+09:00 INFO 95839 --- [mykafka] [ restartedMain] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-Consumer-avrotest-1726497199895-1, groupId=Consumer-avrotest-1726497199895] Resetting offset for partition my-test-avro-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196 (id: 1 rack: apne1-az1)], epoch=0}}.
2024-09-16T23:33:52.306+09:00 INFO 95839 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: 0 Value: {"Message": "Hello (0)!", "Time": 1726497178429}
2024-09-16T23:33:52.306+09:00 INFO 95839 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: 1 Value: {"Message": "Hello (1)!", "Time": 1726497180238}
2024-09-16T23:33:52.307+09:00 INFO 95839 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: 2 Value: {"Message": "Hello (2)!", "Time": 1726497180343}
2024-09-16T23:33:52.308+09:00 INFO 95839 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: 3 Value: {"Message": "Hello (3)!", "Time": 1726497180449}
2024-09-16T23:33:52.308+09:00 INFO 95839 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : Key: 4 Value: {"Message": "Hello (4)!", "Time": 1726497180551}
2024-09-16T23:34:02.311+09:00 INFO 95839 --- [mykafka] [ restartedMain] c.m.mykafka.MykafkaApplicationRunner : No messages waiting...
今回はメッセージのキーをシンプルな文字列形式、バリューの部分をAvro形式にしています。
上記の実行ログの抜粋より、バリュー部分がデシリアライズされて出力されていることが確認できました。
AWS Glue Schema Registryへの登録結果
プログラム実行より、AWS Glue Schema Registryが以下のように更新されました。
以下に、AWS CLIでスキーマを確認した際のコマンドと結果を示します。
# Javaプログラムと同様にAWS Glue Schema Registryのアクセス権を持つIAMロールの認証情報を取得
aws sts assume-role --role-arn arn:aws:iam::XXXXXXXXXXXX:role/MyGlueSchemaRegistryAccessRole --role-session-name test-session --profile default --region ap-northeast-1 --output json
# 取得したIAMロールの認証情報をAWSの認証情報系の環境変数にセット
export AWS_ACCESS_KEY_ID="・・・"
export AWS_SECRET_ACCESS_KEY="・・・"
export AWS_SESSION_TOKEN="・・・"
# ① スキーマの情報を表示
aws glue get-schema --schema-id SchemaArn=arn:aws:glue:ap-northeast-1:XXXXXXXXXXXX:schema/my-registry/my-schema-greeting
# ② スキーマの内容を表示(最新バージョンを指定)
aws glue get-schema-version --schema-id SchemaArn=arn:aws:glue:ap-northeast-1:XXXXXXXXXXXX:schema/my-registry/my-schema-greeting --schema-version-number LatestVersion=true
おわりに
ここでは、シンプルな文字列形式のメッセージ、Avro形式のメッセージのProduce/Consumeを行う実装例を紹介しました。
データベースにあるデータ更新の都度、その変更内容をKafkaに伝えることができればイベントドリブンなユースケースを実装したり、データの利用側で高鮮度データを分析対象にしたりすることができるようになります。
次はIBM製品であるInfosphere Replication CDCを使用して、データベースのデータ更新の都度、Amazon MSKに変更差分を連携する方法を確認したいと思います。
- ① Amazon MSKのクラスターを構築する(→Amazon MSKでオンプレから接続可能なKafkaクラスターの構築方法(できるだけ普通のKafkaっぽく使えるように))
- ② AWS外からJavaプログラムを使用して以下を実施(本稿で紹介)
- シンプルな文字列形式のメッセージのProduce/Consume
- AWS Glue Schema Registryの設定
- Avro形式のメッセージのProduce/Consume
- ③ AWS外からIBM Infosphere Replication CDCを使用して以下を実施(別途紹介)
- JSON形式でキャプチャー対象テーブルのデータをAmazon MSKに送信
- Avro形式でキャプチャー対象テーブルのデータをAmazon MSKに送信