概要
Databricks にて Confluent から JSON 形式の Topic のデータ取得を手動でスキーマを指定して実施する方法を紹介します。
本記事は、以下のシリーズの一部です。
引用元:Databricks と Confluent をつなぐ:データ連携の全体像を徹底解説 #Spark - Qiita
事前準備
1. Confluent にて Topic を作成
{
"properties": {
"gender": {
"connect.index": 3,
"type": "string"
},
"regionid": {
"connect.index": 2,
"type": "string"
},
"registertime": {
"connect.index": 0,
"connect.type": "int64",
"type": "integer"
},
"userid": {
"connect.index": 1,
"type": "string"
}
},
"title": "ksql.users",
"type": "object"
}
2. Confluent にて API キーを主塔
データを取得する手順
1. Databricks 上で API キーを変数にセット
# Confluent に対する認証情報をセット
bootstrap_servers = "pkc-921aa.us-east-2.aws.confluent.cloud:9092"
sasl_username = "U4BML4OPRCaaaa"
sasl_password = "zzEjUzyUwYYJneNE240L7jgPVrHAtRBANo6wnnzCgKlGIJ9OxJqaJZWRvaaaaa"
2. Topic 名とスキーマを変数にセット
from pyspark.sql.types import StructType, StructField, StringType, LongType
topic_name = "json_user"
schema = StructType(
[
StructField("registertime", LongType(), True),
StructField("userid", StringType(), True),
StructField("regionid", StringType(), True),
StructField("gender", StringType(), True),
]
)
3. Condluent からデータを取得
from pyspark.sql.functions import col, expr, from_json
# Confluent から読み込み
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", bootstrap_servers)
.option("kafka.security.protocol", "SASL_SSL")
.option(
"kafka.sasl.jaas.config",
f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{sasl_username}" password="{sasl_password}";',
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("subscribe", topic_name)
.option("startingOffsets", "earliest")
.load()
)
# Key を Deserialize
key_des_df = df.withColumn(
"key",
col("Key").cast("string"),
)
# Value から Magic Byte と Schema ID を除去
no_magic_df = key_des_df.withColumn(
"value",
expr("substring(value, 6, length(value) - 5)"),
)
# Magic Byte と Schema ID を取り除いた Value を Deserialize
decoded_df = no_magic_df.withColumn(
"decoded_value",
from_json(
col("value").cast("string"),
schema,
),
)
# Deserialize した Value のカラムを展開
tgt_cols = [
"key",
"decoded_value.*",
"topic",
"partition",
"offset",
"timestamp",
"timestampType",
]
src_df = decoded_df.select(tgt_cols)
4. データを表示
src_df.display()