概要
本記事では、Confluent Cloud 上で Avro形式 および JSON形式 のメッセージを Produce (送信) / Consume (受信) する方法を、サンプルコードを用いて解説します。
ここで取り上げるサンプルは Google Colab や Jupyter Notebook 環境での実行を想定しています。また、Kafka Brokerへのアクセスや、Schema Registryを介したシリアライズ / デシリアライズの方法も紹介します。
事前準備
1. Confluent Cloud 上での設定
1. avro-topic-01
と json-topic-01
という名前でトピックを作成
2. ConfluentのクラスタおよびSchema Registryの認証情報を取得
2. Google Colab 上で必要なPythonパッケージのインストール
!pip install confluent-kafka confluent-kafka[avro,json,protobuf] -q
3. 接続・認証情報の設定
以下はKafkaクラスタおよびSchema Registryへの接続情報を環境変数的に設定する例です。
# Kafka の接続情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "DXDQYDMIFC7M5GDL"
sasl_password = "SBvqZ69flYfrq/U47LJFcfdJfNni/KMxdAav4UE4a1n2wteAKd3v0mbf196zpGkM"
kafka_conf = {
'bootstrap.servers': bootstrap_servers,
'sasl.username': sasl_username,
'sasl.password': sasl_password,
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
}
# Schema Registry 接続情報を設定
sr_url = "https://psrc-8kz20.us-east-2.aws.confluent.cloud"
sr_api_key = "USUPVVZFUT6D3PIR"
sr_api_secret = "LcaEx0NKC76wI72P2IMJd3VAyH9S+9jiVbgjH8x0PXR9uESXA/B4GkI8FS6RmbjA"
sr_conf = {
'url': sr_url,
'basic.auth.user.info': f"{sr_api_key}:{sr_api_secret}"
}
実行例:Avro形式でのProduce / Consume
Avroスキーマとレコードの用意
まずはAvro形式でメッセージを送受信する例です。以下では、User
というAvroスキーマを定義し、サンプルのレコードを用意します。
# Topic 名を
avro_topic_name = "avro-topic-01"
# スキーマを定義
avro_schema_str = """
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
"""
# サンプルレコード
record = {
"name": "alice",
"favorite_number": 42,
"favorite_color": "blue"
}
Avro形式でのメッセージ送信 (Produce)
AvroスキーマをSchema Registryに登録し、confluent_kafka
の AvroSerializer
を用いてシリアライズした上で、メッセージをKafkaトピックに送信します。
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
schema_registry_client = SchemaRegistryClient(sr_conf)
avro_serializer = AvroSerializer(
schema_registry_client,
avro_schema_str,
)
producer = Producer(kafka_conf)
serialized_value = avro_serializer(
record,
SerializationContext(avro_topic_name, MessageField.VALUE)
)
producer.produce(
topic=avro_topic_name,
key=str(record["name"]),
value=serialized_value,
)
producer.flush()
print("Avroメッセージ書き込み成功")
Avro形式でのメッセージ受信 (Consume)
続いて、Avro形式で送信したメッセージをConsumerで受信します。AvroDeserializer
を用いてデシリアライズします。
from confluent_kafka import Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer
consumer_conf = {
'group.id': 'avro_consumer_group_01',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)
consumer = Consumer(consumer_conf)
consumer.subscribe([avro_topic_name])
avro_deserializer = AvroDeserializer(
schema_registry_client,
avro_schema_str,
)
try:
while True:
msg = consumer.poll(3.0)
if msg is None:
break
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
user = avro_deserializer(
msg.value(),
SerializationContext(msg.topic(), MessageField.VALUE)
)
if user:
print(f"Consumed Avro record: key={msg.key()}, value={user}")
finally:
consumer.close()
print("Avroメッセージ受信終了")
Consumed Avro record: key=b'alice', value={'name': 'alice', 'favorite_number': 42, 'favorite_color': 'blue'}
Avroメッセージ受信終了
実行例:JSON形式でのProduce / Consume
JSONスキーマとレコードの用意
次に、JSON形式でメッセージを送受信する例です。ここではJSON Schemaを使用します。
# Topic 名を
json_topic_name = "json-topic-01"
# スキーマを定義
json_schema_str = """
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "User",
"type": "object",
"properties": {
"name": {"type": "string"},
"favorite_number": {"type": "number"},
"favorite_color": {"type": "string"}
},
"required": [ "name", "favorite_number", "favorite_color" ]
}
"""
# サンプルレコード
record = {
"name": "alice",
"favorite_number": 42,
"favorite_color": "blue"
}
JSON形式でのメッセージ送信 (Produce)
JSONSerializer
を用いてJSONスキーマに基づくシリアライゼーションを行い、メッセージをトピックに送信します。
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
schema_registry_client = SchemaRegistryClient(sr_conf)
json_serializer = JSONSerializer(
json_schema_str,
schema_registry_client,
)
producer = Producer(kafka_conf)
serialized_value = json_serializer(
record,
SerializationContext(json_topic_name, MessageField.VALUE),
)
producer.produce(
topic=json_topic_name,
key=str(record["name"]),
value=serialized_value
)
producer.flush()
print("JSONメッセージ書き込み成功")
JSON形式でのメッセージ受信 (Consume)
JSONDeserializer
を用いてJSONスキーマでシリアライズされたメッセージを受信・デシリアライズします。
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONDeserializer
schema_registry_client = SchemaRegistryClient(sr_conf)
consumer_conf = {
'group.id': 'json_consumer_group_01',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer_conf.update(kafka_conf)
consumer = Consumer(consumer_conf)
consumer.subscribe([json_topic_name])
json_deserializer = JSONDeserializer(
json_schema_str,
schema_registry_client=schema_registry_client,
)
try:
while True:
msg = consumer.poll(3.0)
if msg is None:
break
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
user = json_deserializer(
msg.value(),
SerializationContext(msg.topic(), MessageField.VALUE)
)
if user:
print(f"Consumed JSON record: key={msg.key()}, value={user}")
finally:
consumer.close()
print("JSONメッセージ受信終了")
Consumed JSON record: key=b'alice', value={'name': 'alice', 'favorite_number': 42, 'favorite_color': 'blue'}
JSONメッセージ受信終了
事後処理
テストが完了したら、作成したトピックやリソースを Confluent Cloud 上で適宜クリーンアップしてください。
まとめ
本記事では、Confluent Cloud 上でAvroおよびJSON形式のメッセージをProduce / Consume する基本的な手順を紹介しました。
- Avro形式:
AvroSerializer
/AvroDeserializer
を使用してシリアライズ・デシリアライズ - JSON形式:
JSONSerializer
/JSONDeserializer
を使用し、JSON Schemaに準拠したメッセージを取り扱い
これらを活用することで、KafkaとSchema Registryを組み合わせた強力なデータストリーミング基盤を容易に構築できます。