Tutorial: Delta Lake | Databricks on AWS [2022/11/14時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
このチュートリアルでは、以下を含むDatabricksにおけるDelta Lakeの一般的なオペレーションを紹介します。
- テーブルの作成
- テーブルのUPSERT
- テーブルからの読み込み
- テーブル履歴の表示
- 以前のバージョンのテーブルに対するクエリー
- テーブルの最適化
- Z-orderインデックスの追加
- 参照されないファイルのVacuum
Databricksクラスターにアタッチされたノートブックで、本書のサンプル、Python、R、Scala、SQLコードを実行することができます。また、Databricks SQLのSQLウェアハウスに関連づけられたクエリーで本書のSQLコードを実行することができます。
注意
以下のコードサンプルのいくつかでは、スキーマ(データベース)とテーブル・ビューから構成される2レベルの名前空間記述(default.people10m
など)を用いています。これらのサンプルをUnity Catalogで使用するには、2レベルの名前空間をカタログ、スキーマ、テーブル・ビューから構成されるUnity Catalogの3レベル名前空間(main.default.people10m
など)に置き換えてください。
テーブルの作成
Databricksで作成されるテーブルはすべてデフォルトでDelta Lakeを使用します。
注意
Databricksランタイム8.0以降では、すべての読み込み、書き込み、テーブル作成のコマンドのデフォルトはDelta Lakeとなります。Databricksランタイム7.3 LTSを使用している場合は、フォーマットを指定するためにdelta
キーワードを使用することができます。
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
上のオペレーションによって、データから推定されたスキーマを用いて新規のマネージドテーブルが作成されます。Deltaテーブルを作成する際に使用できるオプションに関しては、CREATE TABLEをご覧ください。
マネージドテーブルにおいては、Databricksがデータの格納場所を決定します。格納場所を取得するには以下のようにDESCRIBE DETAILを使用することができます。
display(spark.sql('DESCRIBE DETAIL people_10m'))
display(sql("DESCRIBE DETAIL people_10m"))
display(spark.sql("DESCRIBE DETAIL people_10m"))
DESCRIBE DETAIL people_10m;
データをインサートする前にスキーマを指定してテーブルを作成したいと思うかもしれません。SQLを用いてこれを実現することができます。
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
また、テーブルを作成するためにDelta LakeのDeltaTableBuilder
APIを使用することもできます。DataFrameWriter APIと比べて、このAPIはカラムのコメント、テーブルプロパティのような追加情報やジェネレーテッドカラムを容易に指定することができます。
プレビュー
本機能はパブリックプレビューです。
注意
この機能はDatabricksランタイム8.3以降で使用できます。
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
テーブルのUPSERT
既存のDeltaテーブルに対する一連の更新とインサートを管理するために、MERGE INTO文を使用します。例えば、以下の文はソーステーブルからデータを取り込み、ターゲットテーブルにマージします。両方のテーブル行がマッチする場合、Delta Lakeは指定されたエクスプレッションを用いてデータカラムを更新します。行がマッチしない場合、Delta Lakeは新規の行を追加します。このオペレーションはupsertと呼ばれるものです。
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
*
を指定する場合、ターゲットテーブルのすべてのカラムを更新あるいはインサートします。これは、ターゲットテーブルとソーステーブルが同じカラムを持っていることを前提としており、そうで無い場合にはクエリーは解析エラーで失敗します。
INSERT
オペレーションを実行する際には、テーブルのすべてのカラムを指定しなくてはなりません(例えば、既存データセットでマッチする行がない場合)。しかし、すべての値を更新する必要はありません。
結果を確認するには、テーブルにクエリーを実行します。
SELECT * FROM people_10m WHERE id >= 9999998
テーブルの読み込み
以下のサンプルに示すように、テーブル名あるいはテーブルパスを用いてDeltaテーブルにアクセスします。
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
people_df = tableToDF(table_name)
display(people_df)
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
テーブルへの書き込み
Delta Lakeはテーブルへのデータの書き込みに標準的な構文を使用します。
既存のDeltaテーブルに新規データを原子性を持って追加するには、以下のサンプルのようにappend
モードを使用します。
INSERT INTO people10m SELECT * FROM more_people
df.write.mode("append").saveAsTable("people10m")
df.write.mode("append").saveAsTable("people10m")
テーブルの更新
Deltaテーブルで述語にマッチするデータを更新することができます。例えば、people10m
という名前のテーブル、あるいは/tmp/delta/people-10m
のパスにあるテーブルにおいて、gender
カラムの略語をM
とF
からMale
とFemale
に置き換えたい場合、以下を実行します。
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
テーブルからの削除
Deltaテーブルから術後にマッチするデータを削除することができます。例えば、people10m
という名前のテーブル、あるいは/tmp/delta/people-10m
のパスにあるテーブルにおいて、birthDate
の値が1995
より小さい値を持つ人々のすべての行を削除するには以下を実行します。
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
重要!
delete
は最新バージョンのDeltaテーブルからデータを削除しますが、古いバージョンが明示的にvacuumされるまでは物理的なストレージからは削除されません。Vacuumをご覧ください。
テーブル履歴の表示
テーブルの履歴を参照するには、テーブルバージョン、オペレーション、ユーザーなどを含む来歴情報を提供するDESCRIBE HISTORY文を使用します。
DESCRIBE HISTORY people_10m
以前のバージョンのテーブルへのクエリー(タイムトラベル)
Delta Lakeのタイムトラベルによって、Deltaテーブルの過去のスナップショットに対してクエリーを行うことができます。
テーブルの過去のバージョンにクエリーを行うには、SELECT
文でバージョンあるいはタイムスタンプを指定します。例えば、上記の履歴からバージョン0
に対してクエリーを行うには以下を実行します。
SELECT * FROM people_10m VERSION AS OF 0
あるいは、以下を実行します。
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
タイムスタンプに関しては、"2019-01-01"
や"2019-01-01'T'00:00:00.000Z"
のような日付とタイムスタンプの文字列のみを指定することができます。
DataFrameReaderのオプションを指定することで、以下のPythonのサンプルのように、特定バージョンのテーブルに固定されたDeltaテーブルからデータフレームを作成できるようになります。
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
あるいは、
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
詳細はWork with Delta Lake table historyをご覧ください。
テーブルの最適化
テーブルに対して多数の変更を加えると、大量の小規模なファイルが生成される場合があります。クエリーの読み込みスピードを改善するには、小さなファイルを大きなファイルにまとめるためにOPTIMIZE
を使用することができます。
OPTIMIZE people_10m
カラムによるZ-order
さらに読み込み性能を改善するために、Z-Orderingによって関連する情報を同じファイルセットに共存させることができます。この局所性は読み込むべきデータの量を劇的に削減するために、Delta Lakeによるデータスキッピングアルゴリズムによって自動的に活用されます。データをZ-Orderするには、ZORDER BY
句に並び替えるカラムを指定します。gender
でZ-Orderするには以下を実行します。
OPTIMIZE people_10m
ZORDER BY (gender)
OPTIMIZE
を実行する際に指定できるオプションについては、Compact data files with optimize on Delta Lakeをご覧ください。
VACUUMによるスナップショットのクリーンアップ
Delta Lakeは読み込みにおいてスナップショットのアイソレーションを提供し、これはユーザーやジョブがテーブルにクエリーを行っている際においてもOPTIMIZE
を安全に実行できることを意味します。しかし、最終的には古いスナップショットを削除しなくてはならなくなります。VACUUM
コマンドを実行することでこれを実現することができます。
VACUUM people_10m
VACUUM
の効率的な利用法に関しては、VACUUMを用いてDeltaテーブルの未使用データを削除するをご覧ください。