0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

databricksのhiveメタストアに保存されたテーブルの過去バージョンにアクセスする方法

Posted at

databricksの標準ファイルフォーマットであるdeltaにはtime travelなる機能があり、この機能を使うと過去のバージョンのデータにアクセスできる。このdelta形式は実はhiveメタストアにも使われているので、hiveメタストアに保存されたテーブルにもtime travelは使える。しかし、やや面倒な方法をとる必要があるため、備忘録的に書いておく。なおtime travel機能については下記を参考のこと。
https://www.databricks.com/jp/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

hiveメタストア上のテーブルのフォーマットを確認してみる

とりあえず下準備としてsklearn.datasetsのbreast_cancerを使って、hiveメタストアにデータを保存してみる。

import pandas as pd
from sklearn import datasets

breast_cancer_data = datasets.load_breast_cancer()
breast_cancer_df =  pd.DataFrame(
    breast_cancer_data.data,
    columns = breast_cancer_data.feature_names
  )
breast_cancer_df.columns = [col.replace(' ', '_') for col in breast_cancer_df.columns]
cols = list(breast_cancer_df.columns)
# spark DataFrameに変換
breast_cancer_df['target'] = breast_cancer_data.target
breast_cancer_sdf = spark.createDataFrame(breast_cancer_df)

# hiveメタストアにテーブルを保存
breast_cancer_sdf.write.saveAsTable('default.breast_cancer')

ここでdefaultデータベースに保存されたテーブルbreast_cancerのデータフォーマットを確認してみよう。

display(spark.sql('DESCRIBE DETAIL default.breast_cancer'))

実行結果は以下のとおり。
image.png
format列の値がdeltaになっていることが分かる。さてdeltaのtime travelで過去バージョンにアクセスするには、hiveメタストアに保存されたテーブルのファイルパスが必要になる。ファイルパスはこの結果のlocation列の値になる。

実際にtime travelしてみる

ではbreast_cancerからtarget列を削除してhiveメタストア上のテーブルを上書き保存してみる。

breast_cancer_sdf = breast_cancer_sdf.drop('target')
# hiveメタストアに上書き保存
(breast_cancer_sdf
    .write
    .mode('overwrite')
    .option('overwriteSchema','true')
    .saveAsTable('default.breast_cancer')
)

ついでにバージョン履歴も確認しておく

display(spark.sql('DESCRIBE HISTORY default.breast_cancer'))

結果は以下のとおり。hiveメタストアもバージョン管理されていることがわかる。
image.png

繰り返しになるがtime travelにはデータのファイルパスが必要になるため、DESCRIBE DETAILで取得する。

location = (spark
  .sql('DESCRIBE DETAIL default.breast_cancer')
  .select('location')
  .collect()[0][0]
)
# version 0のテーブルを読み込む
df_v0 = spark.read.format('delta').option('versionAsOf',0).load(location)

結果は以下のとおり、target列が存在しているので、列を削除前のバージョンを復元できていることがわかる
image.png

一応バージョン1の結果も載せてみる。当然だがtarget列は存在しない。

df_v1 = spark.read.format('delta').option('versionAsOf',1).load(location)

image.png

なおtime travelで遡れる期間はデフォルトで30日なので、それ以上長い期間のバージョンを保持する場合は別途設定が必要な点に注意が必要。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?