LoginSignup
3
1

More than 1 year has passed since last update.

PySpark によるメタデータデプロイの実践

Last updated at Posted at 2022-10-05

概要

PySpark によるメタデータデプロイの実践について説明する。本記事では、Databricks での実行結果を掲載。

本記事のコードを含むノートブックを以下のリンクに保存してあり、Databricks 等の環境にインポートして実行が可能。

本記事の位置付け

次の開発ガイドシリーズにおけるメタデータデプロイ分野の1記事であり、リンク先には記事にて記事の全体像を整理している。

GroupID 分野
T10 Spark概要
T20 データエンジニアリング
T30 データ品質チェック
T40 データサイエンス
T50 メタデータデプロイ
T60 テスト
T70 DevOps

データデプロイ概要

Spark におけるメタデータを保持する主なオブジェクトとして次のものがあり、CRUD(作成、読込、更新、削除)という観点で適切なメタデータデプロイが必要。Spark (+Delta Lake)を利用するメリットの1つにデータ参照のダウンタイムがほぼないことがあり、メタデータデプロイ時にもダウンタイムを最小限とすることが望ましい。

# 取得元 取得元の詳細
1 Delta Lake ディレクトリ - _delta_log
2 メタストア - Spark Schema(Database)
- Spark Table
- Spark View
3 Spark Dataframe - SparkSession
- Data Sources

Spark においては、メタストアにて多くのメタデータが格納される。一般的には Hive メタストアを用いることが多いが、Spark プロバイダー固有のメタストア(Databricks における Unity Catalog)が提供されていることがある。メタストアからメタデータを取得する際には、Spark 環境からDESCRIBEなどのコマンドにより参照する。Delta Lake を用いる場合にはメタストアに存在しないメタデータが Delta Lake ディレクトリに格納されることもある。

メタデータのデプロイ方法としては命令型と宣言型があるが、Spark にて宣言型でデプロイを行うためのツールは公開されていないため、宣言型デプロイを行うためにはスクラッチでの開発が必要。

# デプロイ方法 実施方法 他データストアのツール例
1 命令型 デプロイ対象の DDL 文等の処理を随時実行する。
2 宣言型 デプロイ後のテーブル定義等を保持したをモデルの定義を行い、そのモデルとの差分を DDL 文等により反映する。 - Flyway
- データ層アプリケーション (DAC) - SQL Server

メタデータの更新を行う際には、ALTER 文の実行のみでは完結しないものがあることに注意が必要。

  • ALTER 文を実行のみで可能
    1. テーブル名を変更
    2. カラムの追加
    3. カラムのコメントを変更
    4. カラム順の変更
    5. テーブルプロパティ(TBLPROPERTIES)の変更
    6. Not NULL制約の設定と削除
    7. Check制約の設定と削除
  • データの再書き込みが必要
    1. カラム名を変更
    2. カラムを削除
    3. カラムのデータ型を変更
    4. パーティションカラムを変更
    5. 生成列の追加
  • 他のパスへのデータの書き込みが必要
    1. テーブルの保存場所(LOCATION)の変更
  • Optimizeの実行が必要
    1. bloomfitlerの設定

事前準備

from pyspark.sql import Row
from decimal import *
import datetime
import pprint

image.png

import random, string
random_string = ''.join(random.choices(string.ascii_letters, k=5))

image.png

db_name = f'_metadata_test_database_{random_string}'
print(db_name)

image.png

# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)

sql = f'CREATE DATABASE {db_name}'
spark.sql(sql)

image.png

table_name      = '_metadata_test_table'
table_full_name = f'{db_name}.{table_name}'

sample_Data = [
    Row(
        string_column    = 'AAA', 
        byte_column      = 1, 
        integer_column   = 1, 
        bigint_column    = 1, 
        float_column     = 12.300000190734863, 
        double_column    = 12.3, 
        numeric_column   = Decimal('12'), 
        boolean_column   = True, 
        date_column      = datetime.date(2020, 1, 1), 
        timestamp_column = datetime.datetime(2021, 1, 1, 0, 0), 
        binary_column    = bytearray(b'A'), 
        struct_column    = Row(
            struct_string_column = 'AAA', 
            struct_int_column    = 1
        ), 
        array_column     = ['AAA', 'BBB', 'CCC'], 
        map_column       = {'AAA': 1}
    )
]


schema = '''
--文字型
string_column string,

--整数型
byte_column byte,
integer_column integer,
bigint_column bigint,

--浮動小数点型
float_column float,
double_column double,
numeric_column numeric,

--真偽型
boolean_column boolean,

--日付時刻
date_column date, 
timestamp_column timestamp,

--バイナリー型
binary_column binary,

--複合型
struct_column struct<
    struct_string_column :string,
    struct_int_column    :int
>,
array_column array<string>, 
map_column map<string, int>
'''

df = spark.createDataFrame(sample_Data, schema)
df.display()

# テーブルを作成
spark.sql(f'DROP TABLE IF EXISTS {table_full_name}')
df.write.format('delta').saveAsTable(table_full_name)

image.png

table_name_02      = '_metadata_test_table_02'
table_full_name_02 = f'{db_name}.{table_name_02}'

sql = f'''
DROP TABLE IF EXISTS {table_full_name_02}
'''
spark.sql(sql)

sql = f'''
CREATE TABLE {table_full_name_02}
(
    id BIGINT GENERATED ALWAYS AS IDENTITY
    ,string_column string
    ,integer_column integer
    ,date_column date
    ,generated_col INT GENERATED ALWAYS AS (integer_column)
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = True, 
    delta.autoOptimize.autoCompact   = True,
    delta.dataSkippingNumIndexedCols = 1
  )
PARTITIONED BY (
    string_column
)
COMMENT "table_comment"
'''
spark.sql(sql)

image.png

table_name_03      = '_metadata_test_table_03'
table_full_name_03 = f'{db_name}.{table_name_03}'

sql = f'''
DROP TABLE IF EXISTS {table_full_name_03}
'''
spark.sql(sql)

sql = f'''
CREATE OR REPLACE TABLE {table_full_name_03}
(
    col_001 string
    ,col_002 string
    ,col_003 string
)
USING delta
TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = True, 
    delta.autoOptimize.autoCompact   = True,
    delta.dataSkippingNumIndexedCols = 1
  )
'''
spark.sql(sql)

image.png

view_name      = '_v_metadata_test_table'
view_full_name = f'{db_name}.{view_name}'

sql = f'''
CREATE OR REPLACE VIEW {view_full_name}
AS
SELECT
  *
  FROM
    {table_full_name}
'''

spark.sql(sql)

image.png

Delta Lake ディレクトリにおけるメタデータ

Delta Lake ディレクトリでは、直下に作成される_delta_log ディレクトリに情報が格納される。保持する情報については、Delta Lake の Gtihub レポジトリにて詳細が記載されている。

この章では、_delta_logにどのような情報が格納されているかをコードの実行を通して説明する。

# DESCRIBE の実行結果は、メタストアから取得
sql = f'DESCRIBE {table_full_name_02}'

spark.sql(sql).display()

image.png

# DESCRIBE HISTORY の実行結果は、Delta lake テーブルのメタデータの一部を確認可能
sql = f'DESCRIBE HISTORY {table_full_name_02}'

# `userName` カラムにメールアドレスが含まれる
# spark.sql(sql).display()
spark.sql(sql).drop('userName').display()

image.png

file_path = spark.sql(f'DESC EXTENDED {table_full_name_02}').where("col_name = 'Location'").select('data_type').collect()[0][0]

image.png

display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))

image.png

print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.crc'))

image.png

# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.json'))

image.png

file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.json')
file_contents = file_contents.split('\n')

import json
print('--protocol--')
print(json.loads(file_contents[0]))
print('--metaData--')
print(json.loads(file_contents[1]))

# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print('--Operation--')
print(json.loads(file_contents[2]))

image.png

sql = f'''
CREATE BLOOMFILTER INDEX
ON {table_full_name_02}
FOR COLUMNS(
  integer_column
)
'''
spark.sql(sql)

image.png

display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))

image.png

print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000000.crc'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.crc'))

image.png

# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.json'))

image.png

file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.json')
file_contents = file_contents.split('\n')

import json
print('--metaData--')
print(json.loads(file_contents[0]))

# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print('--Operation--')
print(json.loads(file_contents[1]))

image.png

sql = f'''
INSERT INTO {table_full_name_02}
(
    string_column
    ,integer_column
    ,date_column
)
VALUES
('a', 1, '2020-01-01'),
('b', 2, '2020-02-01'),
('c', 3, '2020-03-01')

'''

spark.sql(sql)

image.png

display(dbutils.fs.ls(file_path))
display(dbutils.fs.ls(f'{file_path}/_delta_log'))

image.png

print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000001.crc'))
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.crc'))

image.png

# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print(dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.json'))

image.png

file_contents = dbutils.fs.head(f'{file_path}/_delta_log/00000000000000000002.json')
file_contents = file_contents.split('\n')

import json
print('--Operation--')
print(json.loads(file_contents[0]))
print('--Operation--')
print(json.loads(file_contents[1]))
print('--Operation--')
print(json.loads(file_contents[2]))

# 下記については、ユーザー名(メールアドレス)が表示されるため実行結果共有時に注意が必要
print('--commitInfo--')
print(json.loads(file_contents[3]))

image.png

メタデータのデプロイ

メタデータの命令型デプロイ

spark.table(table_full_name_03).printSchema()

image.png

sql = f'''
ALTER TABLE {table_full_name_03}
  ADD COLUMN 
  (
    col_004 string
  )
'''

spark.sql(sql)

image.png

spark.table(table_full_name_03).printSchema()

image.png

メタデータの宣言型デプロイ

# 現在のテーブルカラムと設定値を比較して差分がある場合にカラムを追加するメソッドを追加
import inspect

def add_cols_to_spark_tbl(
    cols_conf,
    tbl_name,
):
    """Spark テーブルに対してカラムを追加
    """
    # 期待値のカラムリストを取得
    expected_cols_list = [d["name"] for d in cols_conf]

    # 実際のカラムリストを取得
    actual_cols_list = spark.table(tbl_name).columns

    # 設定値のカラムと実際のカラムを比較して、追加対象のカラム一覧を取得
    cols_to_be_add = list(set(sorted(expected_cols_list)) - set(sorted(actual_cols_list)))

    # 現在のテーブルカラムと設定値を比較して差分がある場合にカラムを追加
    for col_to_be_add in cols_to_be_add:
        for col_info_to_be_executed in [col_info for col_info in cols_conf if col_info['name'] == col_to_be_add]:
            col_name = col_info_to_be_executed['name']
            data_type = col_info_to_be_executed['type']
            add_col_ddl = f'''
            ALTER TABLE {tbl_name}
              ADD COLUMNS {col_name} {data_type}'''
            add_col_ddl = inspect.cleandoc(add_col_ddl)
            spark.sql(add_col_ddl)

            # 実行した DDL を表示
            print(add_col_ddl)

image.png

# 現在のテーブルと同義の設定値を定義
tbl_conf = {
    'table_name': table_full_name_03,
    'schema': [
        {'name': 'col_001', 'type': 'STRING'},
        {'name': 'col_002', 'type': 'STRING'},
        {'name': 'col_003', 'type': 'string'},
        {'name': 'col_004', 'type': 'STRING'},
    ],
}

image.png

# カラム追加の処理が実行されない
add_cols_to_spark_tbl(
    cols_conf = tbl_conf['schema'],
    tbl_name = tbl_conf['table_name'],
)

image.png

# `col_005`を追加した設定値を定義
tbl_conf = {
    'table_name': table_full_name_03,
    'schema': [
        {'name': 'col_001', 'type': 'STRING'},
        {'name': 'col_002', 'type': 'STRING'},
        {'name': 'col_003', 'type': 'string'},
        {'name': 'col_004', 'type': 'STRING'},
        {'name': 'col_005', 'type': 'STRING'},
    ],
}

image.png

# カラム追加の処理が実行される想定
add_cols_to_spark_tbl(
    cols_conf = tbl_conf['schema'],
    tbl_name = tbl_conf['table_name'],
)

image.png

# `col_005`が追加されたことを確認
spark.table(table_full_name_03).printSchema()

image.png

Spark のデータオブジェクト のメタデータ取得方法

詳細は、 Spark のデータオブジェクト のメタデータ取得方法 - Qiita の記事にて記載。

リソースのクリーンアップ

# データベースを作成
sql = f'DROP DATABASE IF EXISTS {db_name} CASCADE'
spark.sql(sql)

image.png

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1