1
1

More than 1 year has passed since last update.

Spark のデータオブジェクト のメタデータ取得方法

Last updated at Posted at 2022-10-05

概要

Spark のデータオブジェクトのメタデータを、PySpark 、および、Spark SQL により取得する方法を説明する。本記事では、Databricks での実行結果を掲載。

本記事の位置付け

次の開発ガイドシリーズにおけるメタデータデプロイ分野の補足記事であり、リンク先には記事にて記事の全体像を整理している。

GroupID 分野
T10 Spark概要
T20 データエンジニアリング
T30 データ品質チェック
T40 データサイエンス
T50 メタデータデプロイ
T60 テスト
T70 DevOps

事前準備

from pyspark.sql import Row
from decimal import *
import datetime
import pprint
import random, string
random_string = ''.join(random.choices(string.ascii_letters, k=5))
db_name = f'_metadata_test_database_{random_string}'
print(db_name)
# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)

sql = f'CREATE DATABASE {db_name}'
spark.sql(sql)
table_name      = '_metadata_test_table'
table_full_name = f'{db_name}.{table_name}'

sample_Data = [
    Row(
        string_column    = 'AAA', 
        byte_column      = 1, 
        integer_column   = 1, 
        bigint_column    = 1, 
        float_column     = 12.300000190734863, 
        double_column    = 12.3, 
        numeric_column   = Decimal('12'), 
        boolean_column   = True, 
        date_column      = datetime.date(2020, 1, 1), 
        timestamp_column = datetime.datetime(2021, 1, 1, 0, 0), 
        binary_column    = bytearray(b'A'), 
        struct_column    = Row(
            struct_string_column = 'AAA', 
            struct_int_column    = 1
        ), 
        array_column     = ['AAA', 'BBB', 'CCC'], 
        map_column       = {'AAA': 1}
    )
]


schema = '''
--文字型
string_column string,

--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,

--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,

--真偽型
boolean_column boolean,

--日付時刻
date_column date, 
timestamp_column timestamp,

--バイナリー型
binary_column binary,

--複合型
struct_column struct<
    struct_string_column :string,
    struct_int_column    :int
>,
array_column array<string>, 
map_column map<string, int>
'''

df = spark.createDataFrame(sample_Data, schema)
df.display()

# テーブルを作成
spark.sql(f'DROP TABLE IF EXISTS {table_full_name}')
df.write.format('delta').saveAsTable(table_full_name)
table_name_02      = '_metadata_test_table_02'
table_full_name_02 = f'{db_name}.{table_name_02}'

sql = f'''
DROP TABLE IF EXISTS {table_full_name_02}
'''
spark.sql(sql)

sql = f'''
CREATE TABLE {table_full_name_02}
(
    id BIGINT GENERATED ALWAYS AS IDENTITY
    ,string_column string
    ,integer_column integer
    ,date_column date
    ,generated_col INT GENERATED ALWAYS AS (integer_column)
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = True, 
    delta.autoOptimize.autoCompact   = True,
    delta.dataSkippingNumIndexedCols = 1
  )
PARTITIONED BY (
    string_column
)
COMMENT "table_comment"
'''
spark.sql(sql)

table_name_03      = '_metadata_test_table_03'
table_full_name_03 = f'{db_name}.{table_name_03}'

sql = f'''
DROP TABLE IF EXISTS {table_full_name_03}
'''
spark.sql(sql)

sql = f'''
CREATE OR REPLACE TABLE {table_full_name_03}
(
    col_001 string
    ,col_002 string
    ,col_003 string
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = True, 
    delta.autoOptimize.autoCompact   = True,
    delta.dataSkippingNumIndexedCols = 1
  )
'''
spark.sql(sql)

view_name      = '_v_metadata_test_table'
view_full_name = f'{db_name}.{view_name}'

sql = f'''
CREATE OR REPLACE VIEW {view_full_name}
AS
SELECT
  *
  FROM
    {table_full_name}
'''

spark.sql(sql)

Spark Schema (Database) のメタデータ取得方法

DESCRIBE DATABASE

sql = f"DESCRIBE DATABASE {db_name}"

spark.sql(sql).display()

image.png

SHOW SCHEMAS

sql = f"SHOW SCHEMAS LIKE '{db_name}'"

spark.sql(sql).display()

image.png

SHOW TABLES

sql = f"SHOW TABLES FROM {db_name}"

spark.sql(sql).display()

image.png

SHOW VIEWS

sql = f"SHOW VIEWS FROM {db_name}"

spark.sql(sql).display()

image.png

Spark Table のメタデータ

SHOW CREATE TABLE

sql = f'SHOW CREATE TABLE {table_full_name}'

spark.sql(sql).display()
print(spark.sql(sql).collect()[0][0]) 

image.png

DESCRIBE

sql = f'DESCRIBE {table_full_name}'

spark.sql(sql).display()

image.png

sql = f'DESCRIBE {table_full_name} integer_column'

spark.sql(sql).display()

image.png

DESCRIBE EXTENDED

sql = f'DESCRIBE EXTENDED {table_full_name}'

spark.sql(sql).display()

image.png

sql = f'DESCRIBE EXTENDED {table_full_name} integer_column'

spark.sql(sql).display()

image.png

# 統計情報取得後に、カラムの統計情報を取得
sql = f'ANALYZE TABLE {table_full_name} COMPUTE STATISTICS FOR COLUMNS integer_column'
spark.sql(sql).display()
sql = f'DESCRIBE EXTENDED {table_full_name} integer_column'
spark.sql(sql).display()

image.png

DESCRIBE QUERY

sql = f'''
DESCRIBE QUERY
SELECT
  string_column
  ,integer_column
  ,COUNT(*) AS COUNT

  FROM 
    {table_full_name}
  GROUP BY
    string_column
    ,integer_column

'''

spark.sql(sql).display()

image.png

SHOW COLUMNS

sql = f'SHOW COLUMNS IN {table_full_name}'

spark.sql(sql).display()

image.png

SHOW TBLPROPERTIES

sql = f'SHOW TBLPROPERTIES {table_full_name}'

spark.sql(sql).display()

image.png

Delta Lake テーブルのメタデータ取得

DESCRIBE DETAIL

sql = f'DESCRIBE DETAIL {table_full_name}'

spark.sql(sql).display()

image.png

DESCRIBE HISTORY

sql = f'DESCRIBE HISTORY {table_full_name}'

spark.sql(sql).display()

image.png

Delta Transaction Log上のメタデータ

import json

# 最新の Delta Transaction Log の version を取得
max_version = (
    spark
    .sql(f'DESC HISTORY {table_full_name_02}')
    .sort('version', ascending=False)
    .select('version')
    .limit(1)
    .collect()[0][0]
)

# Delta Lake テーブルのファイルパスを取得
file_path = spark.sql(f'DESC EXTENDED {table_full_name_02}').where("col_name = 'Location'").select('data_type').collect()[0][0]

file_content = dbutils.fs.head(f'{file_path}/_delta_log/{max_version:020}.json')

# 1つファイルに内に格納される json 文字列群から、`metaData`の文字列を取得
delta_log_metadata = json.loads(
    [l for l in file_content.split('\n') if l != '' and json.loads(l).get('metaData') != None][0]
)

pprint.pprint(delta_log_metadata)

image.png

Spark Dataframe のメタデータ

DataFrame.printSchema

df.printSchema()

image.png

DataFrame.schema

df.schema

image.png

toDDL(DDL 文字列)

df_schema = df.schema
pprint.pprint(spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(df_schema.json()).toDDL())

image.png

DataFrame.columns

df.columns

image.png

DataFrame.dtypes

df.dtypes

image.png

DataFrame.storageLevel

df.storageLevel

image.png

DataFrame.inputFiles

filepath = "dbfs:/databricks-datasets/tpch/data-001/region/"

schema = """
    r_regionkey date,
    r_name string,
    r_comment string
"""
   
df_2 = (
    spark
    .read
    .format("csv")
    .schema(schema)
    .option("sep", "|")
    .load(filepath)
)

df_2.inputFiles()

image.png

Spark Config の設定値

spark_confs = spark.sparkContext.getConf().getAll()

spark.createDataFrame(spark_confs).limit(5).display()

image.png

sql = 'set -v'

spark.sql(sql).limit(5).display()

image.png

Databricks 固有機能

SHOW GRANTS ON

# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
try:
    sql = f'SHOW GRANTS ON {table_full_name}'
    spark.sql(sql).display()
except:
    print('Databricks の テーブルアクセスコントロールができるクラスターで実行してください。')

image.png

リソースのクリーンアップ

# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)

image.png

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1