LoginSignup
3
2

More than 1 year has passed since last update.

Delta Lakeテーブルのバッチ読み込み・書き込み

Last updated at Posted at 2022-03-06

Table batch reads and writes | Databricks on AWS [2022/2/2時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Delta Lakeでは、テーブルに対するバッチでの読み書きを実行するためのApache Spark DataFrameの読み書きのためのAPIで提供されるオプションの大部分をサポートしています。

Delta LakeのSQLコマンドに関しては以下を参照ください。

テーブルの作成

Delta Lakeでは2つのタイプのテーブルの作成をサポートしています。メタストアで定義されるテーブルとパスによって定義されるテーブルです。

以下の方法でテーブルを作成することができます。

  • SQL DDLコマンド: Deltaテーブルを作成するために、Apache Sparkでサポートされている標準的なSQL DDLコマンド(例:CREATE TABLEREPLACE TABLE)を使用することができます。

    SQL
    CREATE TABLE IF NOT EXISTS default.people10m (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    
    CREATE OR REPLACE TABLE default.people10m (
      id INT,
      firstName STRING,
      middleName STRING,
      lastName STRING,
      gender STRING,
      birthDate TIMESTAMP,
      ssn STRING,
      salary INT
    ) USING DELTA
    
  • DataFrameWriter API: テーブルの作成と、Sparkデータフレーム、データセットからのデータの挿入を同時に行いたい場合には、SparkのDataFrameWriter(ScalaまたはJavaPython)を使用することができます。

    Python
    # Create table in the metastore using DataFrame's schema and write data to it
    df.write.format("delta").saveAsTable("default.people10m")
    
    # Create or replace partitioned table with path using DataFrame's schema and write/overwrite data to it
     df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
    
    Scala
    // Create table in the metastore using DataFrame's schema and write data to it
    df.write.format("delta").saveAsTable("default.people10m")
    
    // Create table with path using DataFrame's schema and write data to it
    df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
    
    • Databricksランタイム8.0以降では、Delta Lakeがデフォルトのフォーマットとなっているので、USING DELTA, format("delta")using("delta")を指定する必要はありません。
    • Databricksランタイム7.0以降では、Deltaテーブルを作成するにSparkのDataFrameWriterV2 APIを使用することができます。
  • DeltaTableBuilder API: テーブルの作成にDelta LakeのDeltaTableBuilder APIを使用することもできます。DataFrameWriter APIと比べて、このAPIではカラムのコメントやテーブルプロパティ、ジェネレーテッドカラムのような追加情報を簡単に指定することができます。

プレビュー
本機能はパブリックプレビューです。

注意
本機能はDatabricksランタイム8.3以降で利用できます。

Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

詳細はAPI documentationをご覧ください。

テーブルの作成もご覧ください。

データのパーティション

パーティションカラムを含む述語を持つクエリー、DMLを高速化するためにデータのパーティションを作成することができます。Deltaテーブルを作成する際にデータのパーティションをサック制するには、カラムでパーティションを指定します。以下の例ではgenderでパーティションを作成しています。

SQL
-- Create table in the metastore
CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)
Python
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")

DeltaTable.create(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .partitionedBy("gender") \
  .execute()
Scala
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")

DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .partitionedBy("gender")
  .execute()

データの保存場所の制御

メタストアで定義されるテーブルに対して、オプションでパスをLOCATIONで指定することができます。LOCATIONを指定して作成されたテーブルは、メタストアによる管理がされないもの(アンマネージド)とみなされます。パスが指定されないマネージドテーブルと異なり、アンマネージドテーブルのファイルは、テーブルをDROPしても削除されません。

Delta Lakeを用いてすでにデータが格納されている場所にLOCATIONを指定してCREATE TABLEを実行すると、Delta Lakeは以下を実行します。

  • 以下のようにテーブル名と場所のみを指定したとします。

    SQL
    CREATE TABLE default.people10m
    USING DELTA
    LOCATION '/tmp/delta/people10m'
    

    メタストアのテーブルは自動で既存データのスキーマ、パーティション、テーブルプロパティを継承します。この機能はメタストアにデータを「インポート」するために使用されます。

  • 何かしらの設定(スキーマ、パーティション、テーブルプロパティ)を行なった場合、Delta Lakeは指定された内容が既存データと完全に一致するかどうかを検証します。

重要!
指定された設置が既存の設定と完全に一致しない場合、Delta Lakeは不一致を告げる例外をスローします。

注意
メタストアはDeltaテーブルの最新情報に関する信頼できる情報源(the source of truth)ではありません。実際には、メタストアのテーブル定義はスキーマやプロパティのような全てのメタデータを保持していない場合があります。これは、テーブルの場所を保持しており、その場所にあるテーブルのトランザクションログが信頼できる情報源となります。システムからDelta固有の変化に気づいていないメタストアにクエリーを実行した場合、不完全あるいは古いテーブル情報を参照する場合があります。

ジェネレーテッドカラムの使用

プレビュー
本機能はパブリックプレビューです。

注意
本機能はDatabricksランタイム8.3以降で利用できます。

Delta Lakeでは、Deltaテーブルの他のテーブルに対してユーザー定義関数を適用することで自動的に値を生成する特殊なカラムであるジェネレーテッドカラムをサポートしています。ジェネレーテッドカラムを持つテーブルに書き込みを行い、当該カラムに値を明示的に指定しない場合、Delta Lakeは自動で値を計算します。例えば、タイムスタンプのカラムから自動で(日付でテーブルをパーティショニングするために)日付カラムを作成することができます。等がテーブルに書き込みを行う際にはタイムスタンプのデータだけで十分です。しかし、これらのカラムに明示的に値を指定した場合には、制約(<value> <=> <generation expression>) IS TRUEを満足する必要があり、満たさない場合にはエラーとともに失敗します。

以下の例では、ジェネレーテッドカラムを持つテーブルの作成方法を示しています。

SQL
CREATE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  dateOfBirth DATE GENERATED ALWAYS AS (CAST(birthDate AS DATE)),
  ssn STRING,
  salary INT
)
USING DELTA
PARTITIONED BY (gender)
Python
DeltaTable.create(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .partitionedBy("gender") \
  .execute()
Scala
DeltaTable.create(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn(
    DeltaTable.columnBuilder("dateOfBirth")
     .dataType(DateType)
     .generatedAlwaysAs("CAST(dateOfBirth AS DATE)")
     .build())
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .partitionedBy("gender")
  .execute()

ジェネレーテッドカラムは通常カラムのように格納され、ストレージ容量を必要とします。

ジェネレーテッドカラムには以下の制限が適用されます。

  • ジェネレーションの表現では、以下のタイプの関数以外で、同じ引数に対して同じ値を返却するSparkのSQL関数を使用することができます。
    • ユーザー定義関数
    • 集約関数
    • ウィンドウ関数
    • 複数行を返却する関数
  • Databricksランタイム9.1以降では、spark.databricks.delta.schema.autoMerge.enabledをtrueに設定することで、MERGEオペレーションでジェネレーテッドカラムを使用することができます。

テーブルの読み込み

テーブル名あるいはパスを指定することでDeltaテーブルをデータフレームにロードすることができます。

SQL
SELECT * FROM default.people10m   -- query table in the metastore

SELECT * FROM delta.`/tmp/delta/people10m`  -- query table by path
Python
spark.table("default.people10m")    # query table in the metastore

spark.read.format("delta").load("/tmp/delta/people10m")  # query table by path
Scala
spark.table("default.people10m")      // query table in the metastore

spark.read.format("delta").load("/tmp/delta/people10m")  // create table by path

import io.delta.implicits._
spark.read.delta("/tmp/delta/people10m")

いかなるクエリーにおいても、返却されるデータフレームは自動で最新のテーブルのスナップショットを読み込みます。REFRESH TABLEを実行する必要はありません。Delta Lakeはクエリーに利用できる述語が存在する場合には、最小限の量のデータを読み込むように自動でパーティションや統計情報を使用します。

過去のテーブルのスナップショットのクエリー(タイムトラベル)

Delta Lakeのタイムトラベルを用いることで、Deltaテーブルの過去のスナップショットに対してクエリーを実行することができます。タイムトラベルには以下のように多くのユースケースが存在しています。

  • 分析、レポート、出力(例えば、機械学習モデルの出力)の再作成
  • 複雑な一時的なクエリーの記述
  • データの誤りの修正
  • 高速に変化するテーブルに対するクエリーのスナップショット分離の提供

このセクションでは、過去のバージョンのテーブルに対するクエリーでサポートされている方法とデータ保持に関する注意事項、サンプルを説明します。

文法

このセクションでは、どのようにDeltaテーブルの過去のバージョンに対してクエリーを実行するのかを説明します。

SQLのAS OF文法

SQL
SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version

ここでは、

  • timestamp_expressionは以下のいずれかとなります。
    • '2018-10-18T22:15:12.013Z'、すなわち、タイムスタンプにキャストできる文字列
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18'、すなわち、日付の文字列
    • Databricksランタイム6.6以降では以下も使用できます。
      • current_timestamp() - interval 12 hours
      • date_sub(current_date(), 1)
      • タイムスタンプ、あるいはタイムスタンプにキャストできる他の任意の表現
  • versionは、DESCRIBE HISTORY table_specの出力から取得できるlong値です。

timestamp_expressionversionをサブクエリーで使用することはできません。

サンプル

SQL
SELECT * FROM default.people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

DataFrameReaderのオプション

DataFrameReaderのオプションを用いることで、特定バージョンのDeltaテーブルからデータフレームを作成することができます。

Python
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")

timestamp_stringに対しては、日付、タイムスタンプの文字列のみを指定することができます。例えば、"2019-01-01""2019-01-01T00:00:00.000Z"といったものです。

一般的なパターンには、Deltaテーブルの最新状態を使用して、下流のアプリケーションをアップデートするDatabricksジョブの実行を行うというものがあります。

Deltaテーブルは自動でアップデートするので、背後のデータがアップデートされている間に読み込まれると、Deltaテーブルからロードしたデータフレームは異なる結果を返却する場合があります。タイムトラベルを用いることで、呼び出しによって返却されるデータフレームのデータを固定することができます。

Python
latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/tmp/delta/people10m`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")

タイムトラベルのサンプル

  • ユーザー111を間違って削除してしまった際の復旧
SQL
INSERT INTO my_table
  SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
  WHERE userId = 111
  • テーブルに対する誤ったアップデートからの復旧
SQL
MERGE INTO my_table target
  USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
  ON source.userId = target.userId
  WHEN MATCHED THEN UPDATE SET *
  • 先週追加された新規顧客数に対するクエリー
SQL
SELECT count(distinct userId) - (
  SELECT count(distinct userId)
  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))

データの保持

以前のバージョンにタイムトラベルするには、当該バージョンのログとデータファイルの両方を保持する必要があります。

Deltaテーブルの背後にあるデータファイルは自動では決して削除されません。データファイルはあなたがVACUUMを実行した時のみ削除されます。VACUUMはDeltaログファイルを削除しません。ログファイルはチェックポイントが書き込まれた後に自動でクリーンアップされます。

デフォルトでは以下のことを行わない限り、過去30日までのDeltaテーブルにタイムトラベルすることができます。

  • お使いのDeltaテーブルにVACUUMを実行する。

  • 以下のテーブルプロパティを用いてデータ、ログファイルの保持期間を変更する。

    • delta.logRetentionDuration = "interval <interval>": テーブル履歴を保持する期間を制御します。デフォルトはinterval 30 daysです。

    チェックポイントが書き込まれるたびに、Databricksは自動で保持期間より古いログのエントリーをクリーンアップします。この設定を大きくすれば、どれだけ多くのログエントリーが保持されます。ログに対するオペレーションの時間は一定なので、これはパフォーマンスにインパクトを及ぼしません。履歴に対するオペレーションは並列化されますが、ログのサイズが増えるとよりコストを要するものになるでしょう。

    • delta.deletedFileRetentionDuration = "interval <interval>": VACUUMの候補になる前に、どれだけ古いファイルが削除されるべきかを制御します。デフォルトはinterval 7 daysです。

    DeltaテーブルにVACUUMを実行しても30日の履歴データにアクセスできるようにするためには、delta.deletedFileRetentionDuration = "interval 30 days"を設定します。この設定によってストレージコストが増加する場合があります。

テーブルへの書き込み

追加

既存のDeltaテーブルに原子的に新規データを追加するには、appendモードを使用します。

SQL
INSERT INTO default.people10m SELECT * FROM morePeople
Python
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")
Scala
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("append").delta("/tmp/delta/people10m")

上書き

テーブルの全データを原子的に置き換えるには、overwriteモードを使用します。

SQL
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople
Python
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
Scala
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("overwrite").delta("/tmp/delta/people10m")

データフレームを用いることで、任意の表現に合致するデータのみを選択的に上書きすることができます。この機能はDatabricksランタイム9.1LTS以降で利用できます。以下のコマンドは、start_dateでパーティション分けされており、dfに格納されているターゲットテーブルの1月のイベントのみを原子的に置換します。

Python
df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
  .save("/tmp/delta/events")
Scala
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

このサンプルコードはdfのデータを書き出し、述語にに全てが合致しているかを検証し、原子的な置換を行います。述語に合致していないデータを書き出し、ターゲットテーブルと合致する行を置換したい場合には、spark.databricks.delta.replaceWhere.constraintCheck.enabledをfalseに設定して制約チェックを無効化することができます。

Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

Databricksランタイム9.0以下では、replaceWhereはパーティションカラムのみに対する述語に合致するデータを上書きます。以下のコマンドは、dateでパーティション分けされており、dfに格納されているターゲットテーブルの1月のイベントのみを原子的に置換します。

Python
df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
  .save("/tmp/delta/people10m")
Scala
df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Databricksランタイム9.1以降で過去の挙動に戻したい場合には、spark.databricks.delta.replaceWhere.dataColumns.enabledフラグを無効にします。

Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

注意
Apache Sparkのfile APIと異なり、Delta Lakeはテーブルのスキーマを記憶し、強制します。これはデフォルトでは、上書きは既存テーブルのスキーマを置換しないことを意味します。

ユーザー定義コミットメタデータの設定

これらのオペレーションによってなされるコミットにおけるメタデータとして、DataFrameWriterのオプションuserMetadataあるいはSparkSessionの設定spark.databricks.delta.commitInfo.userMetadataを用いて、ユーザー定義の文字列を指定することができます。両方が指定された場合、オプションの方が優先されます。このユーザー定義メタデータはhistoryオペレーションで参照することができます。

SQL
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople
Python
df.write.format("delta") \
  .mode("overwrite") \
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \
  .save("/tmp/delta/people10m")
Scala
df.write.format("delta")
  .mode("overwrite")
  .option("userMetadata", "overwritten-for-fixing-incorrect-data")
  .save("/tmp/delta/people10m")

スキーマの検証

Delta Lakeは、書き込まれたデータフレームのスキーマとテーブルのスキーマに互換性があるのかを自動的に検証します。Delta Lakeはテーブルへのデータフレームの書き込みに互換性があるのかどうかを以下のルールを用いて検証します。

  • データフレームのカラムはターゲットテーブルに存在しなくてはなりません。データフレームのカラムがテーブルに存在しない場合、例外が起きます。テーブルに存在し、データフレームに存在しないカラムはnullに設定されます。
  • データフレームのカラムのデータ型とターゲットテーブルのデータ型は一致しなくてはなりません。合致しない場合、例外が起きます。
  • データフレームのカラム名は大文字小文字で区別されません。これは、同じテーブルに「Foo」、「foo」のようなカラムを定義できないことを意味します。Sparkをケースセンシティブモード、あるいはケースインセンシティブ(デフォルト)モードで動作させることはできますが、Parquetは格納時、カラム情報返却時においてケースセンシティブとなります。Delta Lakeでは大文字小文字は保存されますが、スキーマを保存する際には大文字小文字を区別せず、潜在的な誤りやデータ破壊、損失の問題を回避するためにこの制約を設けています。

Delta Lakeでは明示的に新規カラムを追加するためのDDLと、自動でスキーマをアップデートする機能をサポートしています。

partitionByのような他のオプションとappendノードを組み合わせて指定した場合、Delta Lakeはこれらがカッチするかを検証し、不一致がある場合には例外をスローします。partitionByが存在しない場合、追加は自動で既存データのパーティションに従います。

注意
Databricksランタイム7.0以降では、INSERT文ではスキーマ強制を提供しており、スキーマ進化をサポートしています。カラムのデータ型が安全にお使いのDelta Lakeテーブルのデータ型にキャストできない場合、ランタイム例外がスローされます。スキーマ進化が有効化されている場合、スキーマが進化できるように、お使いのスキーマ(あるいはネストされたカラム)の最後のカラムとして新規カラムが追加されます。

Delta Lakeにおけるスキーマ強制、スキーマ進化の詳細に関しては、以下のYouTube動画(55分)をご覧ください。

テーブルスキーマのアップデート

Delta Lakeではテーブルのスキーマをアップデートすることができます。以下のタイプの変更をサポートしています。

  • (任意の場所への)新規カラムの追加
  • 既存カラムの並び替え
  • 既存カラムの名前の変更

DDLを用いて明示的に変更することや、DMLを用いて暗黙的に変更することができます。

重要!
Deltaテーブルのスキーマをアップデートする際には、当該テーブルを読み込むストリームは停止されます。ストリームを継続する際には再始動してください。

推奨する方法に関しては、Databricksにおける構造化ストリーミングの本格運用をご覧ください。

明示的なスキーマのアップデート

テーブルのスキーマを明示的に変更するには、以下のDDLを使用することができます。

カラムの追加

SQL
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

デフォルトでは、null値の許可はtrueです。

ネストされたフィールドにカラムを追加するには以下を実行します。

SQL
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

サンプル

ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)を実行する前のスキーマが以下のようなものだった場合、

- root
| - colA
| - colB
| +-field1
| +-field2

実行後のスキーマは以下のようになります。

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

注意
ネストされたカラムの追加はstructsのみがサポートされています。arrayやmapはサポートされていません。

カラムのコメント、並びの変更

SQL
ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

ネストされたフィールドのカラムを変更するには、以下を使用します。

SQL
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]

サンプル

ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRSTを実行する前のスキーマが以下のようなものだった場合、

- root
| - colA
| - colB
| +-field1
| +-field2

実行後は以下のようになります。

- root
| - colA
| - colB
| +-field2
| +-field1

カラムの置換

SQL
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

サンプル

以下のDDLを実行すると、

SQL
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

以下のスキーマは

- root
| - colA
| - colB
| +-field1
| +-field2

このようになります。

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

カラム名の変更

プレビュー
本機能はパブリックプレビューです。

要件

  • Databricksランタイム10.2以降が必要です。
  • カラムに存在しているデータを変更せずにカラムの名前を変更するには、テーブルのカラムマッピングを有効化する必要があります。Delta column mappingを参照下さい。

カラム名を変更するには以下を実行します。

SQL
ALTER TABLE <table_name> RENAME COLUMN old_col_name TO new_col_name

ネストされたフィールドの名称を変更するには以下を実行します。

SQL
ALTER TABLE <table_name> RENAME COLUMN col_name.old_nested_field TO new_nested_field

サンプル

以下のコマンドを実行すると、

SQL
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

以下のスキーマは、

- root
| - colA
| - colB
| +-field1
| +-field2

以下のように変更されます。

- root
| - colA
| - colB
| +-field001
| +-field2

Delta column mappingを参照ください。

カラムの型、名称の変更

テーブルを再書き込みすることで、カラムのデータ型の変更や、カラムの削除が可能です。これを行うためには、overwriteSchemaオプションを使用します。

カラム型の変更

Python
spark.read.table(...) \
  .withColumn("birthDate", col("birthDate").cast("date")) \
  .write \
  .format("delta") \
  .mode("overwrite")
  .option("overwriteSchema", "true") \
  .saveAsTable(...)

カラム名の変更

Python
spark.read.table(...) \
  .withColumnRenamed("dateOfBirth", "birthDate") \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable(...)

自動スキーマアップデート

Delta Lakeは、DMLのトランザクション(追加あるいは上書き)の一部としてテーブルのスキーマを自動でアップデートし、画期込まれたデータをスキーマの互換性を確保します。

カラムの追加

以下のケースでは、データフレームに存在しているがテーブルにはないカラムは書き込みトランザクションの一部として自動で追加されます。

  • writeあるいはwriteStreamに、オプション.option("mergeSchema", "true")がある。
  • spark.databricks.delta.schema.autoMerge.enabledtrueになっている。

両方のオプションが設定されている場合、DataFrameWriterのオプションが優先されます。追加されたカラムは存在するstructの最後に追加されます。新規カラムを追加する際、大文字小文字は保持されます。

注意

  • テーブルアクセスコントロールが有効化されている場合(MODIFYを必要とするリクエストをALL PRIVILEGESを必要とするリクエストに引き上げることになるため)、mergeSchemaはサポートされません。
  • INSERT INTO.write.insertInto()ではmergeSchemaは使用できません。

NullTypeのカラム

ParquetではNullTypeをサポートしていないので、Deltaテーブルに書き込む際、データフレームからNullType カラムは削除されますが、スキーマには保存されます。このカラムに異なるデータ型を受け取った際、Delta Lakeは新たなデータ型にスキーマをマージします。Delta Lakeが既存カラムに対してNullTypeを受け取ると、書き込みの際に古いスキーマは維持され、新たなカラムは削除されます。

ストリーミングでのNullTypeはサポートされていません。ストリーミングを使用する際にはスキーマを設定する必要があるためであり、これは非常にまれなことです。また、ArrayTypeMapTypeのような複雑な型でもNullTypeは受け付けられません。

テーブルスキーマの置換

デフォルトでは、テーブルのデータの上書きではスキーマを上書きません。replaceWhereなしにmode("overwrite")を用いてテーブルを上書きする際、書き込まれるテーブルのスキーマを上書きしたいケースがあるかもしれません。overwriteSchemaオプションをtrueに設定することで、テーブルのスキーマやパーティションを置き換えることができます。

Python
df.write.option("overwriteSchema", "true")

テーブルに対するビュー

Delta Lakeでは、データソースのテーブルで行うのと同じように、Deltaテーブルに対するビューを作成することができます。

これらのビューとテーブルアクセスコントロールを組み合わせることで、行列レベルのセキュリティを実現することができます。

ビューを操作する際の大きな課題には、スキーマの解決があります。Deltaテーブルのスキーマを変更した場合、スキーマに追加されたあらゆる変更を考慮するように、派生ビューを再作成する必要があります。例えば、Deltaテーブルに新規カラムを追加した場合、ベースのテーブルから作成されたビューでこの新規カラムが利用できることを確認する必要があります。

テーブルプロパティ

CREATEALTERTBLPROPERTIESを用いることでテーブルプロパティとしてメタデータを格納することができます。

TBLPROPERTIESはDeltaテーブルのメタデータの一部として格納されます。特定の場所にDeltaテーブルが存在している場合、CREATE文で新たなTBLPROPERTIESを定義することはできません。詳細はテーブルの作成を参照ください。

さらに、挙動とパフォーマンスを調整するために、Delta Lakeでは特定のDeltaテーブルプロパティを提供しています。

  • Deltaテーブルにおける削除・更新のブロック: delta.appendOnly=true
  • タイムトラベルの保持期間の設定: delta.logRetentionDuration=<interval-string>delta.deletedFileRetentionDuration=<interval-string>。詳細はデータの保持を参照ください。
  • 統計情報を収集するカラムの数: delta.dataSkippingNumIndexedCols=<number-of-columns>。このプロパティは新規データが書き込まれるときにのみ反映されます。
  • S3メタデータのホットスポットを回避するためにプレフィックスをランダムにする: delta.randomizeFilePrefixes=true。大量の(秒あたり数千リクエスト)高速な読み書きオペレーションを必要とするテーブルに対しては、テーブル専用のS3バケットを作成し、ベストな体験を得るためにファイルのプレフィックスをランダムにすることを強くお勧めします。

注意
Deltaテーブルプロパティの変更は、他の同時書き込みオペレーションと競合しうる書き込みオペレーションであり、その場合処理が失敗します。テーブルに対する同時書き込みオペレーションが存在しな場合にのみ、テーブルプロパティを変更することをお勧めします。

また、Spark設定を用いて、Deltaテーブルへの最初のコミットからdelta.のプレフィックスのプロパティを設定することができます。例えば、プロパティdelta.appendOnly=trueでDeltaテーブルを初期化するには、Spark設定spark.databricks.delta.properties.defaults.appendOnlyを以下のようにtrueに設定します。

SQL
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
Python
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
Scala
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")

テーブルのメタデータ

Delta Lakeでは、テーブルメタデータを探索するための豊富な機能を提供しています。

SHOW [PARTITIONS | COLUMNS]DESCRIBE TABLEをサポートしています。以下を参照下さい。

また、以下のユニークなコマンドも提供しています。

DESCRIBE DETAIL

スキーマ、パーティション、テーブルサイズなどの情報を提供します。詳細はテーブル詳細を取得するを参照ください。

DESCRIBE HISTORY

オペレーション、ユーザーなどを含む履歴情報、テーブルへの書き込みに関するオペレーションメトリクスを提供します。テーブル履歴は30日保持されます。詳細はテーブル履歴を取得するを参照ください。

Dataサイドバーは、Deltaテーブルの詳細情報、履歴に対するビジュアルなビューを提供します。テーブルスキーマ、サンプルデータに加えて、Historyタブをクリックすることで、DESCRIBE HISTORYによって提供されるテーブル履歴を参照することができます。

ストレージ認証情報の設定

Delta LakeはストレージシステムにアクセスするためにHadoop FileSystem APIを使用します。ストレージシステムに対する認証情報はHadoop設定を通じて設定することができます。Delta LakeではApache Spark同様に、Hadoop設定を行うための複数の方法を提供しています。

Spark設定

クラスターでSparkアプリケーションを起動する際、カスタムのHadoop設定を渡すために、spark.hadoop.*の形式でSpark設定を行うことができます。例えば、spark.hadoop.a.b.cの値を設定すると、Hadoop設定a.b.cの値が引き渡され、Delta LakeはHadoop FileSystem APIにアクセスするためにこの値を使用します。

詳細はSparkの設定をご覧下さい。

SQLセッション設定

Spark SQLは現在の全てのSQLセッション設定をDelta Lakeに引渡し、Delta LakeはHadoop FileSystem APIにアクセスするためにこの値を使用します。例えば、SET a.b.c=x.y.zはDelta Lakeに対して、Hadoop設定a.b.cとして値x.y.zを引渡し、Delta LakeはHadoop FileSystem APIにアクセスするためにこの値を使用します。

DataFrameのオプション

Spark(クラスター)設定やSQLセッション設定を通じて、Hadoopのファイルシステム設定を行うことに加えDeltaでは、DataFrameReader.load(path)DataFrameWriter.save(path)を用いてテーブルを読み書きする際、DataFrameReaderDataFrameWriterのオプションからHadoopファイルシステムの設定の読み込みをサポートしています。

注意
この機能はDatabricksランタイム10.1以降で利用できます。

例えば、DataFrameのオプション経由でストレージの認証情報を引き渡すことができます。

Python
df1 = spark.read.format("delta") \
  .option("fs.s3a.access.key", "<access-key-1>") \
      .option("fs.s3a.secret.key", "<secret-key-1>") \
  .read("...")
df2 = spark.read.format("delta") \
  .option("fs.s3a.access.key", "<access-key-1>") \
      .option("fs.s3a.secret.key", "<secret-key-2>") \
  .read("...")
df1.union(df2).write.format("delta") \
  .mode("overwrite") \
  .option("fs.s3a.access.key", "<access-key-3>") \
      .option("fs.s3a.secret.key", "<secret-key-3>") \
  .save("...")
Scala
val df1 = spark.read.format("delta")
  .option("fs.s3a.access.key", "<access-key-1>")
      .option("fs.s3a.secret.key", "<secret-key-1>")
  .read("...")
val df2 = spark.read.format("delta")
  .option("fs.s3a.access.key", "<access-key-2>")
      .option("fs.s3a.secret.key", "<secret-key-2>")
  .read("...")
df1.union(df2).write.format("delta")
  .mode("overwrite")
  .option("fs.s3a.access.key", "<access-key-3>")
      .option("fs.s3a.secret.key", "<secret-key-3>")
  .save("...")

Data sources で、お使いのストレージに対するHadoopファイルシステム設定の詳細を確認することができます。

ノートブック

さまざまなDeltaテーブルのメタデータコマンドのサンプルに関しては、以下のノートブックの最後をご覧ください。

Delta Lakeバッチコマンドのノートブック

Databricks 無料トライアル

Databricks 無料トライアル

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2