こちらのノートブックをウォークスルーした内容となっています。
ノートブックの日本語訳はこちらにあります。
重要!
こちらの例では、i3.xlarge(メモリ30.5GB、4コア)のシングルノードのクラスターを使用しています。こちらで示している性能値は参考値と捉えてください。
Delta Lakeとは
Delta Lakeはデータレイク上で高性能のデータ処理、データ品質の確保を可能にするストレージレイヤーソフトウェアです。Apache Parquetをベースとしたオープンなデータフォーマットを提供します。
データレイクの概要、歴史、課題に関してはこちらを参照ください。
データレイクとは、膨大な量のデータを集約し、未加工のネイティブな形式で格納するレポジトリです。ファイルやフォルダ内にデータを格納する階層型データウェアハウスと比較して、データレイクはフラットなアーキテクチャとオブジェクトストレージでデータを格納します。データをメタデータタグと一意の識別子で格納するオブジェクトストレージにより、リージョン間でのデータの検索と取得は容易で、性能を向上させます。安価なオブジェクトストレージとオープンフォーマットを活用し、データレイクは、多くのアプリケーションにおけるデータ活用を可能にします。
Delta Lakeは以下のような機能を提供することで、高性能・高信頼のデータレイクを実現します。
- SparkにおけるACIDトランザクション: シリアライズ可能なアイソレーションレベルによって、読み取り側で一貫性のないデータを参照することがなくなります。
- スケーラブルなメタデータのハンドリング: 数十億のファイルから構成されるパタバイト規模の全てのメタデータを容易に取り扱うことができる、Sparkの分散処理のパワーを活用します。
- ストリーミングとバッチの統合: Delta Lakeのテーブルはバッチテーブルであるのと同時に、ストリーミングのソース、シンクでもあります。ストリーミングのデータ取り込み、バッチによる過去データのバックフィル、インタラクティブなクエリーすべてを簡単に実行できます。
- スキーマ強制: データ取り込みの際に不正なレコードの挿入を防ぐために、自動でスキーマの変更をハンドリングします。
- タイムトラベル: データのバージョン管理によって、ロールバック、完全な履歴の監査証跡、再現可能な機械学習エクスペリメントを実現します。
- upsertとdelete: チェンジデータキャプチャやSCD(slowly-changing-operation)オペレーション、ストリーミングによるupsertなど複雑なユースケースを実現するためのmerge、update、deleteオペレーションをサポートします。
Sparkとは
ビッグデータと機械学習ワークロードに最適化された、非常に高速な分散処理フレームワークです。
SparkとDelta Lakeと組み合わせることのメリット
クラウドのオブジェクトストレージに格納されているさまざまな種類のデータを高速かつ信頼性を持って処理できるようになります。
大容量データ・多種多様なデータを高速に処理
- 容量の制限がないのでデータ量の増加にも柔軟に対応できます。
- Delta Lakeには画像、テキストなど非構造化データを含めることができるので、データの種類を問わずにデータを格納するとができます。
- SparkはネイティブでDelta Lakeのデータを読み書きできるので、大量データを簡単に並列処理することができます。
データの品質保証
- ACIDトランザクションが保証されるので、データ書き込みジョブによるデータ破損を回避できます。
- Delta Lakeのスキーマ強制によって、Sparkで取り扱うデータのスキーマを固定、あるいは必要に応じて進化させることができます。
ノートブックのウォークスルー
この例では、どのようにDelta Lakeがクエリー性能を最適化するのかを見ていきます。Parquetフォーマットを用いた標準的なテーブルを作成し、レーテンシーを観察するために簡単なクエリーを実行します。次に、標準的なテーブルとDelta Lakeテーブルの間での性能の違いを見るために、同じテーブルのDelta Lakeバージョンに対して二つ目のクエリーを実行します。
シンプルに以下の4ステップを踏んでいきます。
- Step 1 : USのフライトスケジュールデータを用いて標準的なParquetベーステーブルを作成します。
- Step 2 : 年間を通じて出発空港、月毎のフライト数を計算するクエリーを実行します。
- Step 3 : Delta Lakeを用いてフライトテーブルを作成し、テーブルの最適化を行います。
- Step 4 : ステップ2のクエリーを再実行し、レーテンシーを観察します。
環境設定
こちらでは、他のユーザーとファイルパス、データベースが重複しないようにユーザー名を含むファイルパス、データベースを準備します。
import re
from pyspark.sql.types import *
# Username を取得
username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
# Username の英数字以外を除去し、全て小文字化
username = re.sub('[^A-Za-z0-9]+', '', username_raw).lower()
# ユーザー固有のデータベース名を生成します
db_name = f"databricks_handson_{username}"
# ファイル格納パス
work_path = f"dbfs:/tmp/databricks_handson/{username}/delta_optimization"
# データベースの準備
spark.sql(f"DROP DATABASE IF EXISTS {db_name} CASCADE")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
spark.sql(f"USE {db_name}")
# データベースを表示。
print(f"database_name: {db_name}")
print(f"path_name: {work_path}")
ステップ0: フライトデータの読み込み
flights = spark.read.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/databricks-datasets/asa/airlines/2008.csv")
flights.count()
約700万レコード、700MBのデータとなっています。
ステップ1: フライトデータを用いたParquetテーブルの書き込み
カラムOrigin
でParquetのパーティション(Sparkのパーティションとは異なります)を作成し、ディスクに書き込みます。
flights.write.format("parquet").mode("overwrite").partitionBy("Origin").save(f"{work_path}/flights_parquet")
ステップ1が完了すると、"flights"テーブルには年を通じたUSのフライト詳細が含まれます。
次にステップ2では、週の初日のフライト数の月間合計に基づくトップ20の都市を取得するクエリーを実行します。
ステップ2: クエリー実行
from pyspark.sql.functions import count
flights_parquet = spark.read.format("parquet").load(f"{work_path}/flights_parquet")
display(flights_parquet.filter("DayOfWeek = 1").groupBy("Month","Origin").agg(count("*").alias("TotalFlights")).orderBy("TotalFlights", ascending=False).limit(20))
ステップ2が完了すると、標準的な"flights_parquet"テーブルにおけるレーテンシーを観測することができます。
ステップ3とステップ4においては、Deltaテーブルで同じことを行います。今回はクエリーを実行する前に、検索を高速化するためにデータを最適化するためにOPTIMIZE
とZORDER
を実行します。
ステップ3: フライトデータを用いたDeltaテーブルの書き込み
flights.write.format("delta").mode("overwrite").partitionBy("Origin").save(f"{work_path}/flights_delta")
ステップ3(続き): Databricks DeltaテーブルのOPTIMIZE
Z-Orderingは関連する情報を同じファイルセットに配置するパフォーマンスチューニングのテクニックです。Delta Lakeのデータスキッピングアルゴリズムは読み取る必要があるデータ量を劇的に削減するために、この局所性(locality)を自動で活用します。データをZ-Orderするには、ZORDER BY
句に並び替えを行うカラムを指定します。
参考資料: Databricksにおけるデータファイル管理によるパフォーマンスの最適化 - Qiita
display(spark.sql("DROP TABLE IF EXISTS flights"))
display(spark.sql(f"CREATE TABLE flights USING DELTA LOCATION '{work_path}/flights_delta'"))
display(spark.sql("OPTIMIZE flights ZORDER BY (DayofWeek)"))
ステップ4: ステップ2のクエリーを実行してレーテンシーを測定
flights_delta = spark.read.format("delta").load(f"{work_path}/flights_delta")
display(flights_delta.filter("DayOfWeek = 1").groupBy("Month","Origin").agg(count("*").alias("TotalFlights")).orderBy("TotalFlights", ascending=False).limit(20))
OPTIMIZE
の実行後は、Deltaテーブルに対するクエリーが非常に高速になりました。クエリーをどのくらい高速化できるかは、処理を実行するクラスターに依存しますが、標準的なテーブルと比較して5-10倍の高速化を実現することができます。
今回の例ではOPTIMIZE
を行うことで、クエリーの時間は1.59分
から17秒
になり、約5.6倍高速になっています。