背景・目的
Glueでいくつかのデータレイクフレームワークが利用できますが、それぞれどのような特徴かわからなかったので整理しつつ、
簡単に触ってみたいと思います。ここでは、Delta Lakeを扱います。
まとめ
- Delta Lakeでは、ACIDトランザクションをサポートしている。
- ストリーミングもサポートしている。
- タイムトラベルを提供している。
概要
AWS Glue での Delta Lake フレームワークの使用
Glue3.0以降で、Delta Lakeフレームワークが使用できる。
サポートされているバージョンは、下記の通り。
Glueのバージョ | Delta Lakeバージョン |
---|---|
4.0 | 2.1.0 |
3.0 | 1.0.0 |
Introduction
Delta Lake は、データ レイク上に Lakehouse アーキテクチャを構築できるOSSプロジェクトとのこと。
下記を提供し、S3、ADLS、HDFS、GCSなどの既存データレイク上でストリーミングとバッチデータ処理を統合する。
- ACID トランザクション
- スケーラブルなメタデータ処理
Delta Lakeは下記を提供する。
- SparkでのACIDトランザクション
- シリアル化可能な分離レベルにより、読み込みで一貫性のないデータを確認することはない。
- スケーラブルなメタデータ処理
- Sparkの分散処理を活用して、数十億のファイルを持つPB規模のテーブルの全てのメタデータを簡単に処理する
- ストリーミングとバッチの統合
- Delta Lakeのテーブルはバッチテーブルであり、ストリーミングソースとのシンクでもあるとのこと。
- ストリーミングデータの取り込み、履歴のバッチバックフィル、インタラクティブなクエリは全て、すぐに利用できる。
- スキーマの適用
- スキーマのバリエーションを自動的に処理して、取り込み中に不正なレコードが挿入されるのを防ぐ
- タイムトラベル
- データのバージョン管理により、ロールバック、完全な履歴監査証跡、再現可能な機械学習実験が可能
- Upsertと削除
- マージ、更新、削除操作をサポートし、変更データキャプチャ、Upsertのストリーミングなどの複雑なユースケースを有効にする。
Concurrency control
Optimistic concurrency control
楽観的同時実行制御を使用し、書き込み間のトランザクション保証を提供している。
下記の3つのフェーズで書き込みが行われる。
- 読み取り
- 使用可能な最新バージョンのテーブルを読み取り、変更が必要なファイルを識別する。
- 書き込み
- 新しいファイルを書き込むことですべての変更をステージングにする
- 検証とコミット
- 変更をコミットする前に、提案された変更が読み取られたスナップショット以降に同時にコミットされた可能性のある他の変更と競合するか確認する。
- 競合がない場合、ステージングされたすべての変更は新しいバージョン管理されたスナップショットとしてコミットされ、書き込みは成功する。競合がある場合、例外により書き込みに失敗する。
実践
Delta Lakeを使ってみます。
事前準備
-
S3バケットを作成し、下記のテストデータをアップロードします。
{"id":"00001","value":"test1"} {"id":"00002","value":"test2"} {"id":"00003","value":"test3"}
ジョブの作成(新規作成)
ジョブの設定
作成したデータを元に、Delta Lakeフォーマットで出力するジョブを作成します。
-
下記の設定を入力します。
-
生成されたコードを確認してみます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 bucket S3bucket_node1 = glueContext.create_dynamic_frame.from_options( format_options={"multiline": False}, connection_type="s3", format="json", connection_options={"paths": ["s3://XXXXXX/input/"], "recurse": True}, transformation_ctx="S3bucket_node1", ) # Script generated for node ApplyMapping ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[("id", "string", "id", "string"), ("value", "string", "value", "string")], transformation_ctx="ApplyMapping_node2", ) # Script generated for node S3 bucket additional_options = { "path": "s3://XXXXXX/output/", "write.parquet.compression-codec": "snappy", } S3bucket_node3_df = ApplyMapping_node2.toDF() S3bucket_node3_df.write.format("delta").options(**additional_options).mode( "append" ).save() job.commit()
ジョブの実行
-
出力先のパスを確認します。下記のファイルが出来上がっていました。
-
delta_log下に出力されていたログを確認します。下記の内容が出力されていました。
{ "protocol": { "minReaderVersion": 1, "minWriterVersion": 2 } } { "metaData": { "id": "1439af20-f1e2-4411-913e-82d6e6fb3686", "format": { "provider": "parquet", "options": {} }, "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}", "partitionColumns": [], "configuration": {}, "createdTime": 1682175562496 } } { "add": { "path": "part-00000-1cdab5e0-aaaa-4350-99f0-67841b0a31fe-c000.snappy.parquet", "partitionValues": {}, "size": 685, "modificationTime": 1682175575000, "dataChange": true, "stats": "{\"numRecords\":3,\"minValues\":{\"id\":\"00001\",\"value\":\"test1\"},\"maxValues\":{\"id\":\"00003\",\"value\":\"test3\"},\"nullCount\":{\"id\":0,\"value\":0}}" } } { "commitInfo": { "timestamp": 1682175575987, "operation": "WRITE", "operationParameters": { "mode": "Append", "partitionBy": "[]" }, "isolationLevel": "Serializable", "isBlindAppend": true, "operationMetrics": { "numFiles": "1", "numOutputRows": "3", "numOutputBytes": "685" }, "engineInfo": "Apache-Spark/3.3.0-amzn-1 Delta-Lake/2.1.0", "txnId": "76ad7b0c-3d2b-496f-96dd-e9d9017f0917" } }
-
_delta_log_folderはファイルでした。特に何も出力されていませんでした。
ジョブの作成(Upsert)
現在、出力されたDelta Lakeは、下記の状態です。
key | value |
---|---|
00001 | test1 |
00002 | test2 |
00003 | test3 |
この出力データに対して、追加と削除を試してみます。
データの準備
- 下記のデータをインプットのS3バケットに配置します。
{"id":"00002","value":"update"} {"id":"00004","value":"test4"}
- キーが重複してしまうので元のファイルを削除しておきます。(ブックマークが無効のため)
最終的に、下記の結果となることを期待しています。
key | value | 備考 |
---|---|---|
00001 | test1 | |
00002 | update | ※更新 |
00003 | test3 | |
00004 | test4 | ※追加 |
ジョブの設定
- 下記の実装を行います。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue import DynamicFrame from delta.tables import DeltaTable args = getResolvedOptions(sys.argv, ["JOB_NAME"]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) ## JSON # Script generated for node S3 bucket S3bucket_node1 = glueContext.create_dynamic_frame.from_options( format_options={"multiline": False}, connection_type="s3", format="json", connection_options={"paths": ["s3://XXXXXXX/input/"], "recurse": True}, transformation_ctx="S3bucket_node1", ) # Script generated for node ApplyMapping ApplyMapping_node2 = ApplyMapping.apply( frame=S3bucket_node1, mappings=[("id", "string", "id", "string"), ("value", "string", "value", "string")], transformation_ctx="ApplyMapping_node2", ) ## Delta Lake delta_df = DeltaTable.forPath(spark,"s3://XXXXXXX/output/") df = ApplyMapping_node2.toDF() ## Output(UpSert) delta_df.alias("delta").merge(source = df.alias("input"), condition = "delta.id = input.id " ).whenMatchedUpdate(set={ "value":"input.value" }).whenNotMatchedInsert(values ={ "id" : "input.id" , "value" : "input.value" }).execute() job.commit()
ジョブの実行
-
delta_logを確認します。
{ "remove": { "path": "part-00000-1cdab5e0-aaaa-4350-99f0-67841b0a31fe-c000.snappy.parquet", "deletionTimestamp": 1682180441784, "dataChange": true, "extendedFileMetadata": true, "partitionValues": {}, "size": 685 } } { "add": { "path": "part-00000-1c1062d9-f2fb-496d-be97-af6858debedc-c000.snappy.parquet", "partitionValues": {}, "size": 703, "modificationTime": 1682180442000, "dataChange": true, "stats": "{\"numRecords\":4,\"minValues\":{\"id\":\"00001\",\"value\":\"test1\"},\"maxValues\":{\"id\":\"00004\",\"value\":\"update\"},\"nullCount\":{\"id\":0,\"value\":0}}" } } { "commitInfo": { "timestamp": 1682180441832, "operation": "MERGE", "operationParameters": { "predicate": "(delta.id = input.id)", "matchedPredicates": "[{\"actionType\":\"update\"}]", "notMatchedPredicates": "[{\"actionType\":\"insert\"}]" }, "readVersion": 0, "isolationLevel": "Serializable", "isBlindAppend": false, "operationMetrics": { "numTargetRowsCopied": "2", "numTargetRowsDeleted": "0", "numTargetFilesAdded": "1", "executionTimeMs": "7546", "numTargetRowsInserted": "1", "scanTimeMs": "5126", "numTargetRowsUpdated": "1", "numOutputRows": "4", "numTargetChangeFilesAdded": "0", "numSourceRows": "2", "numTargetFilesRemoved": "1", "rewriteTimeMs": "2278" }, "engineInfo": "Apache-Spark/3.3.0-amzn-1 Delta-Lake/2.1.0", "txnId": "9e22e8a4-ee37-4e3f-bc02-be3acc69ba75" } }
考察
今回、Delta Lakeを使用してUpsertを試してみました。データレイクでマスターデータの管理を行う際に有効だと感じました。
今後は、タイムトラベルや大量処理などの性能を確認してみたいと思います。
参考