概要
Databricks Runtime 10.5 以降で利用できるようになった File metadata column (ファイル メタデータ列) の利用方法を共有します。
2022年11月8日時点では、下記のような項目を取得できます。
引用元:ファイル メタデータ列 - Azure Databricks | Microsoft Learn
実際に取得できる値は、下記にあるような項目です。監査列を付与する際に有用であり、特にデータインジェスト日を付与する場合には_metadata.file_modification_time
を利用する方針がよさそうです。
注意事項
1. _metadata
列をそのまま保持することは非推奨
下記のように記載がある通り、_metadata
列をそのままテーブルに保持させることが非推奨となっております。
引用元:ファイル メタデータ列 - Azure Databricks | Microsoft Learn
必要なカラムのみを取得して、_metadata
列をDrop
したほうがよさそうです。
df_5 = (
df
.withColumn('_metadata', df['_metadata'])
)
df_5 = (
df_5
.withColumn('_metadata_file_path', df_5['_metadata.file_path'])
.withColumn('_metadata_file_modification_time', df_5['_metadata.file_modification_time'])
.drop('_metadata')
)
df_5.display()
2. withColumn
メソッドの連続によりカラムを設定するとエラーとなる場合があること
下記のように withColumn
メソッドを連続して記載すると、_metadata
が見つからないというエラーがでる場合があります。
df_4 = (
df
.withColumn('_metadata', df['_metadata'])
.withColumn('_metadata_file_path', df['_metadata.file_path'])
.withColumn('_metadata_file_modification_time', df['_metadata.file_modification_time'])
)
df_4.display()
AnalysisException: Resolved attribute(s) _metadata#2119 missing from p_partkey#2073L,p_name#2074,p_mfgr#2075,p_brand#2076,p_type#2077,p_size#2078,p_container#2079,p_retailprice#2080,p_comment#2081,_metadata#2333 in operator !Project [p_partkey#2073L, p_name#2074, p_mfgr#2075, p_brand#2076, p_type#2077, p_size#2078, p_container#2079, p_retailprice#2080, p_comment#2081, _metadata#2333, _metadata#2119.file_path AS _metadata_file_path#2346]. Attribute(s) with the same name appear in the operation: _metadata. Please check if the right attribute(s) are used.;
対応方法としては2通りあり、実施方法を後述します。
-
select
メソッドにより_metadata
列を実体化する方法 -
withColumn
メソッドを分割する方法
2-1. エラーへの対応方法
0. 事前準備
# データフレームを作成
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/"
schema = """
p_partkey long,
p_name string,
p_mfgr string,
p_brand string,
p_type string,
p_size int,
p_container string,
p_retailprice decimal(12, 2),
p_comment string
"""
df = (spark
.read
.format("csv")
.schema(schema)
.option("sep", "|")
.load(filepath)
)
df.display()
1. select
メソッドにより _metadata
列を実体化する方法
df_4 = (
df
.select('*','_metadata')
.withColumn('_metadata_file_path', df['_metadata.file_path'])
.withColumn('_metadata_file_modification_time', df['_metadata.file_modification_time'])
)
df_4.display()
2. withColumn
メソッドを分割する方法
df_5 = (
df
.withColumn('_metadata', df['_metadata'])
)
df_5 = (
df_5
.withColumn('_metadata_file_path', df_5['_metadata.file_path'])
.withColumn('_metadata_file_modification_time', df_5['_metadata.file_modification_time'])
.drop('_metadata')
)
df_5.display()
3. _metadata
列のまま、Pandas データフレームに変換した場合に、file_modification_time
列が正常に変換できない場合があること
下記のように変換がうまくいかないことがあるため、単一列化する方がよさそうです。
filepath = "dbfs:/databricks-datasets/tpch/data-001/nation/"
schema = """
N_NATIONKEY integer
,N_NAME string
,N_REGIONKEY integer
,N_COMMENT string
"""
df = (spark
.read
.format("csv")
.schema(schema)
.option("inferSchema", "True")
.option("sep", "|")
.load(filepath)
)
df = df.select('_metadata')
pdf = df.toPandas()
pdf.display()
java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Etc/UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2014,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=22,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=1,HOUR=9,HOUR_OF_DAY=21,MINUTE=54,SECOND=56,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]
Spark df -> Pandas df-> Spark df で変換した際にはエラーにはならないため、_metadata
列の仕様のようです。
from pyspark.sql import Row
from decimal import *
import datetime
import pprint
sample_Data = [
Row(
struct_column = Row(
string_column = 'AAA',
byte_column = 1,
integer_column = 1,
bigint_column = 1,
float_column = 12.300000190734863,
double_column = 12.3,
numeric_column = Decimal('12'),
boolean_column = True,
date_column = datetime.date(2020, 1, 1),
timestamp_column = datetime.datetime(2021, 1, 1, 0, 0),
binary_column = bytearray(b'A'),
struct_column = Row(
struct_string_column = 'AAA',
struct_int_column = 1
),
array_column = ['AAA', 'BBB', 'CCC'],
map_column = {'AAA': 1}
),
)
]
df = spark.createDataFrame(sample_Data)
pdf = df.toPandas()
df_2 = spark.createDataFrame(pdf)
df_2.display()
3. File metadata column (ファイル メタデータ列) の利用例
1. Spark データフレームで利用する方法
# データフレームを作成
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/"
schema = """
p_partkey long,
p_name string,
p_mfgr string,
p_brand string,
p_type string,
p_size int,
p_container string,
p_retailprice decimal(12, 2),
p_comment string
"""
df = (spark
.read
.format("csv")
.schema(schema)
.option("sep", "|")
.load(filepath)
)
df.display()
df_4 = (
df
.select('*','_metadata')
.withColumn('_metadata_file_path', df['_metadata.file_path'])
.withColumn('_metadata_file_modification_time', df['_metadata.file_modification_time'])
)
df_4.display()
2. Spark SQL で利用する方法
# データフレームを作成
filepath = "dbfs:/databricks-datasets/tpch/data-001/part/"
schema = """
p_partkey long,
p_name string,
p_mfgr string,
p_brand string,
p_type string,
p_size int,
p_container string,
p_retailprice decimal(12, 2),
p_comment string
"""
df = (spark
.read
.format("csv")
.schema(schema)
.option("sep", "|")
.load(filepath)
)
df.display()
df_6 = (
df
.withColumn('_metadata', df['_metadata'])
)
df_6.createOrReplaceTempView('_tmp')
%sql
SELECT
*
FROM
_tmp
3. COPY INTO
で利用する方法
db_name = '_metadata_test'
tbl_name = 'part'
spark.sql(f'CREATE DATABASE IF NOT EXISTS {db_name}')
spark.sql(f'DROP TABLE IF EXISTS {db_name}.{tbl_name}')
spark.sql(f'''
CREATE TABLE IF NOT EXISTS {db_name}.{tbl_name}
(
p_partkey String,
p_name string,
p_mfgr string,
p_brand string,
p_type string,
p_size String,
p_container string,
p_retailprice String,
p_comment string,
_metadata_file_path string,
_metadata_file_modification_time timestamp
)
USING delta
''')
spark.sql(f'''
COPY INTO {db_name}.{tbl_name}
FROM (
SELECT
_c0 AS p_partkey
,_c1 AS p_name
,_c2 AS p_mfgr
,_c3 AS p_brand
,_c4 AS p_type
,_c5 AS p_size
,_c6 AS p_container
,_c7 AS p_retailprice
,_c8 AS p_comment
,_metadata.file_path AS _metadata_file_path
,_metadata.file_modification_time AS _metadata_file_modification_time
FROM
'dbfs:/databricks-datasets/tpch/data-001/part'
)
FILEFORMAT = CSV
FORMAT_OPTIONS (
'mergeSchema' = 'true'
,'ignoreCorruptFiles' = 'false'
,'sep' = '|'
,'inferSchema' = 'false'
)
COPY_OPTIONS (
'mergeSchema' = 'true'
-- ,'force' = 'true'
)
''')
spark.table(f'{db_name}.{tbl_name}').display()
spark.sql(f'DROP DATABASE {db_name}')