概要
Databricks にて json 形式の文字列をパラメータで渡すことによるノートブック実行処理の共通化する方法を共有します。下記のような json をノートブックに渡すことで、src_dir
(dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation
)にあるファイルをcheckpoint_dir
(dbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_1
)をチェックポイントとしてtgt_tbl_name
(hive_metastore.auto_loader_test.tbl_1
)で指定したテーブルに Databricks Auto Loader により書き込む方法を紹介します。manabian (著者)が「json を渡して Databricks の処理を共通化すればいいんだよ」とお伝えしたときには、本記事の方法を指しています。
{
"src_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation",
"checkpoint_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_1",
"tgt_tbl_name": "hive_metastore.auto_loader_test.tbl_1"
}
json 形式の文字列を渡して実行するベースノートブックでは次の処理を実行しています。
- Databricks ウィジェットによるパラメータの受け取り
- json 形式の文字列の内容を変数として定義
- Databricks Auto Loader の実行
- VACUUM や ANALYZE TABLE などの事後処理
- リターン値を返す終了処理
手順
事前準備
スキーマとテーブルを作成
%sql
DROP SCHEMA IF EXISTS auto_loader_test CASCADE;
CREATE SCHEMA IF NOT EXISTS auto_loader_test;
CREATE
OR REPLACE TABLE auto_loader_test.tbl_1 (
n_nationkey string
,n_name string
,n_regionkey string
,n_comment string,
ingest_date date,
ingest_timestamp TIMESTAMP,
_rescued_data string,
_metadata_file_path string
);
CREATE
OR REPLACE TABLE auto_loader_test.tbl_2 (
n_nationkey string
,n_name string
,n_regionkey string
,n_comment string,
ingest_date date,
ingest_timestamp TIMESTAMP,
_rescued_data string,
_metadata_file_path string
);
ソースファイルを配置
# ソースのディレクトリを初期化後にファイルを配置
src_dir = "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation"
dbutils.fs.rm(src_dir, True)
tgt_path = f"{src_dir}/ingest_date=2020-01-01/ingest_timestamp=2020-01-01 12%3A34%3A56/csv_file_01.csv"
nation_df = spark.table("samples.tpch.nation")
nation_df.write.format("csv").option("header", True).mode("overwrite").save(tgt_path)
# 配置したファイルを確認
src_df = spark.read.format("csv").option("header", True).load(tgt_path)
src_df.display()
1. ベースとなるノートブックを作成
db_auto_loader_by_parameters.py
というファイル名でコードをソースファイルとして import
# Databricks notebook source
import json
from pyspark.sql.functions import expr
# COMMAND ----------
# ウィジェットの初期化
dbutils.widgets.removeAll()
# COMMAND ----------
# パラメータの受け取り
debug_pipe_conf = """{
"src_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation",
"checkpoint_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_1",
"tgt_tbl_name": "hive_metastore.auto_loader_test.tbl_1"
}
"""
pipe_conf_wiget_name = "pipe_conf"
dbutils.widgets.text(pipe_conf_wiget_name, debug_pipe_conf)
pipe_conf_wiget_value = dbutils.widgets.get(pipe_conf_wiget_name)
print(f"`pipe_conf_wiget_value`: {pipe_conf_wiget_value}")
# COMMAND ----------
# 変数定義
pipe_conf = json.loads(pipe_conf_wiget_value)
src_dir = pipe_conf["src_dir"]
checkpoint_dir = pipe_conf["checkpoint_dir"]
tgt_tbl_name = pipe_conf["tgt_tbl_name"]
# COMMAND ----------
# Databricks Auto Loader の実行
src_df = (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_dir)
.option("cloudFiles.inferColumnTypes", False)
.option("cloudFiles.partitionColumns", "ingest_date,ingest_timestamp")
.option("inferSchema", True)
.load(src_dir)
)
add_cols = {
"_metadata_file_path": expr("_metadata.file_path"),
"ingest_date": expr("to_date(ingest_date)"),
"ingest_timestamp": expr("try_to_timestamp(replace(ingest_timestamp, '%3A', ':'))"),
}
src_df = src_df.withColumns(add_cols)
(
src_df.writeStream
.option("mergeSchema", True)
.option("checkpointLocation", checkpoint_dir)
.trigger(availableNow=True)
.toTable(tgt_tbl_name)
)
# COMMAND ----------
# 事後処理
spark.sql(f"VACUUM {tgt_tbl_name}")
spark.sql(f"ANALYZE TABLE {tgt_tbl_name} COMPUTE STATISTICS FOR ALL COLUMNS")
# COMMAND ----------
# 終了処理
retun_value = json.dumps({})
dbutils.notebook.exit(retun_value)
ノートブックの実行確認
テーブルへの書き込み確認
# 書き込み先のテーブルを表示
spark.table("hive_metastore.auto_loader_test.tbl_1").display()
2. json 形式の文字列を渡して実行
ベースとなるノートブックと同じディレクトリのノートブックにてノートブック実行に関する変数を定義
# 実行するノートブックのパスを定義
nb_path = "./db_auto_loader_by_parameters"
# タイムアウト時間を定義
timeout = 0
pipe_conf = """{
"src_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/src_data/nation",
"checkpoint_dir": "dbfs:/user/hive/warehouse/auto_loader_test.db/checkpoint/tbl_2",
"tgt_tbl_name": "hive_metastore.auto_loader_test.tbl_2"
}
"""
nb_para = {
"pipe_conf": pipe_conf,
}
dbutils.notebook.run
によるノートブック実行
# ノートブックを実行
nb_results = dbutils.notebook.run(
nb_path,
timeout,
nb_para,
)
print(nb_results)
テーブルへの書き込み確認
# 書き込み先のテーブルを表示
import json
spark.table(json.loads(pipe_conf)["tgt_tbl_name"]).display()
リソースの削除
%sql
DROP SCHEMA IF EXISTS auto_loader_test CASCADE;