1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta LakeによるSparkクエリーの高速化

Posted at

こちらのノートブックをウォークスルーした内容となっています。

ノートブックの日本語訳はこちらにあります。

重要!
こちらの例では、i3.xlarge(メモリ30.5GB、4コア)のシングルノードのクラスターを使用しています。こちらで示している性能値は参考値と捉えてください。

Delta Lakeとは

Delta Lakeはデータレイク上で高性能のデータ処理、データ品質の確保を可能にするストレージレイヤーソフトウェアです。Apache Parquetをベースとしたオープンなデータフォーマットを提供します。

データレイクの概要、歴史、課題に関してはこちらを参照ください。

データレイクとは、膨大な量のデータを集約し、未加工のネイティブな形式で格納するレポジトリです。ファイルやフォルダ内にデータを格納する階層型データウェアハウスと比較して、データレイクはフラットなアーキテクチャとオブジェクトストレージでデータを格納します。データをメタデータタグと一意の識別子で格納するオブジェクトストレージにより、リージョン間でのデータの検索と取得は容易で、性能を向上させます。安価なオブジェクトストレージとオープンフォーマットを活用し、データレイクは、多くのアプリケーションにおけるデータ活用を可能にします。

Delta Lakeは以下のような機能を提供することで、高性能・高信頼のデータレイクを実現します。

  • SparkにおけるACIDトランザクション: シリアライズ可能なアイソレーションレベルによって、読み取り側で一貫性のないデータを参照することがなくなります。
  • スケーラブルなメタデータのハンドリング: 数十億のファイルから構成されるパタバイト規模の全てのメタデータを容易に取り扱うことができる、Sparkの分散処理のパワーを活用します。
  • ストリーミングバッチの統合: Delta Lakeのテーブルはバッチテーブルであるのと同時に、ストリーミングのソース、シンクでもあります。ストリーミングのデータ取り込み、バッチによる過去データのバックフィル、インタラクティブなクエリーすべてを簡単に実行できます。
  • スキーマ強制: データ取り込みの際に不正なレコードの挿入を防ぐために、自動でスキーマの変更をハンドリングします。
  • タイムトラベル: データのバージョン管理によって、ロールバック、完全な履歴の監査証跡、再現可能な機械学習エクスペリメントを実現します。
  • upsertdelete: チェンジデータキャプチャやSCD(slowly-changing-operation)オペレーション、ストリーミングによるupsertなど複雑なユースケースを実現するためのmerge、update、deleteオペレーションをサポートします。

Sparkとは

ビッグデータと機械学習ワークロードに最適化された、非常に高速な分散処理フレームワークです。

SparkとDelta Lakeと組み合わせることのメリット

クラウドのオブジェクトストレージに格納されているさまざまな種類のデータを高速かつ信頼性を持って処理できるようになります。

大容量データ・多種多様なデータを高速に処理

  • 容量の制限がないのでデータ量の増加にも柔軟に対応できます。
  • Delta Lakeには画像、テキストなど非構造化データを含めることができるので、データの種類を問わずにデータを格納するとができます。
  • SparkはネイティブでDelta Lakeのデータを読み書きできるので、大量データを簡単に並列処理することができます。

データの品質保証

  • ACIDトランザクションが保証されるので、データ書き込みジョブによるデータ破損を回避できます。
  • Delta Lakeのスキーマ強制によって、Sparkで取り扱うデータのスキーマを固定、あるいは必要に応じて進化させることができます。

ノートブックのウォークスルー

この例では、どのようにDelta Lakeがクエリー性能を最適化するのかを見ていきます。Parquetフォーマットを用いた標準的なテーブルを作成し、レーテンシーを観察するために簡単なクエリーを実行します。次に、標準的なテーブルとDelta Lakeテーブルの間での性能の違いを見るために、同じテーブルのDelta Lakeバージョンに対して二つ目のクエリーを実行します。

シンプルに以下の4ステップを踏んでいきます。

  • Step 1 : USのフライトスケジュールデータを用いて標準的なParquetベーステーブルを作成します。
  • Step 2 : 年間を通じて出発空港、月毎のフライト数を計算するクエリーを実行します。
  • Step 3 : Delta Lakeを用いてフライトテーブルを作成し、テーブルの最適化を行います。
  • Step 4 : ステップ2のクエリーを再実行し、レーテンシーを観察します。

環境設定

こちらでは、他のユーザーとファイルパス、データベースが重複しないようにユーザー名を含むファイルパス、データベースを準備します。

Python
import re
from pyspark.sql.types import * 

# Username を取得
username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
# Username の英数字以外を除去し、全て小文字化
username = re.sub('[^A-Za-z0-9]+', '', username_raw).lower()

# ユーザー固有のデータベース名を生成します
db_name = f"databricks_handson_{username}"

# ファイル格納パス
work_path = f"dbfs:/tmp/databricks_handson/{username}/delta_optimization"

# データベースの準備
spark.sql(f"DROP DATABASE IF EXISTS {db_name} CASCADE")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
spark.sql(f"USE {db_name}")

# データベースを表示。
print(f"database_name: {db_name}")
print(f"path_name: {work_path}")

Screen Shot 2022-07-06 at 14.37.59.png

ステップ0: フライトデータの読み込み

Python
flights = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load("/databricks-datasets/asa/airlines/2008.csv")
Python
flights.count()

Screen Shot 2022-07-06 at 14.39.21.png

約700万レコード、700MBのデータとなっています。

ステップ1: フライトデータを用いたParquetテーブルの書き込み

カラムOriginでParquetのパーティション(Sparkのパーティションとは異なります)を作成し、ディスクに書き込みます。

Python
flights.write.format("parquet").mode("overwrite").partitionBy("Origin").save(f"{work_path}/flights_parquet")

Screen Shot 2022-07-06 at 14.40.11.png

ステップ1が完了すると、"flights"テーブルには年を通じたUSのフライト詳細が含まれます。

次にステップ2では、週の初日のフライト数の月間合計に基づくトップ20の都市を取得するクエリーを実行します。

ステップ2: クエリー実行

Python
from pyspark.sql.functions import count

flights_parquet = spark.read.format("parquet").load(f"{work_path}/flights_parquet")

display(flights_parquet.filter("DayOfWeek = 1").groupBy("Month","Origin").agg(count("*").alias("TotalFlights")).orderBy("TotalFlights", ascending=False).limit(20))

Screen Shot 2022-07-06 at 14.41.15.png

ステップ2が完了すると、標準的な"flights_parquet"テーブルにおけるレーテンシーを観測することができます。

ステップ3とステップ4においては、Deltaテーブルで同じことを行います。今回はクエリーを実行する前に、検索を高速化するためにデータを最適化するためにOPTIMIZEZORDERを実行します。

ステップ3: フライトデータを用いたDeltaテーブルの書き込み

Python
flights.write.format("delta").mode("overwrite").partitionBy("Origin").save(f"{work_path}/flights_delta")

Screen Shot 2022-07-06 at 14.42.10.png

ステップ3(続き): Databricks DeltaテーブルのOPTIMIZE

Z-Orderingは関連する情報を同じファイルセットに配置するパフォーマンスチューニングのテクニックです。Delta Lakeのデータスキッピングアルゴリズムは読み取る必要があるデータ量を劇的に削減するために、この局所性(locality)を自動で活用します。データをZ-Orderするには、ZORDER BY句に並び替えを行うカラムを指定します。

参考資料: Databricksにおけるデータファイル管理によるパフォーマンスの最適化 - Qiita

Python
display(spark.sql("DROP TABLE IF EXISTS flights"))

display(spark.sql(f"CREATE TABLE flights USING DELTA LOCATION '{work_path}/flights_delta'"))
                  
display(spark.sql("OPTIMIZE flights ZORDER BY (DayofWeek)"))

Screen Shot 2022-07-06 at 14.43.19.png

ステップ4: ステップ2のクエリーを実行してレーテンシーを測定

Python
flights_delta = spark.read.format("delta").load(f"{work_path}/flights_delta")

display(flights_delta.filter("DayOfWeek = 1").groupBy("Month","Origin").agg(count("*").alias("TotalFlights")).orderBy("TotalFlights", ascending=False).limit(20))

Screen Shot 2022-07-06 at 14.44.09.png

OPTIMIZEの実行後は、Deltaテーブルに対するクエリーが非常に高速になりました。クエリーをどのくらい高速化できるかは、処理を実行するクラスターに依存しますが、標準的なテーブルと比較して5-10倍の高速化を実現することができます。

今回の例ではOPTIMIZEを行うことで、クエリーの時間は1.59分から17秒になり、約5.6倍高速になっています。

Databricks 無料トライアル

Databricks 無料トライアル

1
1
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?