
manabian Advent Calendar 2024, Day 10

How to Retrieve Data from a JSON-Format Confluent Topic in Databricks by Manually Specifying the Schema

Posted at 2024-12-25

Overview

This article shows how to retrieve data from a JSON-format Confluent topic in Databricks by manually specifying the schema.

This article is part of the following series.

Source: Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita

Prerequisites

1. Create a topic in Confluent

The JSON schema registered for the topic is as follows:

{
  "properties": {
    "gender": {
      "connect.index": 3,
      "type": "string"
    },
    "regionid": {
      "connect.index": 2,
      "type": "string"
    },
    "registertime": {
      "connect.index": 0,
      "connect.type": "int64",
      "type": "integer"
    },
    "userid": {
      "connect.index": 1,
      "type": "string"
    }
  },
  "title": "ksql.users",
  "type": "object"
}

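For reference, the topic can also be created programmatically instead of through the Confluent Cloud UI. The following is a minimal sketch using the confluent-kafka Python admin client; it is not part of the original steps, and the connection values and partition count are placeholders.

# Hypothetical alternative to the Confluent Cloud UI: create the topic with the
# confluent-kafka admin client. All connection values below are placeholders.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient(
    {
        "bootstrap.servers": "<bootstrap-server>:9092",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": "<api-key>",
        "sasl.password": "<api-secret>",
    }
)
admin.create_topics([NewTopic("json_user", num_partitions=6)])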

2. Obtain an API key in Confluent


Steps to retrieve the data

1. Set the API key in variables on Databricks

# Set the credentials for Confluent
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"

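Hard-coding credentials in a notebook is only suitable for quick testing. A safer pattern, sketched below, is to read them from a Databricks secret scope; the scope and key names used here are hypothetical.

# Sketch: read the Confluent credentials from a Databricks secret scope instead
# of hard-coding them. The scope and key names ("confluent", etc.) are hypothetical.
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = dbutils.secrets.get(scope="confluent", key="api-key")
sasl_password = dbutils.secrets.get(scope="confluent", key="api-secret")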

2. Set the topic name and schema in variables

from pyspark.sql.types import StructType, StructField, StringType, LongType

topic_name = "json_user"
# Spark schema corresponding to the JSON schema registered for the topic
schema = StructType(
    [
        StructField("registertime", LongType(), True),
        StructField("userid", StringType(), True),
        StructField("regionid", StringType(), True),
        StructField("gender", StringType(), True),
    ]
)

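As a side note, the schema does not have to be a StructType; from_json (used in the next step) also accepts a DDL-formatted string, so the equivalent definition below would work as well. This is just an alternative sketch, not part of the original steps.

# Equivalent DDL-string form of the schema; it can be passed to from_json
# in place of the StructType defined above.
schema_ddl = "registertime LONG, userid STRING, regionid STRING, gender STRING"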

3. Retrieve data from Confluent

from pyspark.sql.functions import col, expr, from_json

# Read the stream from Confluent
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option(
        "kafka.sasl.jaas.config",
        f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("subscribe", topic_name)
    .option("startingOffsets", "earliest")
    .load()
)

# Deserialize the key
key_des_df = df.withColumn(
    "key",
    col("key").cast("string"),
)

# Strip the magic byte (1 byte) and schema ID (4 bytes) from the value
no_magic_df = key_des_df.withColumn(
    "value",
    expr("substring(value, 6, length(value) - 5)"),
)

# Deserialize the value with the magic byte and schema ID removed
decoded_df = no_magic_df.withColumn(
    "decoded_value",
    from_json(
        col("value").cast("string"),
        schema,
    ),
)

# Expand the columns of the deserialized value
tgt_cols = [
    "key",
    "decoded_value.*",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType",
]
src_df = decoded_df.select(tgt_cols)

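The substring call above relies on Confluent's wire format: each value starts with a 1-byte magic byte followed by a 4-byte big-endian schema ID, so the JSON payload begins at byte 6. If you want to verify this against your topic, the hypothetical check below (not in the original steps) decodes the schema ID from the raw value before it is stripped.

# Hypothetical check: decode the 4-byte schema ID that follows the magic byte
# in each raw Kafka value.
from pyspark.sql.functions import expr

schema_id_df = df.withColumn(
    "schema_id",
    expr("conv(hex(substring(value, 2, 4)), 16, 10)").cast("long"),
)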

4. Display the data

src_df.display()

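display() is convenient for checking the stream interactively. To persist the results instead, a common pattern is to write the stream to a Delta table; the sketch below uses an availableNow trigger, and the checkpoint location and table name are hypothetical.

# Sketch: write the deserialized stream to a Delta table instead of displaying it.
# The checkpoint location and table name are hypothetical.
(
    src_df.writeStream.format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/json_user")
    .trigger(availableNow=True)
    .toTable("json_user_bronze")
)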

Cleanup

Stop the Databricks notebook execution.

