1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Live TablesでS3のメタデータを処理する

Posted at

S3ではオブジェクトにメタデータを付与することができます。

今回はDLTのパイプラインでこのメタデータを操作してみます。

メタデータの付与

Python
import boto3

bucketname = 'ty-db-data-bucket'
key = 'dlt_data/metadata.jpg'

s3 = boto3.resource('s3', aws_access_key_id='<アクセスキーID>', aws_secret_access_key='<シークレットアクセスキー>')
s3_client = s3.meta.client

with open(filepath, 'rb') as body:
    response_put = s3_client.put_object(
        Bucket=bucketname,
        Body=body,
        Key=key,
        ServerSideEncryption='AES256',
        Metadata={
            'test': 'test metadata'
        }
    )
    print(response_put)

メタデータを確認します。

Python
response_get = s3_client.get_object(
  Bucket=bucketname,
  Key=key
)
metadata = response_get['Metadata']
print(metadata)
{'test': 'test metadata'}

DLTパイプラインの構築

シンプルな1つのテーブルのパイプラインを作成します。

Python
import dlt
import pyspark.sql.functions as F
from pyspark.sql.functions import lit

import boto3

# S3バケット
bucketname = 'ty-db-data-bucket'

メタデータを取り出すUDFを定義します。アクセスキー以外の方法を使った方がいいとは思っていますが、一旦こちらで。

Python
def get_bronze_meta_json(path: str) -> str:
  # AWSキーを用いたboto3の設定
  s3 = boto3.resource('s3', aws_access_key_id='<アクセスキーID>', aws_secret_access_key='<シークレットアクセスキー>')
  s3_client = s3.meta.client

  # メタデータの取得
  response_get = s3_client.get_object(
    Bucket=bucketname,
    Key=key
  )

  # リテラルを返します
  metadata = response_get['Metadata']
  return lit(metadata['test'])

Auto Loaderで読み込んだデータにUDFを適用します。

Python
@dlt.table(name="recordings_bronze_w_metadata")
def recordings_bronze_w_metadata():
  
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .load("dbfs:/mnt/dlt_data/dlt_data/"))

  df = df.withColumn("json_meta", get_bronze_meta_json(F.col("path")))
  return df

パイプラインの実行

1行のみの処理ですが、うまく動きました。

Screenshot 2023-05-15 at 9.28.16.png

DLTの設定でターゲットスキーマを指定しているので、結果がテーブルに格納されています。結果のテーブルも確認します。

SQL
SELECT path,json_meta FROM recordings_bronze_w_metadata

Screenshot 2023-05-15 at 9.35.43.png

ちゃんとS3のメタデータが取得できています!

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?