0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Confluent と Databricks を連携したリアルタイム分析の実践

Posted at

概要

本記事では、DatabricksConfluent を活用して、エンドツーエンドのストリーミングパイプラインを構築する方法を紹介します。主な処理の流れは以下のとおりです。

  1. Confluent(Kafka)上でデータをリアルタイムに生成
  2. Databricksでストリーミング処理を行い、データを加工・分析
  3. 処理結果を再度Confluent(Kafka)のトピックに出力

image.png

処理の概要

image.png

上図は、Confluent(Kafka)とDatabricksを組み合わせてストリーミング処理を行う際のデータフローを示しています。以下のステップに分けて、もう少し詳しく解説します。

  1. Datagen Source Connectorによるデータ生成

    • ConfluentのDatagen Source Connectorを使用し、ピザの注文データ(Pizza orders)や注文完了データ(Pizza Orders completed)をリアルタイムに生成します。
    • これにより、Kafkaトピックに継続的にメッセージが投入される仕組みが整備されます。
  2. Kafkaトピック(Confluent)の取り込み

    • Pizza ordersPizza Orders completedのトピックを、Databricksのストリーミング処理で取り込みます。
  3. Databricks上でのストリーミング処理(1回目)

    • Databricks(Spark Structured Streaming など)を使い、Pizza ordersを処理・変換します。
    • 処理結果は別のストレージ(図中にあるPizza ordersPizza orders completedの出力先)へ書き込みます。
  4. Databricks上でのストリーミング処理(2回目)

    • 3で出力されたデータをさらに別のストリーミングジョブで読み込み、追加の分析やクレンジングを実施し、最終成果物となるPizza orders completed itemsを生成します。
  5. Kafkaトピック(Confluent)への書き戻し

    • 4の処理で得られたPizza orders completed itemsを、再度KafkaトピックとしてConfluent側へ出力します。
    • Confluent上でこのトピックをダッシュボードに反映したり、ほかのシステムと連携したりできます。

まとめると、Confluentで生成したリアルタイム注文データをDatabricksで処理し、再びConfluentへ出力する流れとなっています。

実施手順

1. Confluent 上での事前準備

1-1. Environment の作成

image.png

1-2. クラスターを作成

image.png

1-3. クラスターに対する API key を作成

image.png
image.png

1-4. Schema Registry に対する API Key を作成

image.png
image.png
image.png

2. Databricks 上での事前準備

2-1. Databricks 上でクラスターを作成

image.png

2-2. ノートブックを作成して名称を変更

image.png
image.png

2-3. Databricks のクラスターをノートブックにアタッチして動作を確認

print("Hello World")

image.png

3. Confluent 上で Topic の作成とデータの生成

3-1. pizza_orders(Avro形式)の Topic を作成

以下スキーマを使用:

{
  "name": "pizza_orders",
  "namespace": "pizza_orders",
  "type": "record",
  "fields": [
    {
      "name": "store_id",
      "type": "int"
    },
    {
      "name": "store_order_id",
      "type": "int"
    },
    {
      "name": "coupon_code",
      "type": "int"
    },
    {
      "name": "date",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Date",
        "connect.version": 1,
        "logicalType": "date",
        "type": "int"
      }
    },
    {
      "name": "status",
      "type": "string"
    },
    {
      "name": "order_lines",
      "type": {
        "items": {
          "connect.name": "pizza_orders.order_line",
          "fields": [
            {
              "name": "product_id",
              "type": "int"
            },
            {
              "name": "category",
              "type": "string"
            },
            {
              "name": "quantity",
              "type": "int"
            },
            {
              "name": "unit_price",
              "type": "double"
            },
            {
              "name": "net_price",
              "type": "double"
            }
          ],
          "name": "order_line",
          "type": "record"
        },
        "type": "array"
      }
    }
  ]
}

image.png
image.png

3-2. pizza_orders_completed(Avro形式)の Topic を作成

以下スキーマを使用:

{
  "name": "pizza_orders_completed",
  "namespace": "pizza_orders",
  "type": "record",
  "fields": [
    {
      "name": "store_id",
      "type": "int"
    },
    {
      "name": "store_order_id",
      "type": "int"
    },
    {
      "name": "date",
      "type": {
        "connect.name": "org.apache.kafka.connect.data.Date",
        "connect.version": 1,
        "logicalType": "date",
        "type": "int"
      }
    },
    {
      "name": "status",
      "type": "string"
    },
    {
      "name": "rack_time_secs",
      "type": "int"
    },
    {
      "name": "order_delivery_time_secs",
      "type": "int"
    }
  ]
}

image.png
image.png

3-3. pizza_orders_completed_items(Avro形式)の Topic を作成

以下スキーマを使用:

{
  "name": "pizza_orders_completed_items",
  "namespace": "pizza_orders_completed_items",
  "type": "record",
  "fields": [
    {
      "name": "store_id",
      "type": "int"
    },
    {
      "name": "date",
      "type": {
        "logicalType": "date",
        "type": "int"
      }
    },
    {
      "name": "product_id",
      "type": "int"
    },
    {
      "name": "category",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "unit_price",
      "type": "double"
    },
    {
      "name": "net_price",
      "type": "double"
    }
  ]
}

image.png
image.png
image.png

3-4. pizza_orders Topic に対して書きこむ Datagen Source Connector を作成

image.png

image.png

image.png

image.png

image.png

image.png

image.png

3-5. pizza_orders_completed Topic に対して書きこむ Datagen Source Connector を作成

image.png

image.png

image.png

3-7. 数分後、2つのコネクターを停止

image.png
image.png

4. Databricks 上でテーブルなどのデータベースオブジェクトを作成

4-1. カタログとスキーマを作成

catalog_name = "confluent_test"
schema_name = "test"

_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")

image.png

4-2. 処理状態を保持する Volume を作成

checkpoint_volume_name = "checkpoint_01"
checkpoint_volume_dir = f"/Volumes/{catalog_name}/{schema_name}/{checkpoint_volume_name}"

spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{checkpoint_volume_name}")
dbutils.fs.rm(checkpoint_volume_dir, True)

image.png

4-3. pizza_ordersテーブルを作成

pizza_orders_table_name = "pizza_orders"

spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.{pizza_orders_table_name}
(
  key STRING,
  store_id INT,
  store_order_id INT,
  coupon_code INT,
  date DATE,
  status STRING,
  order_lines ARRAY < 
    STRUCT < 
        product_id: INT,
        category: STRING,
        quantity: INT,
        unit_price: DOUBLE,
        net_price: DOUBLE
    >
  >,
  topic STRING,
  partition INT,
  offset BIGINT,
  timestamp TIMESTAMP,
  timestampType INT
)
;
""")

image.png

4-4. pizza_orders_completedテーブルを作成

pizza_orders_completed_table_name = "pizza_orders_completed"

spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.{pizza_orders_completed_table_name}
(
  key STRING,
  store_id INT,
  store_order_id INT,
  date DATE,
  status STRING,
  rack_time_secs INT,
  order_delivery_time_secs INT,
  topic STRING,
  partition INT,
  offset BIGINT,
  timestamp TIMESTAMP,
  timestampType INT
)
;
""")

image.png

4-5. pizza_orders_completed_itemsテーブルを作成

pizza_orders_completed_items_table_name = "pizza_orders_completed_items"

spark.sql(f"""
CREATE OR REPLACE TABLE {catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}
(
  key STRING,
  store_id INT,
  date DATE,
  product_id INT,
  category STRING,
  quantity INT,
  unit_price DOUBLE,
  net_price DOUBLE
)
;
""")

image.png


5. Databricks 上で処理を実行

5-1. Confluent クラスター/Scheme Registry への認証情報をセット

# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921jm.us-east-2.aws.confluent.cloud:9092"
sasl_username = "JWRABQ5DWGFVK3ML"
sasl_password = "Pi9S0KWY02dxn8rWBjxfgcX/Yu//j49sKVy08NDmEvdLtK1JVVEYfWHVhytdC31F"

# Schema Registry の接続情報
sr_url = "https://psrc-l622j.us-east-2.aws.confluent.cloud"
sr_api_key = "T4P57YTN5E6CLMZH"
sr_api_secret = "XeDK4ishT0RaBUweEb7CW3hSotuWnp1yWZCcQRti6jmvvSJYh8rGcjC8Atw9vLvD"

schema_registry_options = {
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}

image.png

5-2. pizza_ordersトピックからpizza_ordersテーブルへ書き込み

pizza_orders_topic_name = "pizza_orders"
pizza_orders_subject_name = pizza_orders_topic_name + "-value"

pizza_orders_checkpoint_dir = checkpoint_volume_dir + "/" + pizza_orders_table_name

image.png

from pyspark.sql.functions import col, expr
from pyspark.sql.avro.functions import from_avro

# Confluent から読み込み
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", pizza_orders_topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Key を Deserialize
key_des_df = df.withColumn(
    "key",
    col("Key").cast("string"),
)

# Magic Byte と Schema ID を取り除いた Value を Deserialize
decoded_df = key_des_df.withColumn(
    "value",
    from_avro(
        col("value"),
        options=schema_registry_options,
        subject=pizza_orders_subject_name,
        schemaRegistryAddress=sr_url,
    ),
)

# Deserialize した Value のカラムを展開
tgt_cols = [
    "key",
    "value.*",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType",
]
src_df = decoded_df.select(tgt_cols)

image.png

(
    src_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", pizza_orders_checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(f"{catalog_name}.{schema_name}.{pizza_orders_table_name}")
)

image.png

5-3. pizza_ordersテーブルへの書き込み確認

spark.table(f"{catalog_name}.{schema_name}.{pizza_orders_table_name}").display()

image.png

5-4. pizza_orders_completedトピックからpizza_orders_completedテーブルへ書き込み

pizza_orders_completed_topic_name = "pizza_orders_completed"
pizza_orders_completed_subject_name = pizza_orders_completed_topic_name + "-value"

pizza_orders_completed_checkpoint_dir = checkpoint_volume_dir + "/" + pizza_orders_completed_topic_name

image.png

from pyspark.sql.functions import col, expr
from pyspark.sql.avro.functions import from_avro

# Confluent から読み込み
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", pizza_orders_completed_topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Key を Deserialize
key_des_df = df.withColumn(
    "key",
    col("Key").cast("string"),
)

# Magic Byte と Schema ID を取り除いた Value を Deserialize
decoded_df = key_des_df.withColumn(
    "value",
    from_avro(
        col("value"),
        options=schema_registry_options,
        subject=pizza_orders_completed_subject_name,
        schemaRegistryAddress=sr_url,
    ),
)

# Deserialize した Value のカラムを展開
tgt_cols = [
    "key",
    "value.*",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType",
]
src_df = decoded_df.select(tgt_cols)

image.png

(
    src_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", pizza_orders_completed_checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(f"{catalog_name}.{schema_name}.{pizza_orders_completed_topic_name}")
)

image.png

5-5. pizza_orders_completedテーブルへの書き込み確認

spark.table(f"{catalog_name}.{schema_name}.{pizza_orders_completed_topic_name}").display()

image.png

5-6. pizza_orders_completed_itemsテーブルへの書き込み

pizza_orders_completed_items_checkpoint_dir = checkpoint_volume_dir + "/" + pizza_orders_completed_items_table_name

image.png

# ピザ注文完了テーブル (pizza_orders_completed) を Streaming 読み込み
df_poc = (
    spark.readStream
         .table(f"{catalog_name}.{schema_name}.{pizza_orders_completed_topic_name}")
         # 実際のイベント時刻となるカラム名を指定
         .withWatermark("timestamp", "3 minutes")
)

# ピザ注文テーブル (pizza_orders) を Streaming 読み込み
df_po = (
    spark.readStream
         .table(f"{catalog_name}.{schema_name}.{pizza_orders_table_name}")
         # 実際のイベント時刻となるカラム名を指定
         .withWatermark("timestamp", "3 minutes")
)

# store_order_id で内部結合
df_joined = df_poc.join(
    df_po,
    on=df_poc.store_order_id == df_po.store_order_id,
    how="inner"
)

# order_lines 列(Array[Struct])を行に展開
# explode は alias で struct の列を変数化できます (element.*)
from pyspark.sql.functions import explode

df_exploded = df_joined.select(
    df_poc["key"],
    df_poc["store_id"],
    df_po["date"],
    explode(df_po["order_lines"]).alias("element")
)

# element は Struct (product_id, category, quantity, unit_price, net_price)
# これらを単独の列にする
df_result = df_exploded.select(
    "key",
    "store_id",
    "date",
    "element.product_id",
    "element.category",
    "element.quantity",
    "element.unit_price",
    "element.net_price"
)

image.png

# Delta に append モードで書き込み (テーブルまたはパスを指定)
(
    df_result.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", pizza_orders_completed_items_checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(f"{catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}")
)

image.png

5-7. pizza_orders_completed_itemsテーブルへの書き込み確認

spark.table(f"{catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}").display()

image.png

5-8. Databricks から pizza_orders_completed_itemsTopic への出力

schema_registry_options = {
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}

pizza_orders_completed_items_topic_name = "pizza_orders_completed_items"
pizza_orders_completed_items_value_subject_name = pizza_orders_completed_items_topic_name + "-value"

json_format_schema = """
{
  "fields": [
    {
      "name": "store_id",
      "type": "int"
    },
    {
      "name": "date",
      "type": {
        "logicalType": "date",
        "type": "int"
      }
    },
    {
      "name": "product_id",
      "type": "int"
    },
    {
      "name": "category",
      "type": "string"
    },
    {
      "name": "quantity",
      "type": "int"
    },
    {
      "name": "unit_price",
      "type": "double"
    },
    {
      "name": "net_price",
      "type": "double"
    }
  ],
  "name": "pizza_orders_completed_items",
  "namespace": "pizza_orders_completed_items",
  "type": "record"
}
"""

# 書き込み時に利用するチェックポイントのディレクトリを指定
pizza_orders_completed_items_kafka_checkpoint_dir = checkpoint_volume_dir + "/topic__" + pizza_orders_completed_items_topic_name

image.png

image.png

image.png

from pyspark.sql.functions import col, struct, lit
from pyspark.sql.avro.functions import to_avro

src_df = spark.readStream.table(
    f"{catalog_name}.{schema_name}.{pizza_orders_completed_items_table_name}"
)

src_df = src_df.withColumn("key", col("key"))
src_df = src_df.withColumn(
    "value",
    struct(
        "store_id",
        "date",
        "product_id",
        "category",
        "quantity",
        "unit_price",
        "net_price",
    ),
)
src_df = src_df.withColumn(
    "value",
    to_avro(
        data=col("value"),
        options=schema_registry_options,
        schemaRegistryAddress=sr_url,
        subject=lit(pizza_orders_completed_items_value_subject_name),
        jsonFormatSchema=json_format_schema,
    ),
)

# key 列と value 列のみを抽出
src_df = src_df.select("key", "value")

image.png

(
    src_df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("topic", pizza_orders_completed_items_topic_name)
    .option("checkpointLocation", pizza_orders_completed_items_kafka_checkpoint_dir)
    .trigger(availableNow=True)
    .start()
)

image.png

6. Confluent 上で実行確認とコネクターの再実行

6-1. pizza_orders_completed_itemsTopic にデータが書き込まれたことを確認

image.png

6-2. 2 つの Datagen Source Connector を再度稼働(Resume)

image.png
image.png

6-3. Databricks 上でデータ取り込み処理を再実行

image.png
image.png

6-4. Confluent 上でpizza_orders_completed_itemsTopic に追加データがあることを確認

image.png

7. 事後処理

7-1. Confluent の Environment を削除

image.png

7-2. Databricks で作成したカタログを削除

_ = spark.sql(f"DROP CATALOG IF EXISTS {catalog_name} CASCADE")

image.png

7-3. Databricks で作成したクラスターを削除

image.png

まとめ

今回紹介したように、Confluent(Kafka)Databricks を組み合わせることで、リアルタイムに生成されるストリーミングデータを柔軟かつ高パフォーマンスに取り扱うことができます。具体的には、以下のようなポイントが挙げられます。

  • Kafkaトピックへのリアルタイムデータ生成
    Confluent が提供する Datagen Source Connector を利用することで、Kafkaトピックへの継続的なメッセージ投入が非常に容易になります。テストデータやサンプルデータを生成しながら、すぐにストリーミングパイプラインを構築できます。

  • Databricks によるストリーミング処理
    Databricks 上の Spark Structured Streaming を活用することで、Kafka から取り込んだデータに対して変換や集計などの処理をリアルタイムに行えます。テーブルやビューを通じてデータを管理できるため、ジョブの切り替えやジョイン処理も簡単に実装できます。

  • Kafka トピックへの再書き込み
    Databricks で加工・分析したデータを再び Kafkaトピックとして Confluent 側へ送信することで、さまざまな用途に活用できます。データが必要なシステムやサービスとリアルタイムに連携するための土台が整います。

  • クラウド上での柔軟なスケーリング
    Databricks と Confluent のそれぞれがクラウドネイティブなサービスであるため、データ量の増加に応じてノードを柔軟にスケールアウトし、高可用性を維持しながら処理を継続できます。

以上の流れとポイントを踏まえると、エンドツーエンドのストリーミングパイプラインを比較的短時間で構築しやすくなり、リアルタイムなデータ収集から分析・可視化・システム連携に至るまでシームレスに管理できます。ピザ注文のように、リアルタイムに発生するイベント情報を扱うユースケースはもちろん、サーバーログや IoT センサーからのデータ、SNS の投稿、金融や小売業など幅広いシナリオに適用可能です。今回の手順をベースに、さまざまなリアルタイムアプリケーションの構築にぜひお役立てください。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?