LoginSignup
1
0

More than 1 year has passed since last update.

AWS Glueでデータレイクフレームワーク(Delta Lake)を試してみた

Posted at

背景・目的

Glueでいくつかのデータレイクフレームワークが利用できますが、それぞれどのような特徴かわからなかったので整理しつつ、
簡単に触ってみたいと思います。ここでは、Delta Lakeを扱います。

まとめ

  • Delta Lakeでは、ACIDトランザクションをサポートしている。
  • ストリーミングもサポートしている。
  • タイムトラベルを提供している。

概要

AWS Glue での Delta Lake フレームワークの使用

Glue3.0以降で、Delta Lakeフレームワークが使用できる。
サポートされているバージョンは、下記の通り。

Glueのバージョ Delta Lakeバージョン
4.0 2.1.0
3.0 1.0.0

Introduction

Delta Lake は、データ レイク上に Lakehouse アーキテクチャを構築できるOSSプロジェクトとのこと。
下記を提供し、S3、ADLS、HDFS、GCSなどの既存データレイク上でストリーミングとバッチデータ処理を統合する。

  • ACID トランザクション
  • スケーラブルなメタデータ処理

Delta Lakeは下記を提供する。

  • SparkでのACIDトランザクション
    • シリアル化可能な分離レベルにより、読み込みで一貫性のないデータを確認することはない。
  • スケーラブルなメタデータ処理
    • Sparkの分散処理を活用して、数十億のファイルを持つPB規模のテーブルの全てのメタデータを簡単に処理する
  • ストリーミングとバッチの統合
    • Delta Lakeのテーブルはバッチテーブルであり、ストリーミングソースとのシンクでもあるとのこと。
    • ストリーミングデータの取り込み、履歴のバッチバックフィル、インタラクティブなクエリは全て、すぐに利用できる。
  • スキーマの適用
    • スキーマのバリエーションを自動的に処理して、取り込み中に不正なレコードが挿入されるのを防ぐ
  • タイムトラベル
    • データのバージョン管理により、ロールバック、完全な履歴監査証跡、再現可能な機械学習実験が可能
  • Upsertと削除
    • マージ、更新、削除操作をサポートし、変更データキャプチャ、Upsertのストリーミングなどの複雑なユースケースを有効にする。

Concurrency control

Optimistic concurrency control

楽観的同時実行制御を使用し、書き込み間のトランザクション保証を提供している。
下記の3つのフェーズで書き込みが行われる。

  1. 読み取り
    • 使用可能な最新バージョンのテーブルを読み取り、変更が必要なファイルを識別する。
  2. 書き込み
    • 新しいファイルを書き込むことですべての変更をステージングにする
  3. 検証とコミット
    • 変更をコミットする前に、提案された変更が読み取られたスナップショット以降に同時にコミットされた可能性のある他の変更と競合するか確認する。
    • 競合がない場合、ステージングされたすべての変更は新しいバージョン管理されたスナップショットとしてコミットされ、書き込みは成功する。競合がある場合、例外により書き込みに失敗する。

実践

Delta Lakeを使ってみます。

事前準備

  1. S3バケットを作成し、下記のテストデータをアップロードします。

    {"id":"00001","value":"test1"}
    {"id":"00002","value":"test2"}
    {"id":"00003","value":"test3"}
    

    image.png

ジョブの作成(新規作成)

ジョブの設定

作成したデータを元に、Delta Lakeフォーマットで出力するジョブを作成します。

  1. Glue SutdioでCreateをクリックします。
    image.png

  2. 下記のようなジョブを作成します。
    image.png

  3. 下記の設定を入力します。

    • Formatには、Delta Lake
    • Compression Typeには、Snappy
    • Data Catalog update optionsには、Do not update the data Catalog
      image.png
  4. 生成されたコードを確認してみます。

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
        format_options={"multiline": False},
        connection_type="s3",
        format="json",
        connection_options={"paths": ["s3://XXXXXX/input/"], "recurse": True},
        transformation_ctx="S3bucket_node1",
    )
    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[("id", "string", "id", "string"), ("value", "string", "value", "string")],
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Script generated for node S3 bucket
    additional_options = {
        "path": "s3://XXXXXX/output/",
        "write.parquet.compression-codec": "snappy",
    }
    S3bucket_node3_df = ApplyMapping_node2.toDF()
    S3bucket_node3_df.write.format("delta").options(**additional_options).mode(
        "append"
    ).save()
    
    job.commit()
    
    

ジョブの実行

  1. Runをクリックしジョブを実行し、Succeededを確認します。
    image.png

  2. 出力先のパスを確認します。下記のファイルが出来上がっていました。

    • _delta_log_folder/
    • _delta_log/
    • データファイル(part-0000・・・)
      image.png
  3. データファイルを確認します。入力データ全て出力されています。
    image.png

  4. delta_log下に出力されていたログを確認します。下記の内容が出力されていました。

    • protocol
    • metadata
    • operation
    • commitInfo
      image.png
    {
      "protocol": {
        "minReaderVersion": 1,
        "minWriterVersion": 2
      }
    }
    {
      "metaData": {
        "id": "1439af20-f1e2-4411-913e-82d6e6fb3686",
        "format": {
          "provider": "parquet",
          "options": {}
        },
        "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
        "partitionColumns": [],
        "configuration": {},
        "createdTime": 1682175562496
      }
    }
    {
      "add": {
        "path": "part-00000-1cdab5e0-aaaa-4350-99f0-67841b0a31fe-c000.snappy.parquet",
        "partitionValues": {},
        "size": 685,
        "modificationTime": 1682175575000,
        "dataChange": true,
        "stats": "{\"numRecords\":3,\"minValues\":{\"id\":\"00001\",\"value\":\"test1\"},\"maxValues\":{\"id\":\"00003\",\"value\":\"test3\"},\"nullCount\":{\"id\":0,\"value\":0}}"
      }
    }
    {
      "commitInfo": {
        "timestamp": 1682175575987,
        "operation": "WRITE",
        "operationParameters": {
          "mode": "Append",
          "partitionBy": "[]"
        },
        "isolationLevel": "Serializable",
        "isBlindAppend": true,
        "operationMetrics": {
          "numFiles": "1",
          "numOutputRows": "3",
          "numOutputBytes": "685"
        },
        "engineInfo": "Apache-Spark/3.3.0-amzn-1 Delta-Lake/2.1.0",
        "txnId": "76ad7b0c-3d2b-496f-96dd-e9d9017f0917"
      }
    }
    
  5. _delta_log_folderはファイルでした。特に何も出力されていませんでした。

ジョブの作成(Upsert)

現在、出力されたDelta Lakeは、下記の状態です。

key value
00001 test1
00002 test2
00003 test3

この出力データに対して、追加と削除を試してみます。

データの準備

  1. 下記のデータをインプットのS3バケットに配置します。
    {"id":"00002","value":"update"}
    {"id":"00004","value":"test4"}
    
  2. キーが重複してしまうので元のファイルを削除しておきます。(ブックマークが無効のため)
    image.png

最終的に、下記の結果となることを期待しています。

key value 備考
00001 test1
00002 update ※更新
00003 test3
00004 test4 ※追加

ジョブの設定

  1. 下記の実装を行います。
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue import DynamicFrame
    from delta.tables import DeltaTable
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    
    ## JSON
    
    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
        format_options={"multiline": False},
        connection_type="s3",
        format="json",
        connection_options={"paths": ["s3://XXXXXXX/input/"], "recurse": True},
        transformation_ctx="S3bucket_node1",
    )
    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[("id", "string", "id", "string"), ("value", "string", "value", "string")],
        transformation_ctx="ApplyMapping_node2",
    )
    
    ## Delta Lake
    
    delta_df = DeltaTable.forPath(spark,"s3://XXXXXXX/output/")
    
    df = ApplyMapping_node2.toDF()
    
    ## Output(UpSert)
    
    delta_df.alias("delta").merge(source = df.alias("input"),
        condition = "delta.id = input.id " 
        ).whenMatchedUpdate(set={
            "value":"input.value"
        }).whenNotMatchedInsert(values ={
            "id" : "input.id" ,
            "value" : "input.value"
        }).execute()
    
    
    
    job.commit()
    

ジョブの実行

  1. Runをクリックしジョブを実行し、Succeededを確認します。
    image.png

  2. 新しくParquetくファイルが1つできました。
    image.png

  3. Parquetファイルを確認します。想定どおりです。
    image.png

  4. delta_log下にファイルが一つ作成されていました。
    image.png

  5. delta_logを確認します。

    {
      "remove": {
        "path": "part-00000-1cdab5e0-aaaa-4350-99f0-67841b0a31fe-c000.snappy.parquet",
        "deletionTimestamp": 1682180441784,
        "dataChange": true,
        "extendedFileMetadata": true,
        "partitionValues": {},
        "size": 685
      }
    }
    {
      "add": {
        "path": "part-00000-1c1062d9-f2fb-496d-be97-af6858debedc-c000.snappy.parquet",
        "partitionValues": {},
        "size": 703,
        "modificationTime": 1682180442000,
        "dataChange": true,
        "stats": "{\"numRecords\":4,\"minValues\":{\"id\":\"00001\",\"value\":\"test1\"},\"maxValues\":{\"id\":\"00004\",\"value\":\"update\"},\"nullCount\":{\"id\":0,\"value\":0}}"
      }
    }
    {
      "commitInfo": {
        "timestamp": 1682180441832,
        "operation": "MERGE",
        "operationParameters": {
          "predicate": "(delta.id = input.id)",
          "matchedPredicates": "[{\"actionType\":\"update\"}]",
          "notMatchedPredicates": "[{\"actionType\":\"insert\"}]"
        },
        "readVersion": 0,
        "isolationLevel": "Serializable",
        "isBlindAppend": false,
        "operationMetrics": {
          "numTargetRowsCopied": "2",
          "numTargetRowsDeleted": "0",
          "numTargetFilesAdded": "1",
          "executionTimeMs": "7546",
          "numTargetRowsInserted": "1",
          "scanTimeMs": "5126",
          "numTargetRowsUpdated": "1",
          "numOutputRows": "4",
          "numTargetChangeFilesAdded": "0",
          "numSourceRows": "2",
          "numTargetFilesRemoved": "1",
          "rewriteTimeMs": "2278"
        },
        "engineInfo": "Apache-Spark/3.3.0-amzn-1 Delta-Lake/2.1.0",
        "txnId": "9e22e8a4-ee37-4e3f-bc02-be3acc69ba75"
      }
    }
    

考察

今回、Delta Lakeを使用してUpsertを試してみました。データレイクでマスターデータの管理を行う際に有効だと感じました。
今後は、タイムトラベルや大量処理などの性能を確認してみたいと思います。

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0