0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricksにおけるファイルメタデータカラム

Posted at

File metadata column | Databricks on AWS [2022/6/9時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

注意
Databricksランタイム10.5以降で使用できます。

_metadataカラムを用いることで、入力ファイルのメタデータ情報を取得することができます。_metadataカラムは隠しカラムであり、すべてのファイルフォマットに対して利用することができます。返却されるデータフレームに_metadataカラムを含めるには、クエリーの中で明示的に参照する必要があります。

データソースに_metadataというカラムが含まれている場合、クエリーはデータソースからのカラムを返却し、ファイルメタデータを返却しません。

警告!
将来のリリースでは_metadataカラムに新たなフィールドが追加される場合があります。_metadataカラムがアップデートされた際のスキーマエボリューションのエラーを防ぐには、お使いのクエリーのカラムで特定のフィールドを選択するようにすることをお勧めします。サンプルをご覧ください。

サポートされるメタデータ

_metadataカラムは以下のフィールドを持つSTRUCTです。

名前 タイプ 説明 サンプル
file_path STRING 入力ファイルのファイルパス。 file:/tmp/f0.csv
file_name STRING 入力ファイルの名前と拡張子。 f0.csv
file_size LONG 入力ファイルのバイト長。 628
file_modification_time TIMESTAMP 入力ファイルの最終更新時刻タイムスタンプ。 2021-12-20 20:05:21

サンプル

基本的なファイルベースのデータソースのリーダーでの使用

Python
df = spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*", "_metadata")

display(df)

'''
Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
'''
Scala
val df = spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*", "_metadata")

display(df_population)

/* Result:
+---------+-----+----------------------------------------------------+
|   name  | age |                 _metadata                          |
+=========+=====+====================================================+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f0.csv",                |
| Debbie  | 18  |    "file_name": "f0.csv",                          |
|         |     |    "file_size": 12,                                |
|         |     |    "file_modification_time": "2021-07-02 01:05:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
|         |     | {                                                  |
|         |     |    "file_path": "dbfs:/tmp/f1.csv",                |
| Frank   | 24  |    "file_name": "f1.csv",                          |
|         |     |    "file_size": 10,                                |
|         |     |    "file_modification_time": "2021-12-20 02:06:21" |
|         |     | }                                                  |
+---------+-----+----------------------------------------------------+
*/

特定のフィールドの選択

Python
spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("_metadata.file_name", "_metadata.file_size")
Scala
spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("_metadata.file_name", "_metadata.file_size")

フィルターでの使用

Python
spark.read \
  .format("csv") \
  .schema(schema) \
  .load("dbfs:/tmp/*") \
  .select("*") \
  .filter(col("_metadata.file_name") == lit("test.csv"))
Scala
spark.read
  .format("csv")
  .schema(schema)
  .load("dbfs:/tmp/*")
  .select("*")
  .filter(col("_metadata.file_name") === lit("test.csv"))

COPY INTOでの使用

SQL
COPY INTO my_delta_table
FROM (
  SELECT *, _metadata FROM 's3://my-bucket/csvData'
)
FILEFORMAT = CSV

Auto Loaderでの使用

Python
spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .schema(schema) \
  .load("s3://my-bucket/csvData") \
  .select("*", "_metadata") \
  .writeStream \
  .format("delta") \
  .option("checkpointLocation", checkpointLocation) \
  .start(targetTable)
Scala
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .schema(schema)
  .load("s3://my-bucket/csvData")
  .select("*", "_metadata")
  .writeStream
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .start(targetTable)

関連資料

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?