0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks にて json 文字列をパラメータで渡すことによるノートブック実行処理の共通化

Last updated at Posted at 2024-06-09

概要

Databricks にて json 形式の文字列をパラメータで渡すことによるノートブック実行処理の共通化する方法を共有します。下記のような json をノートブックに渡すことで、src_dirdbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation)にあるファイルをcheckpoint_dirdbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_1)をチェックポイントとしてtgt_tbl_namehive_metastore.auto_loader_test.tbl_1)で指定したテーブルに Databricks Auto Loader により書き込む方法を紹介します。manabian (著者)が「json を渡して Databricks の処理を共通化すればいいんだよ」とお伝えしたときには、本記事の方法を指しています。

{
    "src_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation",
    "checkpoint_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_1",
    "tgt_tbl_name": "hive_metastore.auto_loader_test.tbl_1"
}

json 形式の文字列を渡して実行するベースノートブックでは次の処理を実行しています。

  1. Databricks ウィジェットによるパラメータの受け取り
  2. json 形式の文字列の内容を変数として定義
  3. Databricks Auto Loader の実行
  4. VACUUM や ANALYZE TABLE などの事後処理
  5. リターン値を返す終了処理

手順

事前準備

スキーマとテーブルを作成

%sql
DROP SCHEMA IF EXISTS auto_loader_test CASCADE;
CREATE SCHEMA IF NOT EXISTS auto_loader_test;
CREATE
OR REPLACE TABLE auto_loader_test.tbl_1 (
  n_nationkey  string
  ,n_name       string
  ,n_regionkey  string 
  ,n_comment    string,
  ingest_date date,
  ingest_timestamp TIMESTAMP,
  _rescued_data string,
  _metadata_file_path string
);

CREATE
OR REPLACE TABLE auto_loader_test.tbl_2 (
  n_nationkey  string
  ,n_name       string
  ,n_regionkey  string 
  ,n_comment    string,
  ingest_date date,
  ingest_timestamp TIMESTAMP,
  _rescued_data string,
  _metadata_file_path string
);

image.png

ソースファイルを配置

# ソースのディレクトリを初期化後にファイルを配置
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation"
dbutils.fs.rm(src_dir, True)
tgt_path = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/csv_file_01.csv"
nation_df = spark.table("samples.tpch.nation")
nation_df.write.format("csv").option("header", True).mode("overwrite").save(tgt_path)

# 配置したファイルを確認
src_df = spark.read.format("csv").option("header", True).load(tgt_path)
src_df.display()

image.png

1. ベースとなるノートブックを作成

db_auto_loader_by_parameters.pyというファイル名でコードをソースファイルとして import

# Databricks notebook source
import json

from pyspark.sql.functions import expr

# COMMAND ----------

# ウィジェットの初期化
dbutils.widgets.removeAll()

# COMMAND ----------

# パラメータの受け取り
debug_pipe_conf = """{
    "src_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation",
    "checkpoint_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_1",
    "tgt_tbl_name": "hive_metastore.auto_loader_test.tbl_1"
}
"""
pipe_conf_wiget_name = "pipe_conf"
dbutils.widgets.text(pipe_conf_wiget_name, debug_pipe_conf)
pipe_conf_wiget_value = dbutils.widgets.get(pipe_conf_wiget_name)
print(f"`pipe_conf_wiget_value`: {pipe_conf_wiget_value}")

# COMMAND ----------

# 変数定義
pipe_conf = json.loads(pipe_conf_wiget_value)

src_dir = pipe_conf["src_dir"]
checkpoint_dir = pipe_conf["checkpoint_dir"]
tgt_tbl_name = pipe_conf["tgt_tbl_name"]

# COMMAND ----------

# Databricks Auto Loader の実行
src_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_dir)
    .option("cloudFiles.inferColumnTypes", False)
    .option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
    .option("inferSchema", True)
    .load(src_dir)
)

add_cols = {
    "_metadata_file_path": expr("_metadata.file_path"),
    "ingest_date": expr("to_date(ingest_date)"),
    "ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}

src_df = src_df.withColumns(add_cols)

(
    src_df.writeStream
    .option("mergeSchema", True)
    .option("checkpointLocation", checkpoint_dir)
    .trigger(availableNow=True)
    .toTable(tgt_tbl_name)
)


# COMMAND ----------

# 事後処理
spark.sql(f"VACUUM {tgt_tbl_name}")
spark.sql(f"ANALYZE TABLE {tgt_tbl_name} COMPUTE STATISTICS FOR ALL COLUMNS")

# COMMAND ----------

# 終了処理
retun_value = json.dumps({})

dbutils.notebook.exit(retun_value)

image.png

ノートブックの実行確認

image.png

テーブルへの書き込み確認

# 書き込み先のテーブルを表示
spark.table("hive_metastore.auto_loader_test.tbl_1").display()

image.png

2. json 形式の文字列を渡して実行

ベースとなるノートブックと同じディレクトリのノートブックにてノートブック実行に関する変数を定義

# 実行するノートブックのパスを定義
nb_path = "./db_auto_loader_by_parameters"

# タイムアウト時間を定義
timeout = 0

pipe_conf = """{
    "src_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation",
    "checkpoint_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_2",
    "tgt_tbl_name": "hive_metastore.auto_loader_test.tbl_2"
}
"""
nb_para = {
    "pipe_conf": pipe_conf,
}

image.png

dbutils.notebook.runによるノートブック実行

# ノートブックを実行
nb_results = dbutils.notebook.run(
    nb_path,
    timeout,
    nb_para,
)

print(nb_results)

image.png

image.png

テーブルへの書き込み確認

# 書き込み先のテーブルを表示
import json
spark.table(json.loads(pipe_conf)["tgt_tbl_name"]).display()

image.png

リソースの削除

%sql
DROP SCHEMA IF EXISTS auto_loader_test CASCADE;

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?