8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Databricksクイックスタートガイドのコンテンツです。

Delta Lake quickstart | Databricks on AWS [2020/6/8時点]の翻訳です。

Delta Lakeはデータレイクに信頼性をもたらすオープンソースのストレージレイヤーです。Delta LakeはACIDトランザクション、スケーラブルなメタデータハンドリング、バッチとストリーミング処理の統合を実現します。Delta Lakeは既存のデータレイク上で稼働し、Apache Spark APIと互換性があります。

Delta Lakeクイックスタートガイドは、Delta Lakeで行う作業の概要を提供するものです。クイックスタートガイドでは、JSONデータをDeltaテーブルに読み込み、テーブルを編集し、テーブルを読み込み、テーブルの履歴を表示し、最後にテーブルの最適化を行います。

これらの機能をデモンストレートするノートブックに関しては、こちらの入門ノートブック(英語)を参照ください。

Delta Lakeの概要に関してはこちらの動画を参照ください。

テーブルの作成

Deltaテーブルを作成するには、既存のApache Spark SQLのコードを使用して、parquetcsvjsonなどのフォーマットからdelta形式に変換することができます。

全てのファイルタイプに対して、ファイルをデータフレームに読み込み、delta形式で書き出します。

Python
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
R
library(SparkR)
sparkR.session()

events <- read.json("/databricks-datasets/structured-streaming/events/")
write.df(events, source = "delta", path = "/mnt/delta/events")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
SQL
CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`

これらの操作によってJSONデータから推定されるスキーマを用いて、新たにアンマネージドなテーブルが作成されます。新たにDeltaテーブルを作成する際に利用できるオプションに関しては、Create a table(英語)Write to a table(英語)を参照ください。

元のファイルがParquet形式の場合は、その場でアンマネージドテーブルを作成できるSQLのConvert to Delta文を使用できます。

CONVERT TO DELTA parquet.`/mnt/delta/events`

データのパーティション

パーティションを作成できるカラム(例えば日付)に対する検索条件を含むクエリを高速化するのであれば、データのパーティションを作成できます。

Python
events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.partitionBy("date").format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")
R
events <- read.json("/databricks-datasets/structured-streaming/events/")
write.df(events, source = "delta", path = "/mnt/delta/events", partitionBy = "date")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")

SQLでDeltaテーブルを作成する際にはPARTITIONED BYでカラムを指定してください。

SQL
CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING delta
PARTITIONED BY (date)

テーブルの編集

Delta Lakeはテーブルを編集するための様々な操作を提供します。

ストリーミングによるテーブルへの書き込み

構造化ストリーミングを使用して、Deltaテーブルにデータを書き込むことができます。Delta Lakeのトランザクションログは、テーブルに対して同時にストリーミングとバッチのクエリが実行されたとしても、「exactly-once(確実に一度のみの実行)」処理を保証します。デフォルトでは、ストリーミングは、テーブルに新規レコードを追加するappendモードで実行されます。

Python
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

eventsDF = (
  spark
    .readStream
    .schema(jsonSchema) # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

(eventsDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")
)
R
inputPath <- "/databricks-datasets/structured-streaming/events/"
tablePath <- "/mnt/delta/events/"

jsonSchema <- structType(structField("time", "timestamp", T), structField("action", "string", T))
eventsStream <- read.stream(
  "json",
  path = inputPath,
  schema = jsonSchema,
  maxFilesPerTrigger = 1)

write.stream(
  eventsStream,
  path = tablePath,
  mode = "append",
  checkpointLocation = "/mnt/delta/events/_checkpoints/etl-from-json")

Delta Lakeと構造化ストリーミングの連携に関しては、Table streaming reads and writes(英語)を参照ください。

バッチによるアップサート

既存のテーブルに対してアップデートとインサートのセットを適用するには、MERGE INTO文を使用します。例えば、以下の文はストリーミングによるアップデートをeventsテーブルに適用しています。同じeventIdのイベントが既に存在する場合には、Delta Lakeは指定された書式に則りdataカラムを更新します。合致するイベントが存在しない場合には、新規に行を追加します。

SQL
MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

INSERTを実行する際(例えば、既存のデータセットに合致する行が存在しない場合)にはテーブルの全ての列をしていなくてはなりません。しかし、アップデートを行う際には、その必要はありません。

テーブルの読み込み

本章では以下を説明します:

DBFSパス("/mnt/delta/events")、あるいはテーブル("events")を指定することで、Deltaテーブルにアクセスすることができます。

Scala
val events = spark.read.format("delta").load("/mnt/delta/events")
Scala
val events = spark.table("events")
R
events <- read.df(path = "/mnt/delta/events", source = "delta")
R
events <- tableToDF("events")
SQL
SELECT * FROM delta.`/mnt/delta/events`
SQL
SELECT * FROM events

テーブル履歴の表示

テーブル履歴を表示するには、個々のテーブル書き込みに対応する、テーブルのバージョン、操作、ユーザーなどを含む来歴情報を表示するDESCRIBE HISTORY文を使用します。詳細はRetrieve Delta table history(英語)を参照ください。

以前のバージョンのテーブルに対するクエリ(タイムトラベル)

Delta Lakeのタイムトラベルにより、以前のDeltaテーブルのスナップショットをクエリできます。

timestamp_stringに対しては、日付、タイムスタンプ形式のみ利用可能です。例えば、"2019-01-01""2019-01-01'T'00:00:00.000Z"と言った形式です。

過去のバージョンのテーブルを取得するには、SELECT文でバージョン、タイムスタンプを指定してください。例えば、バージョン0のテーブルを取得するには以下のクエリを実行します:

SQL
SELECT * FROM events VERSION AS OF 0
SQL
SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'

注意
ここでは、バージョン1のタイムスタンプが'2019-01-29 00:38:10'であるため、バージョン0のテーブルをクエリするには、'2019-01-29 00:37:58'から'2019-01-29 00:38:09'のレンジのタイムスタンプを指定することができます。

DataFrameReaderのオプションを指定することで、特定のバージョンのDeltaテーブルからデータフレームを作成することができます。

Python
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")

テーブルの最適化

テーブルに対して何度も変更を加えた結果、大量の小さいファイルが発生します。読み取りクエリーの性能を改善するために、小さいファイルを大きいファイルにまとめるOPTIMIZEを使用することができます。

SQL
OPTIMIZE delta.`/mnt/delta/events`
SQL
OPTIMIZE events

Z-order by カラム

さらに性能を改善するために、Z-orderingを用いて、関連する情報を同一グループのファイル内に配置することができます。この再配置は、読み取りデータ量を劇的に削減するDelta Lakeのデータスキッピングアルゴリズムに自動的に活用されます。データをZ-orderするためには、ZORDER BY句で並び替えを行うカラムを指定します。例えば、eventTypeに基づいて再配置を行う場合には以下を実行します。

SQL
OPTIMIZE events
  ZORDER BY (eventType)

OPTIMIZEの全てのオプションに関しては、Compaction (bin-packing)(英語)を参照ください。

スナップショットのクリーンアップ

Delta Lakeは読み取りに関してはスナップショットのアイソレーションを提供します。これは、テーブルに対するクエリが実行されている時にOPTIMIZEを実行しても安全であることを意味します。しかし、最終的には古いスナップショットを削除しなくてはなりません。VACUUMを実行することでクリーンアップを行うことができます。

SQL
VACUUM events

スナップショットを保持する期間をRETAIN <N> HOURSオプションで指定できます。

SQL
VACUUM events RETAIN 24 HOURS

VACUUMの詳細に関しては、Remove files no longer referenced by a Delta table(英語)を参照ください。

Databricks 無料トライアル

Databricks 無料トライアル

8
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?