0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

manabianAdvent Calendar 2024

Day 15

Confluent CloudにてAvro/JSON形式メッセージのProduce & Consume実践

Posted at

概要

本記事では、Confluent Cloud 上で Avro形式 および JSON形式 のメッセージを Produce (送信) / Consume (受信) する方法を、サンプルコードを用いて解説します。
ここで取り上げるサンプルは Google ColabJupyter Notebook 環境での実行を想定しています。また、Kafka Brokerへのアクセスや、Schema Registryを介したシリアライズ / デシリアライズの方法も紹介します。

事前準備

1. Confluent Cloud 上での設定

1. avro-topic-01json-topic-01 という名前でトピックを作成

image.png

image.png

2. ConfluentのクラスタおよびSchema Registryの認証情報を取得

image.png

image.png

2. Google Colab 上で必要なPythonパッケージのインストール

!pip install confluent-kafka confluent-kafka[avro,json,protobuf] -q

image.png

3. 接続・認証情報の設定

以下はKafkaクラスタおよびSchema Registryへの接続情報を環境変数的に設定する例です。

# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "DXDQYDMIFC7M5GDL"
sasl_password = "SBvqZ69flYfrq/U47LJFcfdJfNni/KMxdAav4UE4a1n2wteAKd3v0mbf196zpGkM"
kafka_conf = {
    'bootstrap.servers': bootstrap_servers,
    'sasl.username':     sasl_username,
    'sasl.password':     sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
}

# Schema Registry 接続情報を設定
sr_url = "https://psrc-8kz20.us-east-2.aws.confluent.cloud"
sr_api_key = "USUPVVZFUT6D3PIR"
sr_api_secret = "LcaEx0NKC76wI72P2IMJd3VAyH9S+9jiVbgjH8x0PXR9uESXA/B4GkI8FS6RmbjA"
sr_conf = {
    'url': sr_url,
    'basic.auth.user.info': f"{sr_api_key}:{sr_api_secret}"
}

image.png

実行例:Avro形式でのProduce / Consume

Avroスキーマとレコードの用意

まずはAvro形式でメッセージを送受信する例です。以下では、UserというAvroスキーマを定義し、サンプルのレコードを用意します。

# Topic 名を
avro_topic_name = "avro-topic-01"

# スキーマを定義
avro_schema_str = """
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int"},
    {"name": "favorite_color", "type": "string"}
  ]
}
"""

# サンプルレコード
record = {
    "name": "alice",
    "favorite_number": 42,
    "favorite_color": "blue"
}

image.png

Avro形式でのメッセージ送信 (Produce)

AvroスキーマをSchema Registryに登録し、confluent_kafkaAvroSerializer を用いてシリアライズした上で、メッセージをKafkaトピックに送信します。

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
    schema_registry_client,
    avro_schema_str,
)
producer = Producer(kafka_conf)
serialized_value = avro_serializer(
    record,
    SerializationContext(avro_topic_name, MessageField.VALUE)
)
producer.produce(
    topic=avro_topic_name,
    key=str(record["name"]),
    value=serialized_value,
)
producer.flush()
print("Avroメッセージ書き込み成功")

image.png

Avro形式でのメッセージ受信 (Consume)

続いて、Avro形式で送信したメッセージをConsumerで受信します。AvroDeserializer を用いてデシリアライズします。

from confluent_kafka import Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer

consumer_conf = {
    'group.id': 'avro_consumer_group_01',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)

consumer = Consumer(consumer_conf)
consumer.subscribe([avro_topic_name])

avro_deserializer = AvroDeserializer(
    schema_registry_client,
    avro_schema_str,
)

try:
    while True:
        msg = consumer.poll(3.0)
        if msg is None:
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        user = avro_deserializer(
            msg.value(),
            SerializationContext(msg.topic(), MessageField.VALUE)
        )
        if user:
            print(f"Consumed Avro record: key={msg.key()}, value={user}")
finally:
    consumer.close()
    print("Avroメッセージ受信終了")
Consumed Avro record: key=b'alice', value={'name': 'alice', 'favorite_number': 42, 'favorite_color': 'blue'}
Avroメッセージ受信終了

image.png


実行例:JSON形式でのProduce / Consume

JSONスキーマとレコードの用意

次に、JSON形式でメッセージを送受信する例です。ここではJSON Schemaを使用します。

# Topic 名を
json_topic_name = "json-topic-01"

# スキーマを定義
json_schema_str = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "User",
  "type": "object",
  "properties": {
    "name": {"type": "string"},
    "favorite_number": {"type": "number"},
    "favorite_color": {"type": "string"}
  },
  "required": [ "name", "favorite_number", "favorite_color" ]
}
"""

# サンプルレコード
record = {
    "name": "alice",
    "favorite_number": 42,
    "favorite_color": "blue"
}

image.png

JSON形式でのメッセージ送信 (Produce)

JSONSerializer を用いてJSONスキーマに基づくシリアライゼーションを行い、メッセージをトピックに送信します。

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer

schema_registry_client = SchemaRegistryClient(sr_conf)

json_serializer = JSONSerializer(
    json_schema_str,
    schema_registry_client,
)

producer = Producer(kafka_conf)
serialized_value = json_serializer(
    record,
    SerializationContext(json_topic_name, MessageField.VALUE),
)

producer.produce(
    topic=json_topic_name,
    key=str(record["name"]),
    value=serialized_value
)
producer.flush()
print("JSONメッセージ書き込み成功")

image.png

JSON形式でのメッセージ受信 (Consume)

JSONDeserializer を用いてJSONスキーマでシリアライズされたメッセージを受信・デシリアライズします。

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONDeserializer

schema_registry_client = SchemaRegistryClient(sr_conf)

consumer_conf = {
    'group.id': 'json_consumer_group_01',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)
consumer = Consumer(consumer_conf)
consumer.subscribe([json_topic_name])

json_deserializer = JSONDeserializer(
    json_schema_str,
    schema_registry_client=schema_registry_client,
)

try:
    while True:
        msg = consumer.poll(3.0)
        if msg is None:
            break
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        user = json_deserializer(
            msg.value(),
            SerializationContext(msg.topic(), MessageField.VALUE)
        )
        if user:
            print(f"Consumed JSON record: key={msg.key()}, value={user}")
finally:
    consumer.close()
    print("JSONメッセージ受信終了")
Consumed JSON record: key=b'alice', value={'name': 'alice', 'favorite_number': 42, 'favorite_color': 'blue'}
JSONメッセージ受信終了

image.png

事後処理

テストが完了したら、作成したトピックやリソースを Confluent Cloud 上で適宜クリーンアップしてください。

image.png


まとめ

本記事では、Confluent Cloud 上でAvroおよびJSON形式のメッセージをProduce / Consume する基本的な手順を紹介しました。

  • Avro形式:AvroSerializer / AvroDeserializer を使用してシリアライズ・デシリアライズ
  • JSON形式:JSONSerializer / JSONDeserializer を使用し、JSON Schemaに準拠したメッセージを取り扱い

これらを活用することで、KafkaとSchema Registryを組み合わせた強力なデータストリーミング基盤を容易に構築できます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?