Table batch reads and writes | Databricks on AWS [2022/2/2時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Delta Lakeでは、テーブルに対するバッチでの読み書きを実行するためのApache Spark DataFrameの読み書きのためのAPIで提供されるオプションの大部分をサポートしています。
Delta LakeのSQLコマンドに関しては以下を参照ください。
- Databricksランタイム7.x以降: Delta Lake statements
- Databricksランタイム5.5 LTSおよび6.x: SQL reference for Databricks Runtime 5.5 LTS and 6.x
テーブルの作成
Delta Lakeでは2つのタイプのテーブルの作成をサポートしています。メタストアで定義されるテーブルとパスによって定義されるテーブルです。
以下の方法でテーブルを作成することができます。
-
SQL DDLコマンド: Deltaテーブルを作成するために、Apache Sparkでサポートされている標準的なSQL DDLコマンド(例:
CREATE TABLE
、REPLACE TABLE
)を使用することができます。SQLCREATE TABLE IF NOT EXISTS default.people10m ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT ) USING DELTA CREATE OR REPLACE TABLE default.people10m ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT ) USING DELTA
-
DataFrameWriter
API: テーブルの作成と、Sparkデータフレーム、データセットからのデータの挿入を同時に行いたい場合には、SparkのDataFrameWriter
(ScalaまたはJava、Python)を使用することができます。Python# Create table in the metastore using DataFrame's schema and write data to it df.write.format("delta").saveAsTable("default.people10m") # Create or replace partitioned table with path using DataFrame's schema and write/overwrite data to it df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
Scala// Create table in the metastore using DataFrame's schema and write data to it df.write.format("delta").saveAsTable("default.people10m") // Create table with path using DataFrame's schema and write data to it df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
- Databricksランタイム8.0以降では、Delta Lakeがデフォルトのフォーマットとなっているので、
USING DELTA, format("delta")
やusing("delta")
を指定する必要はありません。 - Databricksランタイム7.0以降では、Deltaテーブルを作成するにSparkの
DataFrameWriterV2
APIを使用することができます。
- Databricksランタイム8.0以降では、Delta Lakeがデフォルトのフォーマットとなっているので、
-
DeltaTableBuilder
API: テーブルの作成にDelta LakeのDeltaTableBuilder
APIを使用することもできます。DataFrameWriter APIと比べて、このAPIではカラムのコメントやテーブルプロパティ、ジェネレーテッドカラムのような追加情報を簡単に指定することができます。
プレビュー
本機能はパブリックプレビューです。
注意
本機能はDatabricksランタイム8.3以降で利用できます。
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
詳細はAPI documentationをご覧ください。
テーブルの作成もご覧ください。
データのパーティション
パーティションカラムを含む述語を持つクエリー、DMLを高速化するためにデータのパーティションを作成することができます。Deltaテーブルを作成する際にデータのパーティションをサック制するには、カラムでパーティションを指定します。以下の例ではgenderでパーティションを作成しています。
-- Create table in the metastore
CREATE TABLE default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
USING DELTA
PARTITIONED BY (gender)
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")
DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.partitionedBy("gender")
.execute()
データの保存場所の制御
メタストアで定義されるテーブルに対して、オプションでパスをLOCATION
で指定することができます。LOCATION
を指定して作成されたテーブルは、メタストアによる管理がされないもの(アンマネージド)とみなされます。パスが指定されないマネージドテーブルと異なり、アンマネージドテーブルのファイルは、テーブルをDROP
しても削除されません。
Delta Lakeを用いてすでにデータが格納されている場所にLOCATION
を指定してCREATE TABLE
を実行すると、Delta Lakeは以下を実行します。
-
以下のようにテーブル名と場所のみを指定したとします。
SQLCREATE TABLE default.people10m USING DELTA LOCATION '/tmp/delta/people10m'
メタストアのテーブルは自動で既存データのスキーマ、パーティション、テーブルプロパティを継承します。この機能はメタストアにデータを「インポート」するために使用されます。
-
何かしらの設定(スキーマ、パーティション、テーブルプロパティ)を行なった場合、Delta Lakeは指定された内容が既存データと完全に一致するかどうかを検証します。
重要!
指定された設置が既存の設定と完全に一致しない場合、Delta Lakeは不一致を告げる例外をスローします。
注意
メタストアはDeltaテーブルの最新情報に関する信頼できる情報源(the source of truth)ではありません。実際には、メタストアのテーブル定義はスキーマやプロパティのような全てのメタデータを保持していない場合があります。これは、テーブルの場所を保持しており、その場所にあるテーブルのトランザクションログが信頼できる情報源となります。システムからDelta固有の変化に気づいていないメタストアにクエリーを実行した場合、不完全あるいは古いテーブル情報を参照する場合があります。
ジェネレーテッドカラムの使用
プレビュー
本機能はパブリックプレビューです。
注意
本機能はDatabricksランタイム8.3以降で利用できます。
Delta Lakeでは、Deltaテーブルの他のテーブルに対してユーザー定義関数を適用することで自動的に値を生成する特殊なカラムであるジェネレーテッドカラムをサポートしています。ジェネレーテッドカラムを持つテーブルに書き込みを行い、当該カラムに値を明示的に指定しない場合、Delta Lakeは自動で値を計算します。例えば、タイムスタンプのカラムから自動で(日付でテーブルをパーティショニングするために)日付カラムを作成することができます。等がテーブルに書き込みを行う際にはタイムスタンプのデータだけで十分です。しかし、これらのカラムに明示的に値を指定した場合には、制約(<value> <=> <generation expression>) IS TRUE
を満足する必要があり、満たさない場合にはエラーとともに失敗します。
以下の例では、ジェネレーテッドカラムを持つテーブルの作成方法を示しています。
CREATE TABLE default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
dateOfBirth DATE GENERATED ALWAYS AS (CAST(birthDate AS DATE)),
ssn STRING,
salary INT
)
USING DELTA
PARTITIONED BY (gender)
DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()
DeltaTable.create(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn(
DeltaTable.columnBuilder("dateOfBirth")
.dataType(DateType)
.generatedAlwaysAs("CAST(dateOfBirth AS DATE)")
.build())
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.partitionedBy("gender")
.execute()
ジェネレーテッドカラムは通常カラムのように格納され、ストレージ容量を必要とします。
ジェネレーテッドカラムには以下の制限が適用されます。
- ジェネレーションの表現では、以下のタイプの関数以外で、同じ引数に対して同じ値を返却するSparkのSQL関数を使用することができます。
- ユーザー定義関数
- 集約関数
- ウィンドウ関数
- 複数行を返却する関数
- Databricksランタイム9.1以降では、
spark.databricks.delta.schema.autoMerge.enabled
をtrueに設定することで、MERGE
オペレーションでジェネレーテッドカラムを使用することができます。
テーブルの読み込み
テーブル名あるいはパスを指定することでDeltaテーブルをデータフレームにロードすることができます。
SELECT * FROM default.people10m -- query table in the metastore
SELECT * FROM delta.`/tmp/delta/people10m` -- query table by path
spark.table("default.people10m") # query table in the metastore
spark.read.format("delta").load("/tmp/delta/people10m") # query table by path
spark.table("default.people10m") // query table in the metastore
spark.read.format("delta").load("/tmp/delta/people10m") // create table by path
import io.delta.implicits._
spark.read.delta("/tmp/delta/people10m")
いかなるクエリーにおいても、返却されるデータフレームは自動で最新のテーブルのスナップショットを読み込みます。REFRESH TABLE
を実行する必要はありません。Delta Lakeはクエリーに利用できる述語が存在する場合には、最小限の量のデータを読み込むように自動でパーティションや統計情報を使用します。
過去のテーブルのスナップショットのクエリー(タイムトラベル)
Delta Lakeのタイムトラベルを用いることで、Deltaテーブルの過去のスナップショットに対してクエリーを実行することができます。タイムトラベルには以下のように多くのユースケースが存在しています。
- 分析、レポート、出力(例えば、機械学習モデルの出力)の再作成
- 複雑な一時的なクエリーの記述
- データの誤りの修正
- 高速に変化するテーブルに対するクエリーのスナップショット分離の提供
このセクションでは、過去のバージョンのテーブルに対するクエリーでサポートされている方法とデータ保持に関する注意事項、サンプルを説明します。
文法
このセクションでは、どのようにDeltaテーブルの過去のバージョンに対してクエリーを実行するのかを説明します。
SQLのAS OF
文法
SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version
ここでは、
-
timestamp_expression
は以下のいずれかとなります。-
'2018-10-18T22:15:12.013Z'
、すなわち、タイムスタンプにキャストできる文字列 cast('2018-10-18 13:36:32 CEST' as timestamp)
-
'2018-10-18'
、すなわち、日付の文字列 - Databricksランタイム6.6以降では以下も使用できます。
current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- タイムスタンプ、あるいはタイムスタンプにキャストできる他の任意の表現
-
-
version
は、DESCRIBE HISTORY table_spec
の出力から取得できるlong値です。
timestamp_expression
やversion
をサブクエリーで使用することはできません。
サンプル
SELECT * FROM default.people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123
DataFrameReaderのオプション
DataFrameReaderのオプションを用いることで、特定バージョンのDeltaテーブルからデータフレームを作成することができます。
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")
timestamp_string
に対しては、日付、タイムスタンプの文字列のみを指定することができます。例えば、"2019-01-01"
や"2019-01-01T00:00:00.000Z"
といったものです。
一般的なパターンには、Deltaテーブルの最新状態を使用して、下流のアプリケーションをアップデートするDatabricksジョブの実行を行うというものがあります。
Deltaテーブルは自動でアップデートするので、背後のデータがアップデートされている間に読み込まれると、Deltaテーブルからロードしたデータフレームは異なる結果を返却する場合があります。タイムトラベルを用いることで、呼び出しによって返却されるデータフレームのデータを固定することができます。
latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/tmp/delta/people10m`)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")
タイムトラベルのサンプル
- ユーザー
111
を間違って削除してしまった際の復旧
INSERT INTO my_table
SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
WHERE userId = 111
- テーブルに対する誤ったアップデートからの復旧
MERGE INTO my_table target
USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *
- 先週追加された新規顧客数に対するクエリー
SELECT count(distinct userId) - (
SELECT count(distinct userId)
FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
データの保持
以前のバージョンにタイムトラベルするには、当該バージョンのログとデータファイルの両方を保持する必要があります。
Deltaテーブルの背後にあるデータファイルは自動では決して削除されません。データファイルはあなたがVACUUMを実行した時のみ削除されます。VACUUM
はDeltaログファイルを削除しません。ログファイルはチェックポイントが書き込まれた後に自動でクリーンアップされます。
デフォルトでは以下のことを行わない限り、過去30日までのDeltaテーブルにタイムトラベルすることができます。
-
お使いのDeltaテーブルに
VACUUM
を実行する。 -
以下のテーブルプロパティを用いてデータ、ログファイルの保持期間を変更する。
-
delta.logRetentionDuration = "interval <interval>"
: テーブル履歴を保持する期間を制御します。デフォルトはinterval 30 days
です。
チェックポイントが書き込まれるたびに、Databricksは自動で保持期間より古いログのエントリーをクリーンアップします。この設定を大きくすれば、どれだけ多くのログエントリーが保持されます。ログに対するオペレーションの時間は一定なので、これはパフォーマンスにインパクトを及ぼしません。履歴に対するオペレーションは並列化されますが、ログのサイズが増えるとよりコストを要するものになるでしょう。
-
delta.deletedFileRetentionDuration = "interval <interval>"
:VACUUM
の候補になる前に、どれだけ古いファイルが削除されるべきかを制御します。デフォルトはinterval 7 days
です。
Deltaテーブルに
VACUUM
を実行しても30日の履歴データにアクセスできるようにするためには、delta.deletedFileRetentionDuration = "interval 30 days"
を設定します。この設定によってストレージコストが増加する場合があります。 -
テーブルへの書き込み
追加
既存のDeltaテーブルに原子的に新規データを追加するには、append
モードを使用します。
INSERT INTO default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")
import io.delta.implicits._
df.write.mode("append").delta("/tmp/delta/people10m")
上書き
テーブルの全データを原子的に置き換えるには、overwrite
モードを使用します。
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
import io.delta.implicits._
df.write.mode("overwrite").delta("/tmp/delta/people10m")
データフレームを用いることで、任意の表現に合致するデータのみを選択的に上書きすることができます。この機能はDatabricksランタイム9.1LTS以降で利用できます。以下のコマンドは、start_date
でパーティション分けされており、df
に格納されているターゲットテーブルの1月のイベントのみを原子的に置換します。
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
.save("/tmp/delta/events")
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
このサンプルコードはdf
のデータを書き出し、述語にに全てが合致しているかを検証し、原子的な置換を行います。述語に合致していないデータを書き出し、ターゲットテーブルと合致する行を置換したい場合には、spark.databricks.delta.replaceWhere.constraintCheck.enabled
をfalseに設定して制約チェックを無効化することができます。
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
Databricksランタイム9.0以下では、replaceWhere
はパーティションカラムのみに対する述語に合致するデータを上書きます。以下のコマンドは、date
でパーティション分けされており、df
に格納されているターゲットテーブルの1月のイベントのみを原子的に置換します。
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
.save("/tmp/delta/people10m")
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
Databricksランタイム9.1以降で過去の挙動に戻したい場合には、spark.databricks.delta.replaceWhere.dataColumns.enabled
フラグを無効にします。
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
注意
Apache Sparkのfile APIと異なり、Delta Lakeはテーブルのスキーマを記憶し、強制します。これはデフォルトでは、上書きは既存テーブルのスキーマを置換しないことを意味します。
ユーザー定義コミットメタデータの設定
これらのオペレーションによってなされるコミットにおけるメタデータとして、DataFrameWriterのオプションuserMetadata
あるいはSparkSessionの設定spark.databricks.delta.commitInfo.userMetadata
を用いて、ユーザー定義の文字列を指定することができます。両方が指定された場合、オプションの方が優先されます。このユーザー定義メタデータはhistoryオペレーションで参照することができます。
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople
df.write.format("delta") \
.mode("overwrite") \
.option("userMetadata", "overwritten-for-fixing-incorrect-data") \
.save("/tmp/delta/people10m")
df.write.format("delta")
.mode("overwrite")
.option("userMetadata", "overwritten-for-fixing-incorrect-data")
.save("/tmp/delta/people10m")
スキーマの検証
Delta Lakeは、書き込まれたデータフレームのスキーマとテーブルのスキーマに互換性があるのかを自動的に検証します。Delta Lakeはテーブルへのデータフレームの書き込みに互換性があるのかどうかを以下のルールを用いて検証します。
- データフレームのカラムはターゲットテーブルに存在しなくてはなりません。データフレームのカラムがテーブルに存在しない場合、例外が起きます。テーブルに存在し、データフレームに存在しないカラムはnullに設定されます。
- データフレームのカラムのデータ型とターゲットテーブルのデータ型は一致しなくてはなりません。合致しない場合、例外が起きます。
- データフレームのカラム名は大文字小文字で区別されません。これは、同じテーブルに「Foo」、「foo」のようなカラムを定義できないことを意味します。Sparkをケースセンシティブモード、あるいはケースインセンシティブ(デフォルト)モードで動作させることはできますが、Parquetは格納時、カラム情報返却時においてケースセンシティブとなります。Delta Lakeでは大文字小文字は保存されますが、スキーマを保存する際には大文字小文字を区別せず、潜在的な誤りやデータ破壊、損失の問題を回避するためにこの制約を設けています。
Delta Lakeでは明示的に新規カラムを追加するためのDDLと、自動でスキーマをアップデートする機能をサポートしています。
partitionBy
のような他のオプションとappendノードを組み合わせて指定した場合、Delta Lakeはこれらがカッチするかを検証し、不一致がある場合には例外をスローします。partitionBy
が存在しない場合、追加は自動で既存データのパーティションに従います。
注意
Databricksランタイム7.0以降では、INSERT
文ではスキーマ強制を提供しており、スキーマ進化をサポートしています。カラムのデータ型が安全にお使いのDelta Lakeテーブルのデータ型にキャストできない場合、ランタイム例外がスローされます。スキーマ進化が有効化されている場合、スキーマが進化できるように、お使いのスキーマ(あるいはネストされたカラム)の最後のカラムとして新規カラムが追加されます。
Delta Lakeにおけるスキーマ強制、スキーマ進化の詳細に関しては、以下のYouTube動画(55分)をご覧ください。
テーブルスキーマのアップデート
Delta Lakeではテーブルのスキーマをアップデートすることができます。以下のタイプの変更をサポートしています。
- (任意の場所への)新規カラムの追加
- 既存カラムの並び替え
- 既存カラムの名前の変更
DDLを用いて明示的に変更することや、DMLを用いて暗黙的に変更することができます。
重要!
Deltaテーブルのスキーマをアップデートする際には、当該テーブルを読み込むストリームは停止されます。ストリームを継続する際には再始動してください。
推奨する方法に関しては、Databricksにおける構造化ストリーミングの本格運用をご覧ください。
明示的なスキーマのアップデート
テーブルのスキーマを明示的に変更するには、以下のDDLを使用することができます。
カラムの追加
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
デフォルトでは、null値の許可はtrue
です。
ネストされたフィールドにカラムを追加するには以下を実行します。
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
サンプル
ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)
を実行する前のスキーマが以下のようなものだった場合、
- root
| - colA
| - colB
| +-field1
| +-field2
実行後のスキーマは以下のようになります。
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
注意
ネストされたカラムの追加はstructsのみがサポートされています。arrayやmapはサポートされていません。
カラムのコメント、並びの変更
ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
ネストされたフィールドのカラムを変更するには、以下を使用します。
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
サンプル
ALTER TABLE boxes CHANGE COLUMN colB.field2 field2 STRING FIRST
を実行する前のスキーマが以下のようなものだった場合、
- root
| - colA
| - colB
| +-field1
| +-field2
実行後は以下のようになります。
- root
| - colA
| - colB
| +-field2
| +-field1
カラムの置換
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
サンプル
以下のDDLを実行すると、
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
以下のスキーマは
- root
| - colA
| - colB
| +-field1
| +-field2
このようになります。
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
カラム名の変更
プレビュー
本機能はパブリックプレビューです。
要件
- Databricksランタイム10.2以降が必要です。
- カラムに存在しているデータを変更せずにカラムの名前を変更するには、テーブルのカラムマッピングを有効化する必要があります。Delta column mappingを参照下さい。
カラム名を変更するには以下を実行します。
ALTER TABLE <table_name> RENAME COLUMN old_col_name TO new_col_name
ネストされたフィールドの名称を変更するには以下を実行します。
ALTER TABLE <table_name> RENAME COLUMN col_name.old_nested_field TO new_nested_field
サンプル
以下のコマンドを実行すると、
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
以下のスキーマは、
- root
| - colA
| - colB
| +-field1
| +-field2
以下のように変更されます。
- root
| - colA
| - colB
| +-field001
| +-field2
Delta column mappingを参照ください。
カラムの型、名称の変更
テーブルを再書き込みすることで、カラムのデータ型の変更や、カラムの削除が可能です。これを行うためには、overwriteSchema
オプションを使用します。
カラム型の変更
spark.read.table(...) \
.withColumn("birthDate", col("birthDate").cast("date")) \
.write \
.format("delta") \
.mode("overwrite")
.option("overwriteSchema", "true") \
.saveAsTable(...)
カラム名の変更
spark.read.table(...) \
.withColumnRenamed("dateOfBirth", "birthDate") \
.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable(...)
自動スキーマアップデート
Delta Lakeは、DMLのトランザクション(追加あるいは上書き)の一部としてテーブルのスキーマを自動でアップデートし、画期込まれたデータをスキーマの互換性を確保します。
カラムの追加
以下のケースでは、データフレームに存在しているがテーブルにはないカラムは書き込みトランザクションの一部として自動で追加されます。
-
write
あるいはwriteStream
に、オプション.option("mergeSchema", "true")
がある。 -
spark.databricks.delta.schema.autoMerge.enabled
がtrue
になっている。
両方のオプションが設定されている場合、DataFrameWriter
のオプションが優先されます。追加されたカラムは存在するstructの最後に追加されます。新規カラムを追加する際、大文字小文字は保持されます。
注意
-
テーブルアクセスコントロールが有効化されている場合(
MODIFY
を必要とするリクエストをALL PRIVILEGES
を必要とするリクエストに引き上げることになるため)、mergeSchema
はサポートされません。 -
INSERT INTO
や.write.insertInto()
ではmergeSchema
は使用できません。
NullType
のカラム
ParquetではNullType
をサポートしていないので、Deltaテーブルに書き込む際、データフレームからNullType
カラムは削除されますが、スキーマには保存されます。このカラムに異なるデータ型を受け取った際、Delta Lakeは新たなデータ型にスキーマをマージします。Delta Lakeが既存カラムに対してNullType
を受け取ると、書き込みの際に古いスキーマは維持され、新たなカラムは削除されます。
ストリーミングでのNullType
はサポートされていません。ストリーミングを使用する際にはスキーマを設定する必要があるためであり、これは非常にまれなことです。また、ArrayType
やMapType
のような複雑な型でもNullType
は受け付けられません。
テーブルスキーマの置換
デフォルトでは、テーブルのデータの上書きではスキーマを上書きません。replaceWhere
なしにmode("overwrite")
を用いてテーブルを上書きする際、書き込まれるテーブルのスキーマを上書きしたいケースがあるかもしれません。overwriteSchema
オプションをtrue
に設定することで、テーブルのスキーマやパーティションを置き換えることができます。
df.write.option("overwriteSchema", "true")
テーブルに対するビュー
Delta Lakeでは、データソースのテーブルで行うのと同じように、Deltaテーブルに対するビューを作成することができます。
これらのビューとテーブルアクセスコントロールを組み合わせることで、行列レベルのセキュリティを実現することができます。
ビューを操作する際の大きな課題には、スキーマの解決があります。Deltaテーブルのスキーマを変更した場合、スキーマに追加されたあらゆる変更を考慮するように、派生ビューを再作成する必要があります。例えば、Deltaテーブルに新規カラムを追加した場合、ベースのテーブルから作成されたビューでこの新規カラムが利用できることを確認する必要があります。
テーブルプロパティ
CREATE
やALTER
でTBLPROPERTIES
を用いることでテーブルプロパティとしてメタデータを格納することができます。
TBLPROPERTIES
はDeltaテーブルのメタデータの一部として格納されます。特定の場所にDeltaテーブルが存在している場合、CREATE
文で新たなTBLPROPERTIES
を定義することはできません。詳細はテーブルの作成を参照ください。
さらに、挙動とパフォーマンスを調整するために、Delta Lakeでは特定のDeltaテーブルプロパティを提供しています。
- Deltaテーブルにおける削除・更新のブロック:
delta.appendOnly=true
-
タイムトラベルの保持期間の設定:
delta.logRetentionDuration=<interval-string>
とdelta.deletedFileRetentionDuration=<interval-string>
。詳細はデータの保持を参照ください。 - 統計情報を収集するカラムの数:
delta.dataSkippingNumIndexedCols=<number-of-columns>
。このプロパティは新規データが書き込まれるときにのみ反映されます。 - S3メタデータのホットスポットを回避するためにプレフィックスをランダムにする:
delta.randomizeFilePrefixes=true
。大量の(秒あたり数千リクエスト)高速な読み書きオペレーションを必要とするテーブルに対しては、テーブル専用のS3バケットを作成し、ベストな体験を得るためにファイルのプレフィックスをランダムにすることを強くお勧めします。
注意
Deltaテーブルプロパティの変更は、他の同時書き込みオペレーションと競合しうる書き込みオペレーションであり、その場合処理が失敗します。テーブルに対する同時書き込みオペレーションが存在しな場合にのみ、テーブルプロパティを変更することをお勧めします。
また、Spark設定を用いて、Deltaテーブルへの最初のコミットからdelta.
のプレフィックスのプロパティを設定することができます。例えば、プロパティdelta.appendOnly=true
でDeltaテーブルを初期化するには、Spark設定spark.databricks.delta.properties.defaults.appendOnly
を以下のようにtrue
に設定します。
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
テーブルのメタデータ
Delta Lakeでは、テーブルメタデータを探索するための豊富な機能を提供しています。
SHOW [PARTITIONS | COLUMNS]
とDESCRIBE TABLE
をサポートしています。以下を参照下さい。
- Databricksランタイム7.x以降: SHOW PARTITIONS
- Databricksランタイム5.5LTSおよび6.x: Show Partitions、Show Partitions、Describe Table
また、以下のユニークなコマンドも提供しています。
DESCRIBE DETAIL
スキーマ、パーティション、テーブルサイズなどの情報を提供します。詳細はテーブル詳細を取得するを参照ください。
DESCRIBE HISTORY
オペレーション、ユーザーなどを含む履歴情報、テーブルへの書き込みに関するオペレーションメトリクスを提供します。テーブル履歴は30日保持されます。詳細はテーブル履歴を取得するを参照ください。
Dataサイドバーは、Deltaテーブルの詳細情報、履歴に対するビジュアルなビューを提供します。テーブルスキーマ、サンプルデータに加えて、Historyタブをクリックすることで、DESCRIBE HISTORY
によって提供されるテーブル履歴を参照することができます。
ストレージ認証情報の設定
Delta LakeはストレージシステムにアクセスするためにHadoop FileSystem APIを使用します。ストレージシステムに対する認証情報はHadoop設定を通じて設定することができます。Delta LakeではApache Spark同様に、Hadoop設定を行うための複数の方法を提供しています。
Spark設定
クラスターでSparkアプリケーションを起動する際、カスタムのHadoop設定を渡すために、spark.hadoop.*
の形式でSpark設定を行うことができます。例えば、spark.hadoop.a.b.c
の値を設定すると、Hadoop設定a.b.c
の値が引き渡され、Delta LakeはHadoop FileSystem APIにアクセスするためにこの値を使用します。
詳細はSparkの設定をご覧下さい。
SQLセッション設定
Spark SQLは現在の全てのSQLセッション設定をDelta Lakeに引渡し、Delta LakeはHadoop FileSystem APIにアクセスするためにこの値を使用します。例えば、SET a.b.c=x.y.z
はDelta Lakeに対して、Hadoop設定a.b.c
として値x.y.z
を引渡し、Delta LakeはHadoop FileSystem APIにアクセスするためにこの値を使用します。
DataFrameのオプション
Spark(クラスター)設定やSQLセッション設定を通じて、Hadoopのファイルシステム設定を行うことに加えDeltaでは、DataFrameReader.load(path)
やDataFrameWriter.save(path)
を用いてテーブルを読み書きする際、DataFrameReader
、DataFrameWriter
のオプションからHadoopファイルシステムの設定の読み込みをサポートしています。
注意
この機能はDatabricksランタイム10.1以降で利用できます。
例えば、DataFrameのオプション経由でストレージの認証情報を引き渡すことができます。
df1 = spark.read.format("delta") \
.option("fs.s3a.access.key", "<access-key-1>") \
.option("fs.s3a.secret.key", "<secret-key-1>") \
.read("...")
df2 = spark.read.format("delta") \
.option("fs.s3a.access.key", "<access-key-1>") \
.option("fs.s3a.secret.key", "<secret-key-2>") \
.read("...")
df1.union(df2).write.format("delta") \
.mode("overwrite") \
.option("fs.s3a.access.key", "<access-key-3>") \
.option("fs.s3a.secret.key", "<secret-key-3>") \
.save("...")
val df1 = spark.read.format("delta")
.option("fs.s3a.access.key", "<access-key-1>")
.option("fs.s3a.secret.key", "<secret-key-1>")
.read("...")
val df2 = spark.read.format("delta")
.option("fs.s3a.access.key", "<access-key-2>")
.option("fs.s3a.secret.key", "<secret-key-2>")
.read("...")
df1.union(df2).write.format("delta")
.mode("overwrite")
.option("fs.s3a.access.key", "<access-key-3>")
.option("fs.s3a.secret.key", "<secret-key-3>")
.save("...")
Data sources で、お使いのストレージに対するHadoopファイルシステム設定の詳細を確認することができます。
ノートブック
さまざまなDeltaテーブルのメタデータコマンドのサンプルに関しては、以下のノートブックの最後をご覧ください。
Delta Lakeバッチコマンドのノートブック