9
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PySpark によるデータエンジニアリング実践

Last updated at Posted at 2022-10-05

概要

PySparkにてデータエンジニアリングを実施する際に知っておくべき次のテーマを説明する。

  1. ACID トランザクションの保証されているデータフォーマットの利用
  2. レイテンシーに応じたデータ処理方法の選択
  3. メダリオンアーキテクチャによるデータエンジニアリング
  4. Spark によるデータエンジニアリングに利用すべきプログラミング言語とライブラリー
  5. パフォーマン最適化
  6. データエンジニアリングにおける Spark 関連
  7. 処理の共通化

本記事のコードを含むノートブックを以下のリンクに保存しておりますので、興味がある方は Databricks 等の環境にインポートして実行してみてください。

本記事の位置付け

次の開発ガイドシリーズにおけるデータエンジニアリング分野の1記事であり、リンク先には記事にて記事の全体像を整理している。

GroupID 分野
T10 Spark概要
T20 データエンジニアリング
T30 データ品質チェック
T40 データサイエンス
T50 メタデータデプロイ
T60 テスト
T70 DevOps

1. ACID トランザクションが保証されるデータフォーマットの利用

データレイクでは、ソースから連携される CSV などのファイルをそのまま蓄積するものという固定概念があるが、構造データと半構造データを ACID トランザクションをサポートしているファイル形式(実際には複数種のファイルやディレクトリで構成されていることからファイルレイヤーと記載されていることもあり)として保存する方法が普及している。

OSSとして次のファイル形式がよく利用さている。Databricks では、基本的には Delta Lake 形式を用いることが推奨事項である。

Delta Lake 形式では、差分連携時に Merge 処理( Upsert 処理)が最適な方法として提供されているなどデータエンジニアリングのプログラムがシンプルとなる。従来の DWH 製品では更新処理が性能ボトルネックとなることからデータ書き込みプログラムが複雑となることがあった。

分離レベルがWriteSerializableであるため、従来の DWH 製品で実施される参照ロックを行われないこともあり、利用者が少ない夜間などのバッチ処理としてデータの書き込みを実施する必要もない。

スキーマ適用( schema enforcement )とスキーマ展開( schema evolution )により、プログラムの拡張性と信頼性のバランスをとることができる。

2. レイテンシーに応じたデータ処理方法の選択

データの生成からデータが利用可能になるまでの時間差(レイテンシー)要件に応じて、データ処理方法を選択する必要がある。データ処理方法に合わせて、ワークフローとして管理する方法の検討も行う。

# レイテンシーに応じた処理方法 実装例
1 バッチ 1-1. スケジュールトリガーによるSparkデータフレーム処理
1-2. Delta live tableのトリガーパイプラインによる処理
2 準リアルタイムとイベント駆動 2-1. ファイル到着イベントトリガーによる実行
3 リアルタイム 3-1. Sparkストリーミング処理
3-2. Databricksオートローダーによる処理
3-3. Delta live tableの連続パイプラインによる処理

リアルタイム処理を実装する際には、システム間を直接連携(例: Kafka -> Spark )させるのではなく、ストレージに対するオートローダーによる処理(例: Kafka -> ストレージ -> Databricks )がコストや可用性の観点で有効な場合もある。

3. メダリオンアーキテクチャによるデータエンジニアリング

メダリオンアーキテクチャとはソースシステムのデータを、Bronze、Silver、Goldの三層で管理する手法であり、それをベースにデータレイヤーの設計を行うべき。

データレイヤーに分けることにより、次のようなメリットがある。

  • データレイヤーごとの役割が明確となること
  • データ品質が担保されたデータの提供が可能となること
  • ローデータから再度テーブルを再作成できること

参考リンク

次がレイヤーのサンプルである。

# データレイヤー 概要 類義語
1 Bronze 生データを保持するレイヤー Raw、Data lake
2 Silver ソースシステムと同様の粒度で検証済みのデータを保持するレイヤー Enriched、DWH
3 Gold 集計したデータを保持するレイヤー Curated、Data Mart

Bronze の特徴について

  • 取り込んだ生データのコピーを、履歴として保持。
  • データを削除する場合には、物理削除ではなく、論理削除が推奨。
  • スキーマ展開を許可するなどソースシステム側の変更対応を容易化。
  • データ型を文字型として保持するなどシステムエラーの発生を低減。

Silver の特徴について

  • Bronze のデータに基づき、ソースシステムと同等のデータ粒度で保持。
  • スキーマを適用し、dropDuplicates 関数を利用した重複排除等によるデータ品質チェック処理を実施。

Gold の特徴について

  • データ利活用(消費)の目的に合致するように編成・集計したデータを保持。
  • ACLや行レベルセキュリティ等のデータアクセス制御を考慮することが多い。

参考リンク

3-1. Bronze テーブルへのデータ書き込み例

# 事前準備
db_name = 'sample_tpch'
brz_tbl_name = 'part_raw'
brz_tbl_full_name = f'{db_name}.{brz_tbl_name}'

spark.sql(f'''
CREATE DATABASE IF NOT EXISTS {db_name}
''')

# 
spark.sql(f"""
CREATE OR REPLACE TABLE {brz_tbl_full_name}
(
  p_partkey string,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size string,
  p_container string,
  p_retailprice string,
  p_comment string,
  _datasource string,
  _ingest_timestamp timestamp
)
USING delta
""")

image.png

src_file_path = 'dbfs:/databricks-datasets/tpch/data-001/part/part.tbl'

image.png

# ファイル内容の一部を確認
print(dbutils.fs.head(src_file_path, 1030))

image.png

from  pyspark.sql.functions import input_file_name,current_timestamp

# ソースファイルから読み込み
df = (spark
      .read
      .format("csv")
      .option("header", "False")
      .option("inferSchema", "False")
      .option("sep", "|")
      .load(src_file_path)
    )

# ソースファイルにヘッダーがないため、カラム名を変更
renamed_cols_names = {
    '_c0':'p_partkey',
    '_c1':'p_name',
    '_c2':'p_mfgr',
    '_c3':'p_brand',
    '_c4':'p_type',
    '_c5':'p_size',
    '_c6':'p_container',
    '_c7':'p_retailprice',
    '_c8':'p_comment',
}
for existing_col,new_col in renamed_cols_names.items():
    df = df.withColumnRenamed(existing_col, new_col)

# 最後のカラムを削除
df = df.drop('_c9')

# 監査列として、`_datasource`列と`_ingest_timestamp`列を追加
df = (df
        .withColumn("_datasource", input_file_name())
        .withColumn("_ingest_timestamp", current_timestamp())
     )

# `append`によりデータの書き込み
(df.write
     .format('delta')
     .mode('append')
     .option("mergeSchema", "true")
     .saveAsTable(brz_tbl_full_name)
)

image.png

# データを確認
spark.table(brz_tbl_full_name).printSchema()
display(spark.table(brz_tbl_full_name))

image.png

Spark SQL にて同等のことを実施することも可能

%sql
-- テキスト区切りファイルからデータを読み込む
CREATE OR REPLACE TEMPORARY VIEW tmp_part_001
USING csv
OPTIONS (
  path 'dbfs:/databricks-datasets/tpch/data-001/part/part.tbl',
  header false,
  SEP '|'
)
;

-- ソースファイルにヘッダーがないためカラム名を変更
-- 最後のカラム('_c9)を削除
CREATE OR REPLACE TEMPORARY VIEW tmp_part_002
AS
SELECT
  _c0 AS p_partkey,
  _c1 AS p_name,
  _c2 AS p_mfgr,
  _c3 AS p_brand,
  _c4 AS p_type,
  _c5 AS p_size,
  _c6 AS p_container,
  _c7 AS p_retailprice,
  _c8 AS p_comment
  FROM
    tmp_part_001
;

-- 監査列として、`_datasource`列と`_ingest_timestamp`列を追加
CREATE OR REPLACE TEMPORARY VIEW tmp_part_003
AS
SELECT
  *
  ,input_file_name() AS _datasource
  ,current_timestamp() AS _ingest_timestamp
  FROM 
    tmp_part_002
;

image.png

spark.conf.set('da.brz_tbl_full_name', brz_tbl_full_name)

image.png

%sql
-- `append`によりデータの書き込み

-- スキーマ展開を許可
SET spark.databricks.delta.schema.autoMerge = True;

INSERT INTO  ${da.brz_tbl_full_name}
SELECT
  *
  FROM
    tmp_part_003

image.png

%sql
SELECT
  version,
  operation,
  operationParameters

  FROM (
    DESCRIBE HISTORY sample_tpch.part_raw
  )

image.png

3-2. Silver テーブルへのデータ書き込み例

# 事前準備
slv_tbl_name = 'part'
slv_tbl_full_name = f'{db_name}.{slv_tbl_name}'

spark.sql(f"""
CREATE OR REPLACE TABLE {slv_tbl_full_name}
(
  p_partkey long,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size int,
  p_container string,
  p_retailprice decimal(12, 2),
  p_comment string,
  _datasource STRING,
  _ingest_timestamp timestamp
)
USING delta
""")

image.png

from  pyspark.sql.functions import current_timestamp,lit

# 下記の処理を実行したデータフレーム(df)を作成
## 1. ブロンズテーブルから主キー(`p_partkey`)ごとに`_ingest_timestamp`列の最大日を抽出したサブセットを作成
## 2. 主キー+`_ingest_timestamp`列の条件で、1のサブセットとブロンズテーブルを結合
## 3. ブロンズテーブルのデータ型をシルバーテーブルと同一のデータ型に変換
brz_to_slv_sql = f'''
with slv_records (
  SELECT
    p_partkey,
    MAX(_ingest_timestamp) AS max_ingest_timestamp
    
    FROM
      {brz_tbl_full_name}
    GROUP BY
      p_partkey      
)

SELECT
  brz.p_partkey::long,
  brz.p_name,
  brz.p_mfgr,
  brz.p_brand,
  brz.p_type,
  brz.p_size::int,
  brz.p_container,
  brz.p_retailprice::decimal(12, 2),
  brz.p_comment,
  brz._datasource,
  brz._ingest_timestamp
  
  FROM
    {brz_tbl_full_name} AS brz
  INNER JOIN 
    slv_records AS slv
    ON 
      brz.p_partkey =  slv.p_partkey
      AND brz._ingest_timestamp =  slv.max_ingest_timestamp
'''
df = spark.sql(brz_to_slv_sql)

# dropDuplicates関数にて、主キーの一意性を保証。連携日ごとの一意性が保証されないことがあるため。
df = df.drop_duplicates(['p_partkey'])



# 一時ビューからシルバーテーブルに対して、MERGE文によりアップサート処理を実施。
# 一時ビューの`_ingest_timestamp`列がシルバーテーブルの`_ingest_timestamp`列以降である場合のみ、UPDATE処理を実行。

# 一時ビューを作成
temp_view_name = f'_tmp_{brz_tbl_name}'
df.createOrReplaceTempView(temp_view_name)


# Merge処理を実行
spark.sql(f'''
MERGE INTO {slv_tbl_full_name} AS tgt
  USING {temp_view_name} AS src
  
  ON tgt.p_partkey = src.p_partkey
  
  WHEN MATCHED
  AND tgt._ingest_timestamp < src._ingest_timestamp
    THEN UPDATE SET *
  WHEN NOT MATCHED
    THEN INSERT *
''')

image.png

spark.table(slv_tbl_full_name).display()

image.png

3-3. Gold テーブルへのデータ書き込み例

# 事前準備
gld_tbl_name = 'part_counts_by_mfgr'
gld_tbl_full_name = f'{db_name}.{gld_tbl_name}'

# `p_mfgr`ごとのカウント数を保持したデータフレームを定義
slv_to_gld_sql = f"""
SELECT
  p_mfgr,
  count(*) AS part_counts
  
  FROM
    {slv_tbl_full_name}
  GROUP BY
    p_mfgr    
"""
df = spark.sql(slv_to_gld_sql)


# CTAS(CREAT TABLE AS SLECT)により、テーブルを作成。
## 一時ビューを作成
tmp_view_name = f'_tmp_{slv_tbl_name}'
df.createOrReplaceTempView(tmp_view_name)

## CTASを実行
ctas_sql = f'''
create or replace table {gld_tbl_full_name}
  using delta
  TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = True, 
    delta.autoOptimize.autoCompact   = True,
    delta.dataSkippingNumIndexedCols = 1
  )
  AS 
  select 
    * 
    from 
      {tmp_view_name}
'''
spark.sql(ctas_sql)

image.png

spark.table(gld_tbl_full_name).display()

image.png

4. Spark によるデータエンジニアリングに利用すべきプログラミング言語とライブラリー

4-1. PySpark、および、Spark SQLの利用が推奨

Spark では、ScalaやRなどの多様な言語で実装できるが、プログラミング言語の普及率やSparkに関する情報量の観点で、PySpark、および、Spark SQL を用いることが推奨。

異なる言語でデータフレームのやり取りを行う際には、 Spark 一時ビュー経由で行う方法が容易である。

データエンジニアリング時には、PySpark、あるいは、Spark SQL のいずれかでのみでの実施も可能であるが、変数やコンポーネント化を実施するために Python の基本的なスキルが必要となる。

4-2. PySpark(Spark SQL)、pandas-on-Spark、Pandas の利用指針

PySpark(Spark SQL)、pandas-on-Spark、Pandas の順での利用が推奨。PySpark -> pandas-on-Spark -> PySpark の順で利用する場合には性能に課題をかかえることがある。

Excel や SAS などのファイルを読み込む際には、Pandas で読み込み、PySpark に変換することで追加のコンポーネントが必要なくなる場合があり、データ量を想定した上で対応方針を検討する。

5. 処理の共通化

5-1. 処理を共通化する方法

システム規模やメンバーのスキルセットに応じて、次のような開発方針を定める。

  1. PySpark、あるいは、Spark SQL により個別処理を実装する方法
  2. Python により処理を共通化して実装する方法
  3. 共通化処理を保持させたライブラリーを用いて実装する方法

共通化処理を保持させたライブラリーを用いた実装を複数チームで行う場合には、複数プロジェクトの総意をリードするCoE(center of excellence)のような組織が必要となるなど難易度は高い。

1. PySpark、あるいは、Spark SQL により個別処理を実装する方法

from  pyspark.sql.functions import input_file_name,current_timestamp

# ソースファイルから読み込み
df = (spark
      .read
      .format("csv")
      .option("header", "False")
      .option("inferSchema", "False")
      .option("sep", "|")
      .load(src_file_path)
    )

# ソースファイルにヘッダーがないため、カラム名を変更
renamed_cols_names = {
    '_c0':'p_partkey',
    '_c1':'p_name',
    '_c2':'p_mfgr',
    '_c3':'p_brand',
    '_c4':'p_type',
    '_c5':'p_size',
    '_c6':'p_container',
    '_c7':'p_retailprice',
    '_c8':'p_comment',
}
for existing_col,new_col in renamed_cols_names.items():
    df = df.withColumnRenamed(existing_col, new_col)

# 最後のカラムを削除
dropped_cols_names = ['_c9']
for dropped_cols_name in dropped_cols_names:
    df = df.drop(dropped_cols_name)

# 監査列として、`_datasource`列と`_ingest_timestamp`列を追加
df = (df
      .withColumn("_datasource", input_file_name())
       .withColumn("_ingest_timestamp", current_timestamp())
     )

image.png

2. Python により処理を共通化して実装する方法

# 共通化した関数
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name,current_timestamp

def read_from_csv(
    header,
    inferSchema,
    sep,
    src_file_path,
):
    spark = SparkSession.getActiveSession()
    return (
        spark
        .read
        .format("csv")
        .option("header", header)
        .option("inferSchema", inferSchema)
        .option("sep", sep)
        .load(src_file_path)
    )

def rename_cols(
    df,
    renamed_cols_names,
):
    for existing_col,new_col in renamed_cols_names.items():
        df = df.withColumnRenamed(existing_col, new_col)
    return df

def drop_cols(
    df,
    cols,
):
    for col in cols:
        df = df.drop(col)
    return df

def with_audit_cols(
    df,
):
    return (
        df
        .withColumn("_datasource", input_file_name())
        .withColumn("_ingest_timestamp", current_timestamp())
    )

image.png

# 個別処理の設定値
src_file_path = 'dbfs:/databricks-datasets/tpch/data-001/part/part.tbl'

image.png

# データエンジニアリングのコード
spark = SparkSession.builder.getOrCreate()

df_1 = read_from_csv(
    header = "False",
    inferSchema = "False",
    sep = "|",
    src_file_path = src_file_path,
)

renamed_cols_names = {
    '_c0':'p_partkey',
    '_c1':'p_name',
    '_c2':'p_mfgr',
    '_c3':'p_brand',
    '_c4':'p_type',
    '_c5':'p_size',
    '_c6':'p_container',
    '_c7':'p_retailprice',
    '_c8':'p_comment',
}
df_2 = rename_cols(df_1,renamed_cols_names) 

dropped_cols_names = ['_c9']
df_3 = drop_cols(df_2,dropped_cols_names)

df_4 = with_audit_cols(df_3)

image.png

# データを確認
df_4.display()

image.png

3. 共通化処理を保持させたライブラリーを用いて実装する方法

# クラスを定義
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name,current_timestamp

class DataEngineering:

    def __init__(self):
        # `data_engineer_main*`で利用している変数
        self.spark = SparkSession.builder.getOrCreate()
        self.read_stage_dataframes = {}

    def data_engineering_main(
            self,
            data_eng_conf,
        ):
        read_stage_dataframes = {}
        for tasks_config_name, tasks_config_value in data_eng_conf.items():
            method_name = tasks_config_value['method']
            task_options = tasks_config_value['task_options']

            # メソッド実行時の引数の文字列を生成
            task_options_paras_list = []
            # 2回目以降のループ時のみ、以前の処理結果のデータフレームを取得
            if self.read_stage_dataframes != {}:
                read_stage_df = list(self.read_stage_dataframes.values())[-1]
                task_options_paras_list.append('tgt_df=read_stage_df')

            for task_options_key, task_options_value in task_options.items():
                # 引数が文字の場合には、`"""`で囲む
                if type(task_options_value) == str:
                    task_options_value = f'"""{task_options_value}"""'
                task_options_paras_list.append(f'{task_options_key}={task_options_value}')
            task_options_paras = ','.join(task_options_paras_list)

            read_stage_dataframe = eval(f'{method_name}({task_options_paras})')
            self.read_stage_dataframes[tasks_config_name] = read_stage_dataframe

    @staticmethod
    def read_from_csv(
        header,
        inferSchema,
        sep,
        src_file_path,
    ):
        spark = SparkSession.getActiveSession()
        return (
            spark
            .read
            .format("csv")
            .option("header", header)
            .option("inferSchema", inferSchema)
            .option("sep", sep)
            .load(src_file_path)
        )

    @staticmethod
    def rename_cols(
        tgt_df,
        renamed_cols_names,
    ):
        for existing_col,new_col in renamed_cols_names.items():
            tgt_df = tgt_df.withColumnRenamed(existing_col, new_col)
        return tgt_df

    @staticmethod
    def drop_cols(
        tgt_df,
        dropped_cols_names,
    ):
        for col in dropped_cols_names:
            tgt_df = tgt_df.drop(col)
        return tgt_df

    @staticmethod
    def with_audit_cols(
        tgt_df,
    ):
        return (
            tgt_df
            .withColumn("_datasource", input_file_name())
            .withColumn("_ingest_timestamp", current_timestamp())
        )

image.png

# config を設定
data_eng_conf = {
    'read_from_data_source': {
        'method': 'DataEngineering.read_from_csv',
        'task_options': {
            'header': 'False',
            'inferSchema': 'False',
            'sep': '|',
            'src_file_path': 'dbfs:/databricks-datasets/tpch/data-001/part/part.tbl',
        },
    },
    'rename_cols_names': {
        'method': 'DataEngineering.rename_cols',
        'task_options': {
            'renamed_cols_names': {
                '_c0':'p_partkey',
                '_c1':'p_name',
                '_c2':'p_mfgr',
                '_c3':'p_brand',
                '_c4':'p_type',
                '_c5':'p_size',
                '_c6':'p_container',
                '_c7':'p_retailprice',
                '_c8':'p_comment',
            },
        },
    },
    'drop_cols': {
        'method': 'DataEngineering.drop_cols',
        'task_options': {
            'dropped_cols_names': ['_c9'],
        },
    },
    'with_audit_cols': {
        'method': 'DataEngineering.with_audit_cols',
        'task_options': {},
    },
}

image.png

# 処理を実行
data_engineering = DataEngineering()
data_engineering.data_engineering_main(data_eng_conf)

image.png

# 実行計画を確認
import pprint
pprint.pprint(data_engineering.read_stage_dataframes)

image.png

# 最終的なデータフレームを確認
display(list(data_engineering.read_stage_dataframes.values())[-1])

image.png

5-2. 監査列の付与方法

付与する監査列を検討を行い、その監査列を付与する関数の利用がおすすめ。

本ノートブックでは次のカラム付与を行っている。

# カラム名 概要
1 _datasource 取り込みファイルの完全パス
2 _ingest_timestamp データ取り込みを実施した日付時刻(グリニッジ標準時)
def with_audit_cols(tgt_df):
    return (tgt_df
        .withColumn("_datasource", input_file_name())
        .withColumn("_ingest_timestamp", current_timestamp())
     )

image.png

5-3. UDF(User-defined scalar functions)の利用方針

性能のボトルネットとなるため、基本的には利用しないこと。利用する場合には、UDFの管理業務フローを含めて設計すること。

6. データエンジニアリングにおける Spark 関連機能

6-1. Spark コア機能

1. データフレーム、および、データストリーム時におけるデータソースごとのパラメータ

CSV や JSON などのデータソースごとに設定できるパラメータが異なるため、Spark Dcos 、および、PySpark Docs にて確認する。

JDBC 、Spark コネクターについては、提供元のドキュメント等にて確認する。

2. Spark テーブルにおける外部テーブルとマネージドテーブルの利用指針

Spark テーブルには、外部テーブル(アンマネージドテーブル)とマネージドテーブルがあるが、基本的には外部テーブルを利用すること。

テスト実行時や短期的な利用を目的とする場合には、マネージドテーブルを利用してもよい。マネージドテーブルのロケーションを参照した外部テーブルを作成することも可能。

マネージドテーブルを削除する際には、データサイズによっては事前にデータを削除する必要あり。

def get_table_location(
    tgt_full_name: str,
):
    spark = SparkSession.getActiveSession()

    location = (
        spark.sql(f"DESC TABLE EXTENDED {tgt_full_name}")
        .filter('col_name = "Location"')
        .select("data_type")
        .collect()[0][0]
    )
    return location

image.png

db_name = 'sample_tpch'
tbl_name = 'orders'
tbl_full_name = f'{db_name}.{tbl_name}'

spark.sql(f'''
CREATE OR REPLACE TABLE {tbl_full_name}
(
    o_orderkey long,
    o_custkey long,
    o_orderstatus string,
    o_totalprice decimal(12, 2),
    o_orderdate date,
    o_orderpriority string,
    o_clerk string,
    o_shippriority int,
    o_comment string
)
USING delta
''')

image.png

# tbl_location を取得
tbl_location = get_table_location(tbl_full_name)
print(tbl_location)
spark.table(f"delta.`{tbl_location}`").printSchema()

image.png

# マネージドテーブルの Location を参照した外部テーブルを作成
db_name = 'sample_tpch'
tbl_name_2 = 'orders_2'
tbl_full_name_2 = f'{db_name}.{tbl_name_2}'

spark.sql(f'''
DROP TABLE IF EXISTS {tbl_full_name_2}
''')
spark.sql(f'''
CREATE TABLE {tbl_full_name_2}
USING delta
LOCATION '{tbl_location}'
''')

print(spark.sql(f'SHOW CREATE TABLE {tbl_full_name_2}').collect()[0][0])

image.png

# 外部テーブルを削除しても、マネージドテーブルは残っている
spark.sql(f'''
DROP TABLE IF EXISTS {tbl_full_name_2}
''')

spark.table(f"delta.`{tbl_location}`").printSchema()

image.png

3. シークレットの利用

Spark を利用する際には、パスワード等のシークレットが平文で表示されないように、Spark プロバイダーが提供するシークレット管理機能を用いる必要がある。

4. サロゲートキー列の付与

サロゲートキー列を付与する場合には、プログラムの複雑度と要求性能に応じて、次のいずれの方法での実装を検討。

自テーブルのサロゲートキー列の値を他テーブルに保持させる場合には、プログラムが複雑になる可能性がある。

# 実装方法 サロゲートキーカラムのデータ型 再現性
1 主キー項目によるハッシュ値を利用する方法 文字列
2 Delta Lake の Identity Column 機能を利用する方法 数値型
3 テーブルへの書き込み時に連番を付与する方法 数値型

5. Spark におけるタイムスタンプ型カラムの仕様

Spark におけるタイムスタンプ型には、次の注意事項がある。

  • タイムゾーンのオフセットは、Spark で処理する際に損失し、Spark セッションのタイムゾーンに統一される。
  • データ取得元のタイムゾーンを保持する場合には、別カラムにタイムゾーンの値を保持する方法、あるいは、タイムゾーンのオフセットを加減した値を保持する方法のいずれかを実施する必要がある。
  • Spark ではTIMESTAMP WITH SESSION TIME ZONEの timestamp 型のみがサポートさていることから、後者の方法を採用する場合にはspark.sql.session.timeZoneの固定化が必要となる。

参考リンク

Oracle database などのデータベースでは TIMESTAMP WITH TIME ZONE がサポートされており、Spark からデータを取得する場合には注意が必要となる。

# 事前準備
from pyspark.sql.functions import from_utc_timestamp,col
df = spark.createDataFrame(
        [
            (2020, 6, 28, 10, 31, 30, 'UTC'),
            (2019, 3, 1, 0, 1, 2, 'America/Los_Angeles'), 
            (2019, 11, 3, 1, 30, 2, 'America/Los_Angeles'), 
            (2019, 2, 28, 9, 29, 1, 'Asia/Tokyo'),
        ],
        ['YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'TZ']
)

image.png

# デフォルトのタイムゾーンを確認
spark.conf.unset('spark.sql.session.timeZone')
print(spark.conf.get('spark.sql.session.timeZone'))

image.png

# デフォルトのタイムゾーンに統一される
df.selectExpr(
    'make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TZ) as timestamp_with_session_time_zone',
).display()

image.png

# タイムゾーンを指定すると、それに統一される
spark.conf.set('spark.sql.session.timeZone', 'Asia/Tokyo')

df.selectExpr(
    'make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TZ) as timestamp_with_session_time_zone',
).display()

image.png

別カラムにタイムゾーンの値を保持する方法を実施する方法の例

spark.conf.unset('spark.sql.session.timeZone')
(
    df.selectExpr(
        'make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TZ) as timestamp_with_session_time_zone',
        'TZ AS time_zone_id',
    )
    .display()
)

image.png

タイムゾーンのオフセットを加減した値を保持する方法の例

spark.conf.unset('spark.sql.session.timeZone')
(
    df
    .selectExpr(
        'make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, TZ) as timestamp_with_session_time_zone',
        'TZ AS time_zone_id',
    )
    .withColumn(
        'timestamp_without_time_zone',
        from_utc_timestamp(col('timestamp_with_session_time_zone'),col('time_zone_id'))
    )
    .drop('time_zone_id')
    .display()
)

image.png

6. 破損したファイル(corrupt files)を参照する場合の対応方法

CSV や JSON などのファイルを適切に読めこめないことをファイル破損(corrupt files)と言われており、PySpark では次の方法により破損理由を探すことができる。

区切りテキストファイル(CSV、TSV) のファイル破損(corrupt CSV files)の原因としては次のようなものがある。

  • 全体の設定値が適切でないこと
    • ファイルの文字エンコーディング(encoding)(例: UTF-8 、 Shift_JIS )が適切でない
    • 区切り文字(separator)が適切でない
    • 改行コード(delimiter)が適切でない
    • テキスト修飾子(quote)が適切でない
    • テキスト修飾子のエスケープ(escape)が適切でない
    • 改行コード(line separator)適切でない
    • ヘッダーの有無(header)が適切でない
    • ファイル圧縮の形式(compression)が適切でない
  • 個別の値が適切でないこと
    • テキスト修飾子が指定されてない場合に、改行コードが値に含まれている
    • テキスト修飾子のエスケープが追記されていないテキスト修飾子が値に含まれている
    • 想定のデータ型と一致していない
      • 桁数が想定値を上回っている
      • 数値型である値に、,$などの記号が記載されている
      • 日付型のフォーマット(dateFormat)、あるいは、タイムスタンプ型のフォーマット(timestampFormat)が異なる
      • ヘッダーが有無の設定が適切でなく、ヘッダーのカラム名がレコードとして認識されている

区切りテキストファイルにおける設定値は、次のリンクが参考となる。

6-2. Delta Lake 機能

1. Delta Lake におけるタイムトラベル機能

Delta Lake形式のテーブルでは、parquet ファイルが追記される仕様であることから、過去の時点のバージョンへのクエリ(タイムトラベル)が可能。

保存期間は次のパラメータと VACUUM 操作に依存。

  • delta.logRetentionDuration
  • delta.deletedFileRetentionDuration

参考リンク

誤って書き込みを実施してしまったデータを削除する場合には、追加のオペレーションが必要となる場合がある。

2. Delta Lake における変更データフィード機能

変更データフィードを有効にすることで行レベルでの追跡が有効となるため、基本的には有効とすること。

設定による性能への大きな影響はないとの記載あり。

変更データ フィードを有効にするオーバーヘッドはありますか。
大きな影響はありません。

引用元:データ フィードの変更 - Azure Databricks | Microsoft Docs

3. Delta Lake における CLONE 機能

マルチクラウド・マルチリージョンでの展開を行う際に、増分更新が実際されることもあり、データ書き込み後のDEEP CLONE が有効。

2022年7月25日時点で、CLONE は Databricks でのみ利用でき、OSS の Delta Lake に実装予定。

4. Delta Lake における Delta Sharing 機能

# Delta Sharing 機能が GA 後に検証予定

7. パフォーマン最適化

7-1. 他データストアと連携する場合にSpark コネクターを優先して利用

Spark にてデータベースからデータ連携を行う場合には、jdbc や Python のライブラリーを利用せずに、Spark コネクターを利用すること。

7-2. Delta Lakeにおけるパフォーマンス最適化

次のようなドキュメントを参考に、テーブルプロパティやデータ連携前後の処理を検討。

7-3. データエンジニアリング時におけるクラスターの利用指針

分散処理でデータエンジニアリングを行う場合には、複数台の汎用的なサーバーにて処理を行われることがあるが、次のような記載がある通り、小数のサーバーの方が性能が高くなることがある。

複数のテーブルにまたがる和集合や結合を必要とする処理など、より複雑な ETL ジョブは、シャッフルされるデータ量を最小限に抑えることができれば最適に実行されます。 クラスター内のワーカー数を減らすことでシャッフルを最小限に抑えることができるため、クラスター D のような大規模なクラスターよりも、次の図のクラスター A のような小さなクラスターを検討することをお勧めします。

引用元:複雑なバッチ ETL | Microsoft Docs

リソースのクリーンアップ

db_name = 'sample_tpch'

spark.sql(f'''
DROP DATABASE {db_name} CASCADE
''')

image.png

9
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?