0
0

More than 1 year has passed since last update.

Tutorial: Delta Lake | Databricks on AWS [2022/11/14時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

このチュートリアルでは、以下を含むDatabricksにおけるDelta Lakeの一般的なオペレーションを紹介します。

Databricksクラスターにアタッチされたノートブックで、本書のサンプル、Python、R、Scala、SQLコードを実行することができます。また、Databricks SQLSQLウェアハウスに関連づけられたクエリーで本書のSQLコードを実行することができます。

注意
以下のコードサンプルのいくつかでは、スキーマ(データベース)とテーブル・ビューから構成される2レベルの名前空間記述(default.people10mなど)を用いています。これらのサンプルをUnity Catalogで使用するには、2レベルの名前空間をカタログ、スキーマ、テーブル・ビューから構成されるUnity Catalogの3レベル名前空間(main.default.people10mなど)に置き換えてください。

テーブルの作成

Databricksで作成されるテーブルはすべてデフォルトでDelta Lakeを使用します。

注意
Databricksランタイム8.0以降では、すべての読み込み、書き込み、テーブル作成のコマンドのデフォルトはDelta Lakeとなります。Databricksランタイム7.3 LTSを使用している場合は、フォーマットを指定するためにdeltaキーワードを使用することができます。

Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

上のオペレーションによって、データから推定されたスキーマを用いて新規のマネージドテーブルが作成されます。Deltaテーブルを作成する際に使用できるオプションに関しては、CREATE TABLEをご覧ください。

マネージドテーブルにおいては、Databricksがデータの格納場所を決定します。格納場所を取得するには以下のようにDESCRIBE DETAILを使用することができます。

Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;

データをインサートする前にスキーマを指定してテーブルを作成したいと思うかもしれません。SQLを用いてこれを実現することができます。

SQL
CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

また、テーブルを作成するためにDelta LakeのDeltaTableBuilder APIを使用することもできます。DataFrameWriter APIと比べて、このAPIはカラムのコメント、テーブルプロパティのような追加情報やジェネレーテッドカラムを容易に指定することができます。

プレビュー
本機能はパブリックプレビューです。

注意
この機能はDatabricksランタイム8.3以降で使用できます。

Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

テーブルのUPSERT

既存のDeltaテーブルに対する一連の更新とインサートを管理するために、MERGE INTO文を使用します。例えば、以下の文はソーステーブルからデータを取り込み、ターゲットテーブルにマージします。両方のテーブル行がマッチする場合、Delta Lakeは指定されたエクスプレッションを用いてデータカラムを更新します。行がマッチしない場合、Delta Lakeは新規の行を追加します。このオペレーションはupsertと呼ばれるものです。

SQL
CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

*を指定する場合、ターゲットテーブルのすべてのカラムを更新あるいはインサートします。これは、ターゲットテーブルとソーステーブルが同じカラムを持っていることを前提としており、そうで無い場合にはクエリーは解析エラーで失敗します。

INSERTオペレーションを実行する際には、テーブルのすべてのカラムを指定しなくてはなりません(例えば、既存データセットでマッチする行がない場合)。しかし、すべての値を更新する必要はありません。

結果を確認するには、テーブルにクエリーを実行します。

SQL
SELECT * FROM people_10m WHERE id >= 9999998

テーブルの読み込み

以下のサンプルに示すように、テーブル名あるいはテーブルパスを用いてDeltaテーブルにアクセスします。

Python
people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)
R
people_df = tableToDF(table_name)

display(people_df)
Scala
val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)
SQL
SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

テーブルへの書き込み

Delta Lakeはテーブルへのデータの書き込みに標準的な構文を使用します。

既存のDeltaテーブルに新規データを原子性を持って追加するには、以下のサンプルのようにappendモードを使用します。

SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")

テーブルの更新

Deltaテーブルで述語にマッチするデータを更新することができます。例えば、people10mという名前のテーブル、あるいは/tmp/delta/people-10mのパスにあるテーブルにおいて、genderカラムの略語をMFからMaleFemaleに置き換えたい場合、以下を実行します。

SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

テーブルからの削除

Deltaテーブルから術後にマッチするデータを削除することができます。例えば、people10mという名前のテーブル、あるいは/tmp/delta/people-10mのパスにあるテーブルにおいて、birthDateの値が1995より小さい値を持つ人々のすべての行を削除するには以下を実行します。

SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

重要!
deleteは最新バージョンのDeltaテーブルからデータを削除しますが、古いバージョンが明示的にvacuumされるまでは物理的なストレージからは削除されません。Vacuumをご覧ください。

テーブル履歴の表示

テーブルの履歴を参照するには、テーブルバージョン、オペレーション、ユーザーなどを含む来歴情報を提供するDESCRIBE HISTORY文を使用します。

SQL
DESCRIBE HISTORY people_10m

以前のバージョンのテーブルへのクエリー(タイムトラベル)

Delta Lakeのタイムトラベルによって、Deltaテーブルの過去のスナップショットに対してクエリーを行うことができます。

テーブルの過去のバージョンにクエリーを行うには、SELECT文でバージョンあるいはタイムスタンプを指定します。例えば、上記の履歴からバージョン0に対してクエリーを行うには以下を実行します。

SQL
SELECT * FROM people_10m VERSION AS OF 0

あるいは、以下を実行します。

SQL
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

タイムスタンプに関しては、"2019-01-01""2019-01-01'T'00:00:00.000Z"のような日付とタイムスタンプの文字列のみを指定することができます。

DataFrameReaderのオプションを指定することで、以下のPythonのサンプルのように、特定バージョンのテーブルに固定されたDeltaテーブルからデータフレームを作成できるようになります。

Python
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

あるいは、

Python
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

詳細はWork with Delta Lake table historyをご覧ください。

テーブルの最適化

テーブルに対して多数の変更を加えると、大量の小規模なファイルが生成される場合があります。クエリーの読み込みスピードを改善するには、小さなファイルを大きなファイルにまとめるためにOPTIMIZEを使用することができます。

SQL
OPTIMIZE people_10m

カラムによるZ-order

さらに読み込み性能を改善するために、Z-Orderingによって関連する情報を同じファイルセットに共存させることができます。この局所性は読み込むべきデータの量を劇的に削減するために、Delta Lakeによるデータスキッピングアルゴリズムによって自動的に活用されます。データをZ-Orderするには、ZORDER BY句に並び替えるカラムを指定します。genderでZ-Orderするには以下を実行します。

SQL
OPTIMIZE people_10m
ZORDER BY (gender)

OPTIMIZEを実行する際に指定できるオプションについては、Compact data files with optimize on Delta Lakeをご覧ください。

VACUUMによるスナップショットのクリーンアップ

Delta Lakeは読み込みにおいてスナップショットのアイソレーションを提供し、これはユーザーやジョブがテーブルにクエリーを行っている際においてもOPTIMIZEを安全に実行できることを意味します。しかし、最終的には古いスナップショットを削除しなくてはならなくなります。VACUUMコマンドを実行することでこれを実現することができます。

SQL
VACUUM people_10m

VACUUMの効率的な利用法に関しては、VACUUMを用いてDeltaテーブルの未使用データを削除するをご覧ください。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0