2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Amazon MSKのKafkaクラスターにJavaプログラムでProduce/Consume(String形式&Avro形式)

Last updated at Posted at 2024-09-24

Amazon MSKのKafkaクラスターにJavaプログラムでProduce/Consume

はじめに

Amazon Managed Streaming for Apache Kafka(以下、Amazon MSK)のKafkaクラスターに対してProduce/Consumeするプログラムの実装方法を確認した記録を紹介します。
ここでは、シンプルな文字列形式のメッセージだけでなく、Avro形式のメッセージのProduce/Consume方法も確認しています。
Avroスキーマを保存するスキーマレジストリはAWS Glue Schema Registryを採用し、プログラムから使用するシリアライザー/デシリアライザーもAWS Glue Schema Registry専用のものを使用しています。

 参考記事:Amazon MSKでオンプレから接続可能なKafkaクラスターの構築方法(できるだけ普通のKafkaっぽく使えるように)

前提

  • Apache Kafkaのバージョン:3.5.1
  • Zookeeper構成(KRaft構成ではない)
  • トライした時期:2024/9/13-17
  • AWS環境は個人利用のもの(自費)
  • Amazon MSKクラスターはパブリックアクセス可能(構築方法は後述の「実施した概要」①参照)

実施した概要

image.png

シンプルな文字列形式のメッセージのProduce/Consume

Javaのプログラムでシンプルな文字列形式をKafkaとやりとりする場合、Kafka ClientにあるProducer APIおよびConsumer APIを使用することで実現できます。
以下は実装内容のポイントと実行結果です。

実装内容のポイント
pom.xml

Amazon MSK側のKafkaクラスターと同じバージョンのKafka Clientを利用できるようにします。

pom.xml
	<dependencies>
            :
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>3.5.1</version>
		</dependency>
            :
	</dependencies>
import

Produce/Consumeを実装するクラスでは下記のものあたりをimportすれば良いです。

import部分(抜粋)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Producer/ConsumerのConfig

Producer/Consumer実装の基本として、以下の設定を記述する必要があります。

  • Producerとしての設定(Producer Config)
  • Consumerとしての設定(Consumer Config)
  • Kafka Brokerにアクセスするための認証系の設定(Producer/Consumerのどちらも必要)

以下は、上記の設定を記述した実装例です。

Producer Configが設定されたProducerオブジェクトを作成するメソッド
    private Producer<String, String> createKafkaProducer() {

        Properties props = new Properties();
        Date now = new Date();

        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-test-" + now.getTime());
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        configureSecurityIfPresent(props);

        Producer<String, String> producer = new KafkaProducer<>(props);
        return producer;
    }
Consumer Configが設定されたConsumerオブジェクトを作成するメソッド
    private KafkaConsumer<String, String> createKafkaConsumer() {

    	Properties props = new Properties();
        Date now = new Date();
        
        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-test-" + now.getTime());
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        configureSecurityIfPresent(props);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }
Kafka Brokerにアクセスするための認証系の設定のためのメソッド
    // Kafka Brokerのアクセスコントロール設定によって設定するプロパティの種類は異なります。
    // これはSASL/SCRAM認証のみが設定されたAmazon MSKクラスターへのアクセスのための実装です。
    // SCRAMの部分のmy-user、my-password の部分は環境によって異なります。
    private void configureSecurityIfPresent(Properties props) {

        props.putIfAbsent("security.protocol", "SASL_SSL");
        props.putIfAbsent("sasl.mechanism", "SCRAM-SHA-512");
        props.putIfAbsent("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"my-user\" password=\"my-password\";");

    }
Producer

Kafka Broker内に既にあるmy-test-topicというトピックに5つのメッセージを書き込みます。

Producer実装例(抜粋)
        final String topicName = "my-test-topic";
    
        // 前述のProducer作成メソッドを実行
        Producer<String, String> producer = createKafkaProducer();

        int producedMessages = 0;
        try {
            for (int idx = 0; idx < 5; idx++) {
                String message = "Hello (" + producedMessages++ + ")!";
                ProducerRecord<String, String> producedRecord = new ProducerRecord<>(topicName, message);
                producer.send(producedRecord);
                Thread.sleep(100);
            }
        } finally {
            producer.flush();
            producer.close();
        }
Consumer

Kafka Broker内に既にあるmy-test-topicというトピックからメッセージを読み込みます。

Consumer実装例(抜粋)
        final String topicName = "my-test-topic";
    
        // 前述のConsumer作成メソッドを実行
        KafkaConsumer<String, String> consumer = createKafkaConsumer();

        consumer.subscribe(Collections.singletonList(topicName));
        try {
            int messageCount = 0;
            while (messageCount < 50) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
                messageCount += records.count();
                if (records.count() == 0) {
                    logger.info("No messages waiting...");
                } else records.forEach(record -> {
                    logger.info("Key: " + record.key() + "  Value: " + record.value());
                });
            }
        } finally {
            consumer.close();
        }
実行結果
実行ログ
実行ログ(抜粋)
2024-09-16T22:13:42.783+09:00  INFO 81544 --- [mykafka] [  restartedMain] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-Consumer-test-1726492416835-1, groupId=Consumer-test-1726492416835] Resetting offset for partition my-test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196 (id: 1 rack: apne1-az1)], epoch=0}}.
2024-09-16T22:13:42.855+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: sasasasasasa
2024-09-16T22:13:42.855+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: kikikikikiki
2024-09-16T22:13:42.855+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: yssk111111YSSK
2024-09-16T22:13:42.855+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: Hello (0)!
2024-09-16T22:13:42.855+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: Hello (1)!
2024-09-16T22:13:42.855+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: Hello (2)!
2024-09-16T22:13:42.855+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: Hello (3)!
2024-09-16T22:13:42.856+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: null  Value: Hello (4)!
2024-09-16T22:13:52.857+09:00  INFO 81544 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : No messages waiting...

Hello(0)!からHello(4)!が実装したProducerによって書き込まれたメッセージです。
(sasasasasasa、kikikikikiki、yssk111111YSSKはすでに書き込まれていたメッセージです)

kafka-cosole-consumserでも確認
kafka-console-consumer.shを使用したメッセージのConsume
./kafka-console-consumer.sh \
  --bootstrap-server b-2-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196 \
  --topic my-test-topic \
  --consumer.config /mskconfig-ssk.properties \
  --from-beginning

image.png

AWS Glue Schema Registryの設定

Avro形式のメッセージをKafka ClientとKafka Brokerの間でやりとりする場合は、メッセージ(実体はバイナリ)とAvroスキーマをKafka Clientが取り扱える必要があります。
メッセージの書き手であるProducerとメッセージの読み手であるConsumerが同じAvroスキーマをもとにメッセージを解釈する必要があるからです。
Avroスキーマはプログラム内で直書きすることもできますが、スキーマレジストリを使用するのが実用的です。
スキーマレジストリにも色々ありますが、今回はAmazon MSKでKakfaを構成していることもあり、AWS Glue Schema Registryを使用します。
以下は、AWS Glue Schema Registryの設定をしたときの記録です。

参考:AWS Glue Schema Registry

Registryの作成
  • AWSマネジメントコンソールより、AWS GlueサービスのナビゲーターにあるStream Schema Registriesを選択し、ここでAdd Registryを選択する
    image.png
  • Add a new schema registryの画面で名称を指定する
    image.png
  • 作成されたことを確認
    image.png
    image.png
IAMロール作成

AWS Glue Schema RegistryにアクセスできるIAMロールを作成します。
試行のため、あまり難しいことは考えずにポリシー設定をしています。

  • IAMサービスの画面でロールの作成を選択
    image.png
  • 今回は信頼されたエンティティタイプにはAWSアカウントを選択
    image.png
  • IAMロールの名称を指定
    image.png
  • 許可を追加(ここでは便宜的にAWSGlueSchemaRegistryFullAccessを指定)
    image.png
  • AWS Glue Schema Registryへのアクセス用のIAMロールが作成できたことを確認
    image.png

ここで作成したIAMロールのARNは後で必要になります。

作成したIAMロールのARN(アカウント番号はマスク)
arn:aws:iam::XXXXXXXXXXXX:role/MyGlueSchemaRegistryAccessRole

Avro形式のメッセージのProduce/Consume

実装内容のポイント
pom.xml

Amazon MSK側のKafkaクラスターと同じバージョンのKafka Clientを利用できるようにします。
Avroメッセージを取り扱えるようにするライブラリと、AWS Glue Schema Registry用のserde(シリアライザーとデシリアライザー)ライブラリも利用できるようにする必要があります。

pom.xml
	<dependencies>
            :
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>3.5.1</version>
		</dependency>
            :
		<dependency>
			<groupId>org.apache.avro</groupId>
			<artifactId>avro</artifactId>
			<version>1.11.1</version>
		</dependency>
            :
		<dependency>
			<groupId>software.amazon.glue</groupId>
			<artifactId>schema-registry-serde</artifactId>
			<version>1.1.20</version>
		</dependency>
            :
	</dependencies>
import

シンプルな文字列をProduce/Consumeするためにimportするものに加えて、Avroメッセージの取り扱いとAWS Glue Schema Registry用のものをimportします。
STSを使用して先の手順で作成したAWS Glue Schema Registry用のIAMロールをAssumeできるようにAWSSDKのモジュールもimportします。

import部分(抜粋)
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import software.amazon.awssdk.services.glue.model.DataFormat;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.services.sts.model.Credentials;
import software.amazon.awssdk.services.sts.model.StsException;
Producer/ConsumerのConfig(Avro、AWS Glue Schema Registry用)

Producer/Consumerの設定を記述する必要があるのはシンプルな文字列形式でトライしたときと同様です。
Avro、AWS Glue Schema Registry用に必要な設定を記述する必要があります。

Avro、AWS Glue Schema Registry用のProducer Configが設定されたProducerオブジェクトを作成するメソッド
    private Producer<String, GenericRecord> createKafkaProducer() {

        Properties props = new Properties();
        Date now = new Date();

        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-avrotest-" + now.getTime());
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
        props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "ap-northeast-1");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
        props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "my-schema-greeting");
        props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, "true");

        configureSecurityIfPresent(props);

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
        return producer;
    }
Avro、AWS Glue Schema Registry用のConsumer Configが設定されたConsumerオブジェクトを作成するメソッド
    private KafkaConsumer<String, GenericRecord> createKafkaConsumer() {

    	Properties props = new Properties();
        Date now = new Date();
        
        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-avrotest-" + now.getTime());
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "ap-northeast-1");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

        configureSecurityIfPresent(props);

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);

        return consumer;
    }
Kafka Brokerにアクセスするための認証系の設定のためのメソッド
    // Kafka Brokerのアクセスコントロール設定によって設定するプロパティの種類は異なります。
    // これはSASL/SCRAM認証のみが設定されたAmazon MSKクラスターへのアクセスのための実装です。
    // SCRAMの部分のmy-user、my-password の部分は環境によって異なります。
    private void configureSecurityIfPresent(Properties props) {

        props.putIfAbsent("security.protocol", "SASL_SSL");
        props.putIfAbsent("sasl.mechanism", "SCRAM-SHA-512");
        props.putIfAbsent("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"my-user\" password=\"my-password\";");

    }
AWS Glue Schema RegistryにアクセスするためのAssumeRole実装

前述の手順で作成したAWS Glue Schema Registryのアクセス権限をもつIAMロールをAssumeRoleする実装です。
(他にもやり方ありそうですが例として)

    private void assumeGlueSchemaRegistryRole() {
        try {

            System.setProperty("aws.accessKeyId", "XXXXXXXXXXXXXXXXXXXX");
            System.setProperty("aws.secretAccessKey", "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");

            Region region = Region.of("ap-northeast-1");
            StsClient stsClient = StsClient.builder().region(region).build();
            AssumeRoleRequest roleRequest = AssumeRoleRequest.builder()
                    .roleArn("arn:aws:iam::XXXXXXXXXXXX:role/MyGlueSchemaRegistryAccessRole")
                    .roleSessionName("msk-session")
                    .build();
            AssumeRoleResponse roleResponse = stsClient.assumeRole(roleRequest);
            Credentials myCreds = roleResponse.credentials();
            System.setProperty("aws.accessKeyId", myCreds.accessKeyId());
            System.setProperty("aws.secretAccessKey", myCreds.secretAccessKey());
            System.setProperty("aws.sessionToken", myCreds.sessionToken());
            stsClient.close();
        } catch (StsException e) {
            logger.error(e.getMessage());
            System.exit(1);
        }
    }
Producer

Kafka Broker内に既にあるmy-test-avro-topicというトピックに5つのメッセージを書き込みます。
AWS Glue Schema Registryに書き込むスキーマはソースコード内に直書きしています。

Producer実装例(抜粋)
        final String topicName = "my-test-avro-topic";

        // AWS Glue Schema RegistryにアクセスするためのAssumeRole用メソッドを実行
        assumeGlueSchemaRegistryRole();

        // 前述のProducer作成メソッドを実行
        Producer<String, GenericRecord> producer = createKafkaProducer();

        // 手書きのスキーマ
        final String SCHEMA = "{\"type\":\"record\",\"name\":\"Greeting\",\"fields\":[{\"name\":\"Message\",\"type\":\"string\"},{\"name\":\"Time\",\"type\":\"long\"}]}";
    
        int producedMessages = 0;
        try {
            Schema schema = new Schema.Parser().parse(SCHEMA);
            for (int idx = 0; idx < 5; idx++) {
                GenericRecord record = new GenericData.Record(schema);
                Date now = new Date();
                String message = "Hello (" + producedMessages++ + ")!";
                record.put("Message", message);
                record.put("Time", now.getTime());

                ProducerRecord<String, GenericRecord> producedRecord = new ProducerRecord<>(topicName, Integer.toString(idx), record);
                producer.send(producedRecord);
                Thread.sleep(100);
            }
        } finally {
            producer.flush();
            producer.close();
        }
Consumer

Kafka Broker内に既にあるmy-test-avro-topicというトピックからメッセージを読み込みます。

Consumer実装例(抜粋)
        final String topicName = "my-test-avro-topic";
    
        // 前述のConsumer作成メソッドを実行
        KafkaConsumer<String, GenericRecord> consumer = createKafkaConsumer();

        consumer.subscribe(Collections.singletonList(topicName));

        try {
            int messageCount = 0;
            while (messageCount < 50) {
                final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(10));
                messageCount += records.count();
                if (records.count() == 0) {
                    logger.info("No messages waiting...");
                } else records.forEach(record -> {
                    GenericRecord value = record.value();
                    logger.info("Key: " + record.key() + "  Value: " + value.toString());
                });
            }
        } finally {
            consumer.close();
        }
実行結果
実行ログ
実行ログ(抜粋)
2024-09-16T23:33:51.533+09:00  INFO 95839 --- [mykafka] [  restartedMain] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-Consumer-avrotest-1726497199895-1, groupId=Consumer-avrotest-1726497199895] Resetting offset for partition my-test-avro-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1-public.mymskcluster.zxrapj.c4.kafka.ap-northeast-1.amazonaws.com:9196 (id: 1 rack: apne1-az1)], epoch=0}}.
2024-09-16T23:33:52.306+09:00  INFO 95839 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: 0  Value: {"Message": "Hello (0)!", "Time": 1726497178429}
2024-09-16T23:33:52.306+09:00  INFO 95839 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: 1  Value: {"Message": "Hello (1)!", "Time": 1726497180238}
2024-09-16T23:33:52.307+09:00  INFO 95839 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: 2  Value: {"Message": "Hello (2)!", "Time": 1726497180343}
2024-09-16T23:33:52.308+09:00  INFO 95839 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: 3  Value: {"Message": "Hello (3)!", "Time": 1726497180449}
2024-09-16T23:33:52.308+09:00  INFO 95839 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : Key: 4  Value: {"Message": "Hello (4)!", "Time": 1726497180551}
2024-09-16T23:34:02.311+09:00  INFO 95839 --- [mykafka] [  restartedMain] c.m.mykafka.MykafkaApplicationRunner     : No messages waiting...

今回はメッセージのキーをシンプルな文字列形式、バリューの部分をAvro形式にしています。
上記の実行ログの抜粋より、バリュー部分がデシリアライズされて出力されていることが確認できました。

AWS Glue Schema Registryへの登録結果

プログラム実行より、AWS Glue Schema Registryが以下のように更新されました。
image.png
image.png

以下に、AWS CLIでスキーマを確認した際のコマンドと結果を示します。

# Javaプログラムと同様にAWS Glue Schema Registryのアクセス権を持つIAMロールの認証情報を取得
aws sts assume-role --role-arn arn:aws:iam::XXXXXXXXXXXX:role/MyGlueSchemaRegistryAccessRole --role-session-name test-session --profile default --region ap-northeast-1 --output json

# 取得したIAMロールの認証情報をAWSの認証情報系の環境変数にセット
export AWS_ACCESS_KEY_ID="・・・"
export AWS_SECRET_ACCESS_KEY="・・・"
export AWS_SESSION_TOKEN="・・・"

# ① スキーマの情報を表示
aws glue get-schema --schema-id SchemaArn=arn:aws:glue:ap-northeast-1:XXXXXXXXXXXX:schema/my-registry/my-schema-greeting

# ② スキーマの内容を表示(最新バージョンを指定)
aws glue get-schema-version --schema-id SchemaArn=arn:aws:glue:ap-northeast-1:XXXXXXXXXXXX:schema/my-registry/my-schema-greeting --schema-version-number LatestVersion=true

①の結果
image.png
②の結果
image.png

おわりに

ここでは、シンプルな文字列形式のメッセージ、Avro形式のメッセージのProduce/Consumeを行う実装例を紹介しました。
データベースにあるデータ更新の都度、その変更内容をKafkaに伝えることができればイベントドリブンなユースケースを実装したり、データの利用側で高鮮度データを分析対象にしたりすることができるようになります。
次はIBM製品であるInfosphere Replication CDCを使用して、データベースのデータ更新の都度、Amazon MSKに変更差分を連携する方法を確認したいと思います。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?