概要
本記事では、Databricks と Confluent を活用して、エンドツーエンドのストリーミングパイプラインを構築する方法を紹介します。主な処理の流れは以下のとおりです。
- Confluent(Kafka)上でデータをリアルタイムに生成
- Databricksでストリーミング処理を行い、データを加工・分析
- 処理結果を再度Confluent(Kafka)のトピックに出力
処理の概要
上図は、Confluent(Kafka)とDatabricksを組み合わせてストリーミング処理を行う際のデータフローを示しています。以下のステップに分けて、もう少し詳しく解説します。
-
Datagen Source Connectorによるデータ生成
- ConfluentのDatagen Source Connectorを使用し、ピザの注文データ(
Pizza orders
)や注文完了データ(Pizza Orders completed
)をリアルタイムに生成します。 - これにより、Kafkaトピックに継続的にメッセージが投入される仕組みが整備されます。
- ConfluentのDatagen Source Connectorを使用し、ピザの注文データ(
-
Kafkaトピック(Confluent)の取り込み
-
Pizza orders
とPizza Orders completed
のトピックを、Databricksのストリーミング処理で取り込みます。
-
-
Databricks上でのストリーミング処理(1回目)
- Databricks(Spark Structured Streaming など)を使い、
Pizza orders
を処理・変換します。 - 処理結果は別のストレージ(図中にある
Pizza orders
やPizza orders completed
の出力先)へ書き込みます。
- Databricks(Spark Structured Streaming など)を使い、
-
Databricks上でのストリーミング処理(2回目)
- 3で出力されたデータをさらに別のストリーミングジョブで読み込み、追加の分析やクレンジングを実施し、最終成果物となる
Pizza orders completed items
を生成します。
- 3で出力されたデータをさらに別のストリーミングジョブで読み込み、追加の分析やクレンジングを実施し、最終成果物となる
-
Kafkaトピック(Confluent)への書き戻し
- 4の処理で得られた
Pizza orders completed items
を、再度KafkaトピックとしてConfluent側へ出力します。 - Confluent上でこのトピックをダッシュボードに反映したり、ほかのシステムと連携したりできます。
- 4の処理で得られた
まとめると、Confluentで生成したリアルタイム注文データをDatabricksで処理し、再びConfluentへ出力する流れとなっています。
実施手順
1. Confluent 上での事前準備
1-1. Environment の作成
1-2. クラスターを作成
1-3. クラスターに対する API key を作成
1-4. Schema Registry に対する API Key を作成
2. Databricks 上での事前準備
2-1. Databricks 上でクラスターを作成
2-2. ノートブックを作成して名称を変更
2-3. Databricks のクラスターをノートブックにアタッチして動作を確認
print("Hello World")
3. Confluent 上で Topic の作成とデータの生成
3-1. pizza_orders
(Avro形式)の Topic を作成
以下スキーマを使用:
{
"name": "pizza_orders",
"namespace": "pizza_orders",
"type": "record",
"fields": [
{
"name": "store_id",
"type": "int"
},
{
"name": "store_order_id",
"type": "int"
},
{
"name": "coupon_code",
"type": "int"
},
{
"name": "date",
"type": {
"connect.name": "org.apache.kafka.connect.data.Date",
"connect.version": 1,
"logicalType": "date",
"type": "int"
}
},
{
"name": "status",
"type": "string"
},
{
"name": "order_lines",
"type": {
"items": {
"connect.name": "pizza_orders.order_line",
"fields": [
{
"name": "product_id",
"type": "int"
},
{
"name": "category",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "unit_price",
"type": "double"
},
{
"name": "net_price",
"type": "double"
}
],
"name": "order_line",
"type": "record"
},
"type": "array"
}
}
]
}
3-2. pizza_orders_completed
(Avro形式)の Topic を作成
以下スキーマを使用:
{
"name": "pizza_orders_completed",
"namespace": "pizza_orders",
"type": "record",
"fields": [
{
"name": "store_id",
"type": "int"
},
{
"name": "store_order_id",
"type": "int"
},
{
"name": "date",
"type": {
"connect.name": "org.apache.kafka.connect.data.Date",
"connect.version": 1,
"logicalType": "date",
"type": "int"
}
},
{
"name": "status",
"type": "string"
},
{
"name": "rack_time_secs",
"type": "int"
},
{
"name": "order_delivery_time_secs",
"type": "int"
}
]
}
3-3. pizza_orders_completed_items
(Avro形式)の Topic を作成
以下スキーマを使用:
{
"name": "pizza_orders_completed_items",
"namespace": "pizza_orders_completed_items",
"type": "record",
"fields": [
{
"name": "store_id",
"type": "int"
},
{
"name": "date",
"type": {
"logicalType": "date",
"type": "int"
}
},
{
"name": "product_id",
"type": "int"
},
{
"name": "category",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "unit_price",
"type": "double"
},
{
"name": "net_price",
"type": "double"
}
]
}
3-4. pizza_orders
Topic に対して書きこむ Datagen Source Connector を作成
3-5. pizza_orders_completed
Topic に対して書きこむ Datagen Source Connector を作成
3-7. 数分後、2つのコネクターを停止
4. Databricks 上でテーブルなどのデータベースオブジェクトを作成
4-1. カタログとスキーマを作成
catalog_name = "confluent_test"
schema_name = "test"
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
4-2. 処理状態を保持する Volume を作成
checkpoint_volume_name = "checkpoint_01"
checkpoint_volume_dir = f"/Volumes/{catalog_name}/{schema_name}/{checkpoint_volume_name}"
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{checkpoint_volume_name}")
dbutils.fs.rm(checkpoint_volume_dir, True)
4-3. pizza_orders
テーブルを作成
pizza_orders_table_name = "pizza_orders"
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.{pizza_orders_table_name}
(
key STRING,
store_id INT,
store_order_id INT,
coupon_code INT,
date DATE,
status STRING,
order_lines ARRAY <
STRUCT <
product_id: INT,
category: STRING,
quantity: INT,
unit_price: DOUBLE,
net_price: DOUBLE
>
>,
topic STRING,
partition INT,
offset BIGINT,
timestamp TIMESTAMP,
timestampType INT
)
;
""")
4-4. pizza_orders_completed
テーブルを作成
pizza_orders_completed_table_name = "pizza_orders_completed"
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.{pizza_orders_completed_table_name}
(
key STRING,
store_id INT,
store_order_id INT,
date DATE,
status STRING,
rack_time_secs INT,
order_delivery_time_secs INT,
topic STRING,
partition INT,
offset BIGINT,
timestamp TIMESTAMP,
timestampType INT
)
;
""")
4-5. pizza_orders_completed_items
テーブルを作成
pizza_orders_completed_items_table_name = "pizza_orders_completed_items"
spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}
(
key STRING,
store_id INT,
date DATE,
product_id INT,
category STRING,
quantity INT,
unit_price DOUBLE,
net_price DOUBLE
)
;
""")
5. Databricks 上で処理を実行
5-1. Confluent クラスター/Scheme Registry への認証情報をセット
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "JWRABQ5DWGFVK3ML"
sasl_password = "Pi9S0KWY02dxn8rWBjxfgcX/Yu//j49sKVy08NDmEvdLtK1JVVEYfWHVhytdC31F"
# Schema Registry の接続情報
sr_url = "https://psrc-l622j.us-east-2.aws.confluent.cloud"
sr_api_key = "T4P57YTN5E6CLMZH"
sr_api_secret = "XeDK4ishT0RaBUweEb7CW3hSotuWnp1yWZCcQRti6jmvvSJYh8rGcjC8Atw9vLvD"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}
5-2. pizza_orders
トピックからpizza_orders
テーブルへ書き込み
pizza_orders_topic_name = "pizza_orders"
pizza_orders_subject_name = pizza_orders_topic_name + "-value"
pizza_orders_checkpoint_dir = checkpoint_volume_dir + "/" + pizza_orders_table_name
from pyspark.sql.functions import col, expr
from pyspark.sql.avro.functions import from_avro
# Confluent から読み込み
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("subscribe", pizza_orders_topic_name)
.option("startingOffsets", "earliest")
.load()
)
# Key を Deserialize
key_des_df = df.withColumn(
"key",
col("Key").cast("string"),
)
# Magic Byte と Schema ID を取り除いた Value を Deserialize
decoded_df = key_des_df.withColumn(
"value",
from_avro(
col("value"),
options=schema_registry_options,
subject=pizza_orders_subject_name,
schemaRegistryAddress=sr_url,
),
)
# Deserialize した Value のカラムを展開
tgt_cols = [
"key",
"value.*",
"topic",
"partition",
"offset",
"timestamp",
"timestampType",
]
src_df = decoded_df.select(tgt_cols)
(
src_df.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", pizza_orders_checkpoint_dir)
.trigger(availableNow=True)
.toTable(f"{catalog_name}.{schema_name}.{pizza_orders_table_name}")
)
5-3. pizza_orders
テーブルへの書き込み確認
spark.table(f"{catalog_name}.{schema_name}.{pizza_orders_table_name}").display()
5-4. pizza_orders_completed
トピックからpizza_orders_completed
テーブルへ書き込み
pizza_orders_completed_topic_name = "pizza_orders_completed"
pizza_orders_completed_subject_name = pizza_orders_completed_topic_name + "-value"
pizza_orders_completed_checkpoint_dir = checkpoint_volume_dir + "/" + pizza_orders_completed_topic_name
from pyspark.sql.functions import col, expr
from pyspark.sql.avro.functions import from_avro
# Confluent から読み込み
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("subscribe", pizza_orders_completed_topic_name)
.option("startingOffsets", "earliest")
.load()
)
# Key を Deserialize
key_des_df = df.withColumn(
"key",
col("Key").cast("string"),
)
# Magic Byte と Schema ID を取り除いた Value を Deserialize
decoded_df = key_des_df.withColumn(
"value",
from_avro(
col("value"),
options=schema_registry_options,
subject=pizza_orders_completed_subject_name,
schemaRegistryAddress=sr_url,
),
)
# Deserialize した Value のカラムを展開
tgt_cols = [
"key",
"value.*",
"topic",
"partition",
"offset",
"timestamp",
"timestampType",
]
src_df = decoded_df.select(tgt_cols)
(
src_df.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", pizza_orders_completed_checkpoint_dir)
.trigger(availableNow=True)
.toTable(f"{catalog_name}.{schema_name}.{pizza_orders_completed_topic_name}")
)
5-5. pizza_orders_completed
テーブルへの書き込み確認
spark.table(f"{catalog_name}.{schema_name}.{pizza_orders_completed_topic_name}").display()
5-6. pizza_orders_completed_items
テーブルへの書き込み
pizza_orders_completed_items_checkpoint_dir = checkpoint_volume_dir + "/" + pizza_orders_completed_items_table_name
# ピザ注文完了テーブル (pizza_orders_completed) を Streaming 読み込み
df_poc = (
spark.readStream
.table(f"{catalog_name}.{schema_name}.{pizza_orders_completed_topic_name}")
# 実際のイベント時刻となるカラム名を指定
.withWatermark("timestamp", "3 minutes")
)
# ピザ注文テーブル (pizza_orders) を Streaming 読み込み
df_po = (
spark.readStream
.table(f"{catalog_name}.{schema_name}.{pizza_orders_table_name}")
# 実際のイベント時刻となるカラム名を指定
.withWatermark("timestamp", "3 minutes")
)
# store_order_id で内部結合
df_joined = df_poc.join(
df_po,
on=df_poc.store_order_id == df_po.store_order_id,
how="inner"
)
# order_lines 列(Array[Struct])を行に展開
# explode は alias で struct の列を変数化できます (element.*)
from pyspark.sql.functions import explode
df_exploded = df_joined.select(
df_poc["key"],
df_poc["store_id"],
df_po["date"],
explode(df_po["order_lines"]).alias("element")
)
# element は Struct (product_id, category, quantity, unit_price, net_price)
# これらを単独の列にする
df_result = df_exploded.select(
"key",
"store_id",
"date",
"element.product_id",
"element.category",
"element.quantity",
"element.unit_price",
"element.net_price"
)
# Delta に append モードで書き込み (テーブルまたはパスを指定)
(
df_result.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", pizza_orders_completed_items_checkpoint_dir)
.trigger(availableNow=True)
.toTable(f"{catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}")
)
5-7. pizza_orders_completed_items
テーブルへの書き込み確認
spark.table(f"{catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}").display()
5-8. Databricks から pizza_orders_completed_items
Topic への出力
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
"confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}
pizza_orders_completed_items_topic_name = "pizza_orders_completed_items"
pizza_orders_completed_items_value_subject_name = pizza_orders_completed_items_topic_name + "-value"
json_format_schema = """
{
"fields": [
{
"name": "store_id",
"type": "int"
},
{
"name": "date",
"type": {
"logicalType": "date",
"type": "int"
}
},
{
"name": "product_id",
"type": "int"
},
{
"name": "category",
"type": "string"
},
{
"name": "quantity",
"type": "int"
},
{
"name": "unit_price",
"type": "double"
},
{
"name": "net_price",
"type": "double"
}
],
"name": "pizza_orders_completed_items",
"namespace": "pizza_orders_completed_items",
"type": "record"
}
"""
# 書き込み時に利用するチェックポイントのディレクトリを指定
pizza_orders_completed_items_kafka_checkpoint_dir = checkpoint_volume_dir + "/topic__" + pizza_orders_completed_items_topic_name
from pyspark.sql.functions import col, struct, lit
from pyspark.sql.avro.functions import to_avro
src_df = spark.readStream.table(
f"{catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}"
)
src_df = src_df.withColumn("key", col("key"))
src_df = src_df.withColumn(
"value",
struct(
"store_id",
"date",
"product_id",
"category",
"quantity",
"unit_price",
"net_price",
),
)
src_df = src_df.withColumn(
"value",
to_avro(
data=col("value"),
options=schema_registry_options,
schemaRegistryAddress=sr_url,
subject=lit(pizza_orders_completed_items_value_subject_name),
jsonFormatSchema=json_format_schema,
),
)
# key 列と value 列のみを抽出
src_df = src_df.select("key", "value")
(
src_df.writeStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("topic", pizza_orders_completed_items_topic_name)
.option("checkpointLocation", pizza_orders_completed_items_kafka_checkpoint_dir)
.trigger(availableNow=True)
.start()
)
6. Confluent 上で実行確認とコネクターの再実行
6-1. pizza_orders_completed_items
Topic にデータが書き込まれたことを確認
6-2. 2 つの Datagen Source Connector を再度稼働(Resume)
6-3. Databricks 上でデータ取り込み処理を再実行
6-4. Confluent 上でpizza_orders_completed_items
Topic に追加データがあることを確認
7. 事後処理
7-1. Confluent の Environment を削除
7-2. Databricks で作成したカタログを削除
_ = spark.sql(f"DROP CATALOG IF EXISTS {catalog_name} CASCADE")
7-3. Databricks で作成したクラスターを削除
まとめ
今回紹介したように、Confluent(Kafka) と Databricks を組み合わせることで、リアルタイムに生成されるストリーミングデータを柔軟かつ高パフォーマンスに取り扱うことができます。具体的には、以下のようなポイントが挙げられます。
-
Kafkaトピックへのリアルタイムデータ生成
Confluent が提供する Datagen Source Connector を利用することで、Kafkaトピックへの継続的なメッセージ投入が非常に容易になります。テストデータやサンプルデータを生成しながら、すぐにストリーミングパイプラインを構築できます。 -
Databricks によるストリーミング処理
Databricks 上の Spark Structured Streaming を活用することで、Kafka から取り込んだデータに対して変換や集計などの処理をリアルタイムに行えます。テーブルやビューを通じてデータを管理できるため、ジョブの切り替えやジョイン処理も簡単に実装できます。 -
Kafka トピックへの再書き込み
Databricks で加工・分析したデータを再び Kafkaトピックとして Confluent 側へ送信することで、さまざまな用途に活用できます。データが必要なシステムやサービスとリアルタイムに連携するための土台が整います。 -
クラウド上での柔軟なスケーリング
Databricks と Confluent のそれぞれがクラウドネイティブなサービスであるため、データ量の増加に応じてノードを柔軟にスケールアウトし、高可用性を維持しながら処理を継続できます。
以上の流れとポイントを踏まえると、エンドツーエンドのストリーミングパイプラインを比較的短時間で構築しやすくなり、リアルタイムなデータ収集から分析・可視化・システム連携に至るまでシームレスに管理できます。ピザ注文のように、リアルタイムに発生するイベント情報を扱うユースケースはもちろん、サーバーログや IoT センサーからのデータ、SNS の投稿、金融や小売業など幅広いシナリオに適用可能です。今回の手順をベースに、さまざまなリアルタイムアプリケーションの構築にぜひお役立てください。