Databricksクイックスタートガイドのコンテンツです。
Delta Lake quickstart | Databricks on AWS [2020/6/8時点]の翻訳です。
Delta Lakeはデータレイクに信頼性をもたらすオープンソースのストレージレイヤーです。Delta LakeはACIDトランザクション、スケーラブルなメタデータハンドリング、バッチとストリーミング処理の統合を実現します。Delta Lakeは既存のデータレイク上で稼働し、Apache Spark APIと互換性があります。
Delta Lakeクイックスタートガイドは、Delta Lakeで行う作業の概要を提供するものです。クイックスタートガイドでは、JSONデータをDeltaテーブルに読み込み、テーブルを編集し、テーブルを読み込み、テーブルの履歴を表示し、最後にテーブルの最適化を行います。
これらの機能をデモンストレートするノートブックに関しては、こちらの入門ノートブック(英語)を参照ください。
Delta Lakeの概要に関してはこちらの動画を参照ください。
テーブルの作成
Deltaテーブルを作成するには、既存のApache Spark SQLのコードを使用して、parquet
、csv
、json
などのフォーマットからdelta
形式に変換することができます。
全てのファイルタイプに対して、ファイルをデータフレームに読み込み、delta
形式で書き出します。
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
library(SparkR)
sparkR.session()
events <- read.json("/databricks-datasets/structured-streaming/events/")
write.df(events, source = "delta", path = "/mnt/delta/events")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`
これらの操作によってJSONデータから推定されるスキーマを用いて、新たにアンマネージドなテーブルが作成されます。新たにDeltaテーブルを作成する際に利用できるオプションに関しては、Create a table(英語)、Write to a table(英語)を参照ください。
元のファイルがParquet形式の場合は、その場でアンマネージドテーブルを作成できるSQLのConvert to Delta
文を使用できます。
CONVERT TO DELTA parquet.`/mnt/delta/events`
データのパーティション
パーティションを作成できるカラム(例えば日付)に対する検索条件を含むクエリを高速化するのであれば、データのパーティションを作成できます。
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.partitionBy("date").format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
events <- read.json("/databricks-datasets/structured-streaming/events/")
write.df(events, source = "delta", path = "/mnt/delta/events", partitionBy = "date")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
SQLでDeltaテーブルを作成する際にはPARTITIONED BY
でカラムを指定してください。
CREATE TABLE events (
date DATE,
eventId STRING,
eventType STRING,
data STRING)
USING delta
PARTITIONED BY (date)
テーブルの編集
Delta Lakeはテーブルを編集するための様々な操作を提供します。
ストリーミングによるテーブルへの書き込み
構造化ストリーミングを使用して、Deltaテーブルにデータを書き込むことができます。Delta Lakeのトランザクションログは、テーブルに対して同時にストリーミングとバッチのクエリが実行されたとしても、「exactly-once(確実に一度のみの実行)」処理を保証します。デフォルトでは、ストリーミングは、テーブルに新規レコードを追加するappendモードで実行されます。
from pyspark.sql.types import *
inputPath = "/databricks-datasets/structured-streaming/events/"
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])
eventsDF = (
spark
.readStream
.schema(jsonSchema) # Set the schema of the JSON data
.option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
.json(inputPath)
)
(eventsDF.writeStream
.outputMode("append")
.option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
.table("events")
)
inputPath <- "/databricks-datasets/structured-streaming/events/"
tablePath <- "/mnt/delta/events/"
jsonSchema <- structType(structField("time", "timestamp", T), structField("action", "string", T))
eventsStream <- read.stream(
"json",
path = inputPath,
schema = jsonSchema,
maxFilesPerTrigger = 1)
write.stream(
eventsStream,
path = tablePath,
mode = "append",
checkpointLocation = "/mnt/delta/events/_checkpoints/etl-from-json")
Delta Lakeと構造化ストリーミングの連携に関しては、Table streaming reads and writes(英語)を参照ください。
バッチによるアップサート
既存のテーブルに対してアップデートとインサートのセットを適用するには、MERGE INTO
文を使用します。例えば、以下の文はストリーミングによるアップデートをevents
テーブルに適用しています。同じeventId
のイベントが既に存在する場合には、Delta Lakeは指定された書式に則りdataカラムを更新します。合致するイベントが存在しない場合には、新規に行を追加します。
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
UPDATE SET
events.data = updates.data
WHEN NOT MATCHED
THEN INSERT (date, eventId, data) VALUES (date, eventId, data)
INSERT
を実行する際(例えば、既存のデータセットに合致する行が存在しない場合)にはテーブルの全ての列をしていなくてはなりません。しかし、アップデートを行う際には、その必要はありません。
テーブルの読み込み
本章では以下を説明します:
DBFSパス("/mnt/delta/events"
)、あるいはテーブル("events"
)を指定することで、Deltaテーブルにアクセスすることができます。
val events = spark.read.format("delta").load("/mnt/delta/events")
val events = spark.table("events")
events <- read.df(path = "/mnt/delta/events", source = "delta")
events <- tableToDF("events")
SELECT * FROM delta.`/mnt/delta/events`
SELECT * FROM events
テーブル履歴の表示
テーブル履歴を表示するには、個々のテーブル書き込みに対応する、テーブルのバージョン、操作、ユーザーなどを含む来歴情報を表示するDESCRIBE HISTORY
文を使用します。詳細はRetrieve Delta table history(英語)を参照ください。
以前のバージョンのテーブルに対するクエリ(タイムトラベル)
Delta Lakeのタイムトラベルにより、以前のDeltaテーブルのスナップショットをクエリできます。
timestamp_string
に対しては、日付、タイムスタンプ形式のみ利用可能です。例えば、"2019-01-01"
や"2019-01-01'T'00:00:00.000Z"
と言った形式です。
過去のバージョンのテーブルを取得するには、SELECT
文でバージョン、タイムスタンプを指定してください。例えば、バージョン0のテーブルを取得するには以下のクエリを実行します:
SELECT * FROM events VERSION AS OF 0
SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'
注意
ここでは、バージョン1のタイムスタンプが'2019-01-29 00:38:10'
であるため、バージョン0のテーブルをクエリするには、'2019-01-29 00:37:58'
から'2019-01-29 00:38:09'
のレンジのタイムスタンプを指定することができます。
DataFrameReaderのオプションを指定することで、特定のバージョンのDeltaテーブルからデータフレームを作成することができます。
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")
テーブルの最適化
テーブルに対して何度も変更を加えた結果、大量の小さいファイルが発生します。読み取りクエリーの性能を改善するために、小さいファイルを大きいファイルにまとめるOPTIMIZE
を使用することができます。
OPTIMIZE delta.`/mnt/delta/events`
OPTIMIZE events
Z-order by カラム
さらに性能を改善するために、Z-orderingを用いて、関連する情報を同一グループのファイル内に配置することができます。この再配置は、読み取りデータ量を劇的に削減するDelta Lakeのデータスキッピングアルゴリズムに自動的に活用されます。データをZ-orderするためには、ZORDER BY
句で並び替えを行うカラムを指定します。例えば、eventTypeに基づいて再配置を行う場合には以下を実行します。
OPTIMIZE events
ZORDER BY (eventType)
OPTIMIZEの全てのオプションに関しては、Compaction (bin-packing)(英語)を参照ください。
スナップショットのクリーンアップ
Delta Lakeは読み取りに関してはスナップショットのアイソレーションを提供します。これは、テーブルに対するクエリが実行されている時にOPTIMIZE
を実行しても安全であることを意味します。しかし、最終的には古いスナップショットを削除しなくてはなりません。VACUUM
を実行することでクリーンアップを行うことができます。
VACUUM events
スナップショットを保持する期間をRETAIN <N> HOURS
オプションで指定できます。
VACUUM events RETAIN 24 HOURS
VACUUM
の詳細に関しては、Remove files no longer referenced by a Delta table(英語)を参照ください。