概要
PySpark によるメタデータデプロイの実践について説明する。本記事では、Databricks での実行結果を掲載。
本記事のコードを含むノートブックを以下のリンクに保存してあり、Databricks 等の環境にインポートして実行が可能。
本記事の位置付け
次の開発ガイドシリーズにおけるメタデータデプロイ分野の1記事であり、リンク先には記事にて記事の全体像を整理している。
GroupID | 分野 |
---|---|
T10 | Spark概要 |
T20 | データエンジニアリング |
T30 | データ品質チェック |
T40 | データサイエンス |
T50 | メタデータデプロイ |
T60 | テスト |
T70 | DevOps |
データデプロイ概要
Spark におけるメタデータを保持する主なオブジェクトとして次のものがあり、CRUD(作成、読込、更新、削除)という観点で適切なメタデータデプロイが必要。Spark (+Delta Lake)を利用するメリットの1つにデータ参照のダウンタイムがほぼないことがあり、メタデータデプロイ時にもダウンタイムを最小限とすることが望ましい。
# | 取得元 | 取得元の詳細 |
---|---|---|
1 | Delta Lake ディレクトリ | - _delta_log |
2 | メタストア | - Spark Schema(Database) - Spark Table - Spark View |
3 | Spark Dataframe | - SparkSession - Data Sources |
Spark においては、メタストアにて多くのメタデータが格納される。一般的には Hive メタストアを用いることが多いが、Spark プロバイダー固有のメタストア(Databricks における Unity Catalog)が提供されていることがある。メタストアからメタデータを取得する際には、Spark 環境からDESCRIBE
などのコマンドにより参照する。Delta Lake を用いる場合にはメタストアに存在しないメタデータが Delta Lake ディレクトリに格納されることもある。
メタデータのデプロイ方法としては命令型と宣言型があるが、Spark にて宣言型でデプロイを行うためのツールは公開されていないため、宣言型デプロイを行うためにはスクラッチでの開発が必要。
# | デプロイ方法 | 実施方法 | 他データストアのツール例 |
---|---|---|---|
1 | 命令型 | デプロイ対象の DDL 文等の処理を随時実行する。 | |
2 | 宣言型 | デプロイ後のテーブル定義等を保持したをモデルの定義を行い、そのモデルとの差分を DDL 文等により反映する。 | - Flyway - データ層アプリケーション (DAC) - SQL Server |
メタデータの更新を行う際には、ALTER 文の実行のみでは完結しないものがあることに注意が必要。
- ALTER 文を実行のみで可能
- テーブル名を変更
- カラムの追加
- カラムのコメントを変更
- カラム順の変更
- テーブルプロパティ(TBLPROPERTIES)の変更
- Not NULL制約の設定と削除
- Check制約の設定と削除
- データの再書き込みが必要
- カラム名を変更
- カラムを削除
- カラムのデータ型を変更
- パーティションカラムを変更
- 生成列の追加
- 他のパスへのデータの書き込みが必要
- テーブルの保存場所(LOCATION)の変更
- Optimizeの実行が必要
- bloomfitlerの設定
事前準備
from pyspark.sql import Row
from decimal import *
import datetime
import pprint
import random, string
random_string = ''.join(random.choices(string.ascii_letters, k=5))
db_name = f'_metadata_test_database_{random_string}'
print(db_name)
# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)
sql = f'CREATE DATABASE {db_name}'
spark.sql(sql)
table_name = '_metadata_test_table'
table_full_name = f'{db_name}.{table_name}'
sample_Data = [
Row(
string_column = 'AAA',
byte_column = 1,
integer_column = 1,
bigint_column = 1,
float_column = 12.300000190734863,
double_column = 12.3,
numeric_column = Decimal('12'),
boolean_column = True,
date_column = datetime.date(2020, 1, 1),
timestamp_column = datetime.datetime(2021, 1, 1, 0, 0),
binary_column = bytearray(b'A'),
struct_column = Row(
struct_string_column = 'AAA',
struct_int_column = 1
),
array_column = ['AAA', 'BBB', 'CCC'],
map_column = {'AAA': 1}
)
]
schema = '''
--文字型
string_column string,
--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,
--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,
--真偽型
boolean_column boolean,
--日付時刻
date_column date,
timestamp_column timestamp,
--バイナリー型
binary_column binary,
--複合型
struct_column struct<
struct_string_column :string,
struct_int_column :int
>,
array_column array<string>,
map_column map<string, int>
'''
df = spark.createDataFrame(sample_Data, schema)
df.display()
# テーブルを作成
spark.sql(f'DROP TABLE IF EXISTS {table_full_name}')
df.write.format('delta').saveAsTable(table_full_name)
table_name_02 = '_metadata_test_table_02'
table_full_name_02 = f'{db_name}.{table_name_02}'
sql = f'''
DROP TABLE IF EXISTS {table_full_name_02}
'''
spark.sql(sql)
sql = f'''
CREATE TABLE {table_full_name_02}
(
id BIGINT GENERATED ALWAYS AS IDENTITY
,string_column string
,integer_column integer
,date_column date
,generated_col INT GENERATED ALWAYS AS (integer_column)
)
USING delta
TBLPROPERTIES (
delta.autoOptimize.optimizeWrite = True,
delta.autoOptimize.autoCompact = True,
delta.dataSkippingNumIndexedCols = 1
)
PARTITIONED BY (
string_column
)
COMMENT "table_comment"
'''
spark.sql(sql)
table_name_03 = '_metadata_test_table_03'
table_full_name_03 = f'{db_name}.{table_name_03}'
sql = f'''
DROP TABLE IF EXISTS {table_full_name_03}
'''
spark.sql(sql)
sql = f'''
CREATE OR REPLACE TABLE {table_full_name_03}
(
col_001 string
,col_002 string
,col_003 string
)
USING delta
TBLPROPERTIES (
delta.autoOptimize.optimizeWrite = True,
delta.autoOptimize.autoCompact = True,
delta.dataSkippingNumIndexedCols = 1
)
'''
spark.sql(sql)
view_name = '_v_metadata_test_table'
view_full_name = f'{db_name}.{view_name}'
sql = f'''
CREATE OR REPLACE VIEW {view_full_name}
AS
SELECT
*
FROM
{table_full_name}
'''
spark.sql(sql)
Delta Lake ディレクトリにおけるメタデータ
Delta Lake ディレクトリでは、直下に作成される_delta_log
ディレクトリに情報が格納される。保持する情報については、Delta Lake の Gtihub レポジトリにて詳細が記載されている。
- Understanding the Delta Lake Transaction Log - Databricks Blog
- delta/PROTOCOL.md at master · delta-io/delta · GitHub
この章では、_delta_log
にどのような情報が格納されているかをコードの実行を通して説明する。
# DESCRIBE の実行結果は、メタストアから取得
sql = f'DESCRIBE {table_full_name_02}'
spark.sql(sql).display()
# DESCRIBE HISTORY の実行結果は、Delta lake テーブルのメタデータの一部を確認可能
sql = f'DESCRIBE HISTORY {table_full_name_02}'
# `userName` カラムにメールアドレスが含まれる
# spark.sql(sql).display()
spark.sql(sql).drop('userName').display()
file_path = spark.sql(f'DESC EXTENDED {table_full_name_02}').where("col_name = 'Location'").select('data_type').collect()[0][0]
display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.crc'))
# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.json'))
file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.json')
file_contents = file_contents.split('\n')
import json
print('--protocol--')
print(json.loads(file_contents[0]))
print('--metaData--')
print(json.loads(file_contents[1]))
# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print('--Operation--')
print(json.loads(file_contents[2]))
sql = f'''
CREATE BLOOMFILTER INDEX
ON {table_full_name_02}
FOR COLUMNS(
integer_column
)
'''
spark.sql(sql)
display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.crc'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.crc'))
# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.json'))
file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.json')
file_contents = file_contents.split('\n')
import json
print('--metaData--')
print(json.loads(file_contents[0]))
# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print('--Operation--')
print(json.loads(file_contents[1]))
sql = f'''
INSERT INTO {table_full_name_02}
(
string_column
,integer_column
,date_column
)
VALUES
('a', 1, '2020-01-01'),
('b', 2, '2020-02-01'),
('c', 3, '2020-03-01')
'''
spark.sql(sql)
display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.crc'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.crc'))
# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.json'))
file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.json')
file_contents = file_contents.split('\n')
import json
print('--Operation--')
print(json.loads(file_contents[0]))
print('--Operation--')
print(json.loads(file_contents[1]))
print('--Operation--')
print(json.loads(file_contents[2]))
# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print('--commitInfo--')
print(json.loads(file_contents[3]))
メタデータのデプロイ
メタデータの命令型デプロイ
spark.table(table_full_name_03).printSchema()
sql = f'''
ALTER TABLE {table_full_name_03}
ADD COLUMN
(
col_004 string
)
'''
spark.sql(sql)
spark.table(table_full_name_03).printSchema()
メタデータの宣言型デプロイ
# 現在のテーブルカラムと設定値を比較して差分がある場合にカラムを追加するメソッドを追加
import inspect
def add_cols_to_spark_tbl(
cols_conf,
tbl_name,
):
"""Spark テーブルに対してカラムを追加
"""
# 期待値のカラムリストを取得
expected_cols_list = [d["name"] for d in cols_conf]
# 実際のカラムリストを取得
actual_cols_list = spark.table(tbl_name).columns
# 設定値のカラムと実際のカラムを比較して、追加対象のカラム一覧を取得
cols_to_be_add = list(set(sorted(expected_cols_list)) - set(sorted(actual_cols_list)))
# 現在のテーブルカラムと設定値を比較して差分がある場合にカラムを追加
for col_to_be_add in cols_to_be_add:
for col_info_to_be_executed in [col_info for col_info in cols_conf if col_info['name'] == col_to_be_add]:
col_name = col_info_to_be_executed['name']
data_type = col_info_to_be_executed['type']
add_col_ddl = f'''
ALTER TABLE {tbl_name}
ADD COLUMNS {col_name} {data_type}'''
add_col_ddl = inspect.cleandoc(add_col_ddl)
spark.sql(add_col_ddl)
# 実行した DDL を表示
print(add_col_ddl)
# 現在のテーブルと同義の設定値を定義
tbl_conf = {
'table_name': table_full_name_03,
'schema': [
{'name': 'col_001', 'type': 'STRING'},
{'name': 'col_002', 'type': 'STRING'},
{'name': 'col_003', 'type': 'string'},
{'name': 'col_004', 'type': 'STRING'},
],
}
# カラム追加の処理が実行されない
add_cols_to_spark_tbl(
cols_conf = tbl_conf['schema'],
tbl_name = tbl_conf['table_name'],
)
# `col_005`を追加した設定値を定義
tbl_conf = {
'table_name': table_full_name_03,
'schema': [
{'name': 'col_001', 'type': 'STRING'},
{'name': 'col_002', 'type': 'STRING'},
{'name': 'col_003', 'type': 'string'},
{'name': 'col_004', 'type': 'STRING'},
{'name': 'col_005', 'type': 'STRING'},
],
}
# カラム追加の処理が実行される想定
add_cols_to_spark_tbl(
cols_conf = tbl_conf['schema'],
tbl_name = tbl_conf['table_name'],
)
# `col_005`が追加されたことを確認
spark.table(table_full_name_03).printSchema()
Spark のデータオブジェクト のメタデータ取得方法
詳細は、 Spark のデータオブジェクト のメタデータ取得方法 - Qiita の記事にて記載。
リソースのクリーンアップ
# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)