1
0

More than 1 year has passed since last update.

Databricks ( Spark ) にておける File metadata column (ファイル メタデータ列) の利用方法

Last updated at Posted at 2022-11-08

概要

Databricks Runtime 10.5 以降で利用できるようになった File metadata column (ファイル メタデータ列) の利用方法を共有します。

2022年11月8日時点では、下記のような項目を取得できます。

image.png

引用元:ファイル メタデータ列 - Azure Databricks | Microsoft Learn

実際に取得できる値は、下記にあるような項目です。監査列を付与する際に有用であり、特にデータインジェスト日を付与する場合には_metadata.file_modification_timeを利用する方針がよさそうです。

image.png

注意事項

1. _metadata 列をそのまま保持することは非推奨

下記のように記載がある通り、_metadata 列をそのままテーブルに保持させることが非推奨となっております。

image.png

引用元:ファイル メタデータ列 - Azure Databricks | Microsoft Learn

必要なカラムのみを取得して、_metadata列をDropしたほうがよさそうです。

df_5 = (
    df
    .withColumn('_metadata', df['_metadata'])
)
 
df_5 = (
    df_5
    .withColumn('_metadata_file_path', df_5['_metadata.file_path'])
    .withColumn('_metadata_file_modification_time', df_5['_metadata.file_modification_time'])
    .drop('_metadata')
)
 
df_5.display()

image.png

2. withColumn メソッドの連続によりカラムを設定するとエラーとなる場合があること

下記のように withColumn メソッドを連続して記載すると、_metadataが見つからないというエラーがでる場合があります。

df_4 = (
    df
    .withColumn('_metadata', df['_metadata'])
    .withColumn('_metadata_file_path', df['_metadata.file_path'])
    .withColumn('_metadata_file_modification_time', df['_metadata.file_modification_time'])
)

df_4.display()

AnalysisException: Resolved attribute(s) _metadata#2119 missing from p_partkey#2073L,p_name#2074,p_mfgr#2075,p_brand#2076,p_type#2077,p_size#2078,p_container#2079,p_retailprice#2080,p_comment#2081,_metadata#2333 in operator !Project [p_partkey#2073L, p_name#2074, p_mfgr#2075, p_brand#2076, p_type#2077, p_size#2078, p_container#2079, p_retailprice#2080, p_comment#2081, _metadata#2333, _metadata#2119.file_path AS _metadata_file_path#2346]. Attribute(s) with the same name appear in the operation: _metadata. Please check if the right attribute(s) are used.;

image.png

対応方法としては2通りあり、実施方法を後述します。

  1. selectメソッドにより _metadata列を実体化する方法
  2. withColumnメソッドを分割する方法

2-1. エラーへの対応方法

0. 事前準備
# データフレームを作成
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/"

schema = """
  p_partkey long,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size int,
  p_container string,
  p_retailprice decimal(12, 2),
  p_comment string
"""

df = (spark
       .read
       .format("csv")
       .schema(schema)
       .option("sep", "|")
       .load(filepath)
    )

df.display()

image.png

1. selectメソッドにより _metadata列を実体化する方法
df_4 = (
    df
    .select('*','_metadata')
    .withColumn('_metadata_file_path', df['_metadata.file_path'])
    .withColumn('_metadata_file_modification_time', df['_metadata.file_modification_time'])
)
 
df_4.display()

image.png

2. withColumnメソッドを分割する方法
df_5 = (
    df
    .withColumn('_metadata', df['_metadata'])
)
 
df_5 = (
    df_5
    .withColumn('_metadata_file_path', df_5['_metadata.file_path'])
    .withColumn('_metadata_file_modification_time', df_5['_metadata.file_modification_time'])
    .drop('_metadata')
)
 
df_5.display()

image.png

3. _metadata 列のまま、Pandas データフレームに変換した場合に、file_modification_time列が正常に変換できない場合があること

下記のように変換がうまくいかないことがあるため、単一列化する方がよさそうです。

filepath = "dbfs:/databricks-datasets/tpch/data-001/nation/"
schema = """
  N_NATIONKEY  integer
  ,N_NAME       string
  ,N_REGIONKEY  integer 
  ,N_COMMENT    string
"""
df = (spark
       .read
       .format("csv")
       .schema(schema)
       .option("inferSchema", "True")
       .option("sep", "|")
       .load(filepath)
    )
df = df.select('_metadata')

pdf = df.toPandas()
pdf.display()

java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2014,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=22,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=9,HOUR_OF_DAY=21,MINUTE=54,SECOND=56,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]

image.png

Spark df -> Pandas df-> Spark df で変換した際にはエラーにはならないため、_metadata 列の仕様のようです。

from pyspark.sql import Row
from decimal import *
import datetime
import pprint
 
sample_Data = [
    Row(
        struct_column    = Row(
            string_column    = 'AAA', 
            byte_column      = 1, 
            integer_column   = 1, 
            bigint_column    = 1, 
            float_column     = 12.300000190734863, 
            double_column    = 12.3, 
            numeric_column   = Decimal('12'), 
            boolean_column   = True, 
            date_column      = datetime.date(2020, 1, 1), 
            timestamp_column = datetime.datetime(2021, 1, 1, 0, 0), 
            binary_column    = bytearray(b'A'), 
            struct_column    = Row(
                struct_string_column = 'AAA', 
                struct_int_column    = 1
            ), 
            array_column     = ['AAA', 'BBB', 'CCC'], 
            map_column       = {'AAA': 1}
        ), 
    )
]
 
df = spark.createDataFrame(sample_Data)
pdf = df.toPandas()
df_2 = spark.createDataFrame(pdf)
df_2.display()

image.png

3. File metadata column (ファイル メタデータ列) の利用例

1. Spark データフレームで利用する方法

# データフレームを作成
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/"
 
schema = """
  p_partkey long,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size int,
  p_container string,
  p_retailprice decimal(12, 2),
  p_comment string
"""
 
df = (spark
       .read
       .format("csv")
       .schema(schema)
       .option("sep", "|")
       .load(filepath)
    )
 
df.display()

image.png

df_4 = (
    df
    .select('*','_metadata')
    .withColumn('_metadata_file_path', df['_metadata.file_path'])
    .withColumn('_metadata_file_modification_time', df['_metadata.file_modification_time'])
)
 
df_4.display()

image.png

2. Spark SQL で利用する方法

# データフレームを作成
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/"
 
schema = """
  p_partkey long,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size int,
  p_container string,
  p_retailprice decimal(12, 2),
  p_comment string
"""
 
df = (spark
       .read
       .format("csv")
       .schema(schema)
       .option("sep", "|")
       .load(filepath)
    )
 
df.display()

image.png

df_6 = (
    df
    .withColumn('_metadata', df['_metadata'])
)
 
df_6.createOrReplaceTempView('_tmp')

image.png

%sql
SELECT
  *
  FROM
    _tmp

image.png

3. COPY INTO で利用する方法

db_name = '_metadata_test'
tbl_name = 'part'

spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
spark.sql(f'DROP TABLE IF EXISTS {db_name}.{tbl_name}')
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {db_name}.{tbl_name}
(
  p_partkey String,
  p_name string,
  p_mfgr string,
  p_brand string,
  p_type string,
  p_size String,
  p_container string,
  p_retailprice String,
  p_comment string,
  _metadata_file_path    string,
  _metadata_file_modification_time timestamp
)
USING delta
''')

image.png

spark.sql(f'''
COPY INTO {db_name}.{tbl_name}
  FROM  (
    SELECT
      _c0 AS p_partkey
      ,_c1 AS p_name
      ,_c2 AS p_mfgr
      ,_c3 AS p_brand
      ,_c4 AS p_type
      ,_c5 AS p_size
      ,_c6 AS p_container
      ,_c7 AS p_retailprice
      ,_c8 AS p_comment
      ,_metadata.file_path AS _metadata_file_path
      ,_metadata.file_modification_time AS _metadata_file_modification_time
      FROM
        'dbfs:/databricks-datasets/tpch/data-001/part'
  )
  FILEFORMAT = CSV
  FORMAT_OPTIONS (
    'mergeSchema' = 'true'
    ,'ignoreCorruptFiles' = 'false'
    ,'sep' = '|'
    ,'inferSchema' = 'false'
  )
  COPY_OPTIONS (
    'mergeSchema' = 'true'
--     ,'force' = 'true'
  )
''')

image.png

spark.table(f'{db_name}.{tbl_name}').display()

image.png

spark.sql(f'DROP DATABASE {db_name}')

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0