概要
Spark のデータオブジェクトのメタデータを、PySpark 、および、Spark SQL により取得する方法を説明する。本記事では、Databricks での実行結果を掲載。
本記事の位置付け
次の開発ガイドシリーズにおけるメタデータデプロイ分野の補足記事であり、リンク先には記事にて記事の全体像を整理している。
GroupID | 分野 |
---|---|
T10 | Spark概要 |
T20 | データエンジニアリング |
T30 | データ品質チェック |
T40 | データサイエンス |
T50 | メタデータデプロイ |
T60 | テスト |
T70 | DevOps |
事前準備
from pyspark.sql import Row
from decimal import *
import datetime
import pprint
import random, string
random_string = ''.join(random.choices(string.ascii_letters, k=5))
db_name = f'_metadata_test_database_{random_string}'
print(db_name)
# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)
sql = f'CREATE DATABASE {db_name}'
spark.sql(sql)
table_name = '_metadata_test_table'
table_full_name = f'{db_name}.{table_name}'
sample_Data = [
Row(
string_column = 'AAA',
byte_column = 1,
integer_column = 1,
bigint_column = 1,
float_column = 12.300000190734863,
double_column = 12.3,
numeric_column = Decimal('12'),
boolean_column = True,
date_column = datetime.date(2020, 1, 1),
timestamp_column = datetime.datetime(2021, 1, 1, 0, 0),
binary_column = bytearray(b'A'),
struct_column = Row(
struct_string_column = 'AAA',
struct_int_column = 1
),
array_column = ['AAA', 'BBB', 'CCC'],
map_column = {'AAA': 1}
)
]
schema = '''
--文字型
string_column string,
--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,
--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,
--真偽型
boolean_column boolean,
--日付時刻
date_column date,
timestamp_column timestamp,
--バイナリー型
binary_column binary,
--複合型
struct_column struct<
struct_string_column :string,
struct_int_column :int
>,
array_column array<string>,
map_column map<string, int>
'''
df = spark.createDataFrame(sample_Data, schema)
df.display()
# テーブルを作成
spark.sql(f'DROP TABLE IF EXISTS {table_full_name}')
df.write.format('delta').saveAsTable(table_full_name)
table_name_02 = '_metadata_test_table_02'
table_full_name_02 = f'{db_name}.{table_name_02}'
sql = f'''
DROP TABLE IF EXISTS {table_full_name_02}
'''
spark.sql(sql)
sql = f'''
CREATE TABLE {table_full_name_02}
(
id BIGINT GENERATED ALWAYS AS IDENTITY
,string_column string
,integer_column integer
,date_column date
,generated_col INT GENERATED ALWAYS AS (integer_column)
)
USING delta
TBLPROPERTIES (
delta.autoOptimize.optimizeWrite = True,
delta.autoOptimize.autoCompact = True,
delta.dataSkippingNumIndexedCols = 1
)
PARTITIONED BY (
string_column
)
COMMENT "table_comment"
'''
spark.sql(sql)
table_name_03 = '_metadata_test_table_03'
table_full_name_03 = f'{db_name}.{table_name_03}'
sql = f'''
DROP TABLE IF EXISTS {table_full_name_03}
'''
spark.sql(sql)
sql = f'''
CREATE OR REPLACE TABLE {table_full_name_03}
(
col_001 string
,col_002 string
,col_003 string
)
USING delta
TBLPROPERTIES (
delta.autoOptimize.optimizeWrite = True,
delta.autoOptimize.autoCompact = True,
delta.dataSkippingNumIndexedCols = 1
)
'''
spark.sql(sql)
view_name = '_v_metadata_test_table'
view_full_name = f'{db_name}.{view_name}'
sql = f'''
CREATE OR REPLACE VIEW {view_full_name}
AS
SELECT
*
FROM
{table_full_name}
'''
spark.sql(sql)
Spark Schema (Database) のメタデータ取得方法
DESCRIBE DATABASE
sql = f"DESCRIBE DATABASE {db_name}"
spark.sql(sql).display()
SHOW SCHEMAS
sql = f"SHOW SCHEMAS LIKE '{db_name}'"
spark.sql(sql).display()
SHOW TABLES
sql = f"SHOW TABLES FROM {db_name}"
spark.sql(sql).display()
SHOW VIEWS
sql = f"SHOW VIEWS FROM {db_name}"
spark.sql(sql).display()
Spark Table のメタデータ
SHOW CREATE TABLE
sql = f'SHOW CREATE TABLE {table_full_name}'
spark.sql(sql).display()
print(spark.sql(sql).collect()[0][0])
DESCRIBE
sql = f'DESCRIBE {table_full_name}'
spark.sql(sql).display()
sql = f'DESCRIBE {table_full_name} integer_column'
spark.sql(sql).display()
DESCRIBE EXTENDED
sql = f'DESCRIBE EXTENDED {table_full_name}'
spark.sql(sql).display()
sql = f'DESCRIBE EXTENDED {table_full_name} integer_column'
spark.sql(sql).display()
# 統計情報取得後に、カラムの統計情報を取得
sql = f'ANALYZE TABLE {table_full_name} COMPUTE STATISTICS FOR COLUMNS integer_column'
spark.sql(sql).display()
sql = f'DESCRIBE EXTENDED {table_full_name} integer_column'
spark.sql(sql).display()
DESCRIBE QUERY
sql = f'''
DESCRIBE QUERY
SELECT
string_column
,integer_column
,COUNT(*) AS COUNT
FROM
{table_full_name}
GROUP BY
string_column
,integer_column
'''
spark.sql(sql).display()
SHOW COLUMNS
sql = f'SHOW COLUMNS IN {table_full_name}'
spark.sql(sql).display()
SHOW TBLPROPERTIES
sql = f'SHOW TBLPROPERTIES {table_full_name}'
spark.sql(sql).display()
Delta Lake テーブルのメタデータ取得
DESCRIBE DETAIL
sql = f'DESCRIBE DETAIL {table_full_name}'
spark.sql(sql).display()
DESCRIBE HISTORY
sql = f'DESCRIBE HISTORY {table_full_name}'
spark.sql(sql).display()
Delta Transaction Log
上のメタデータ
import json
# 最新の Delta Transaction Log の version を取得
max_version = (
spark
.sql(f'DESC HISTORY {table_full_name_02}')
.sort('version', ascending=False)
.select('version')
.limit(1)
.collect()[0][0]
)
# Delta Lake テーブルのファイルパスを取得
file_path = spark.sql(f'DESC EXTENDED {table_full_name_02}').where("col_name = 'Location'").select('data_type').collect()[0][0]
file_content = dbutils.fs.head(f'{file_path}/_delta_log/{max_version:020}.json')
# 1つファイルに内に格納される json 文字列群から、`metaData`の文字列を取得
delta_log_metadata = json.loads(
[l for l in file_content.split('\n') if l != '' and json.loads(l).get('metaData') != None][0]
)
pprint.pprint(delta_log_metadata)
Spark Dataframe のメタデータ
DataFrame.printSchema
df.printSchema()
DataFrame.schema
df.schema
toDDL(DDL 文字列)
df_schema = df.schema
pprint.pprint(spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(df_schema.json()).toDDL())
DataFrame.columns
df.columns
DataFrame.dtypes
df.dtypes
DataFrame.storageLevel
df.storageLevel
DataFrame.inputFiles
filepath = "dbfs:/databricks-datasets/tpch/data-001/region/"
schema = """
r_regionkey date,
r_name string,
r_comment string
"""
df_2 = (
spark
.read
.format("csv")
.schema(schema)
.option("sep", "|")
.load(filepath)
)
df_2.inputFiles()
Spark Config の設定値
spark_confs = spark.sparkContext.getConf().getAll()
spark.createDataFrame(spark_confs).limit(5).display()
sql = 'set -v'
spark.sql(sql).limit(5).display()
Databricks 固有機能
SHOW GRANTS ON
# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
try:
sql = f'SHOW GRANTS ON {table_full_name}'
spark.sql(sql).display()
except:
print('Databricks の テーブルアクセスコントロールができるクラスターで実行してください。')
リソースのクリーンアップ
# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)