以前こちらの記事を翻訳しましたが、2021年のことです。
当時はPythonのカスタムモジュールのインポートが一筋縄ではいきませんでした。Wheelを作成したり、Reposを使ったりしなくてはなりませんでした。今では、ワークスペースのどこでもワークスペースファイルが使えるので、簡単にカスタムモジュールを利用できます。
ここでは、マニュアルに記載されている手順にそって、こちらのリポジトリにあるノートブックをウォークスルーします。その過程でその他のDatabricksノートブックのTipsもご紹介します。
モジュール化しない場合
notebooks/covid_eda_raw.py
のノートブックではCOVID-19の入院者数データに対する分析をこなっていますが、モジュール化は行っていません。
以下のコマンドでは、クラスターのローカルストレージの/tmp
にCSVファイルがダウンロードされます。
!wget -q https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv -O /tmp/covid-hospitalizations.csv
ポイント
Databricksでファイルを取り扱う際には、どこに保存するのかを意識することが重要です。詳細はこちらの記事をご覧ください。
変換処理
import pandas as pd
# /tmpから読み込み、JBNのサブセットを取得、ピボットを行い、欠損地を0に置換
df = pd.read_csv("/tmp/covid-hospitalizations.csv")
df = df[df.iso_code == 'JPN']\
.pivot_table(values='value', columns='indicator', index='date')\
.fillna(0)
df
可視化
df.plot(figsize=(13,6), grid=True).legend(loc='upper left')
Delta Lakeに保存
現在のスキーマにはカラム名にスペースが入っており、Delta Lakeと互換性がありません。データをテーブルとして保存するために、スペースをアンダースコアに置換します。また、日付のインデックスを自身のカラムとして追加し、他の方がこのテーブルをクエリーした際に利用できるようにします。
import pyspark.pandas as ps
clean_cols = df.columns.str.replace(' ', '_')
# pandas on Sparkデータフレームの作成
psdf = ps.from_pandas(df)
psdf.columns = clean_cols
psdf['date'] = psdf.index
# 最新のデータを毎回上書きして、Delta tableに書き込み
psdf.to_table(name='users.takaaki_yayoi.dev_covid_analysis', mode='overwrite')
ポイント
ラップトップのメモリーやCPUを使うJuypter Notebookと違い、Databricksの計算資源はクラウド上の揮発性のクラスターです。つまり、クラスターが停止すると永続化していないデータは消えてしまいます。後でアクセスする可能性があるデータはテーブルやファイルとして永続化するようにしましょう。
永続化したテーブルはサイドメニューのカタログからアクセスできるカタログエクスプローラで確認できます。
テーブルの参照
テーブルとして永続化すると、SQLでテーブルにアクセスできるようになります。
%sql
SELECT * FROM users.takaaki_yayoi.dev_covid_analysis
ポイント
Databricksノートブックでは、セル単位で言語を切り替えることができます。上の例では%sql
マジックコマンドを指定して、SQLを実行しています。詳細はこちらをご覧ください。
ノートブックのモジュール化
ロジックを再利用するケースにおいてモジュール化は有効です。また、ノートブックの肥大化を避けることができ、コーディングやデバッグの効率が向上します。
notebooks/covid_eda_modular.py
ではモジュール化を行っています。covid_analysis/transforms.py
にモジュール化した関数が格納されています。
import pandas as pd
# 国コードでフィルタリング
def filter_country(pdf, country="USA"):
pdf = pdf[pdf.iso_code == country]
return pdf
# indicatorでピボットし、欠損値を補完
def pivot_and_clean(pdf, fillna):
pdf["value"] = pd.to_numeric(pdf["value"])
pdf = pdf.pivot_table(
values="value", columns="indicator", index="date"
).fillna(fillna)
return pdf
# Deltaテーブルと互換性のあるカラム名を作成
def clean_spark_cols(pdf):
pdf.columns = pdf.columns.str.replace(" ", "_")
return pdf
# インデックスをカラムに変換 (pandas API on Sparkでも動作します)
def index_to_col(df, colname):
df[colname] = df.index
return df
notebooks/covid_eda_modular.py
ではモジュールを読み込んで利用します。
セットアップ
こちらを実行することで、モジュールで更新があった場合にそれらが反映されます。
%load_ext autoreload
%autoreload 2
また、モジュールを読み込む際にはパスに注意してください。
import sys
import os
print(sys.path)
[...,
'/Workspace/Users/takaaki.yayoi@databricks.com/20240507_notebook_best_practice/notebook-best-practices/notebooks']
上の例では、ノートブックのカレントディレクトリはパスに含まれていますが、モジュールファイルのパスは含まれていません。以下を実行して親のディレクトリをパスに追加します。
# 親フォルダにあるパッケージをインポートできるようにパスを追加
sys.path.append(os.path.abspath('..'))
print(sys.path)
[...,
'/Workspace/Users/takaaki.yayoi@databricks.com/20240507_notebook_best_practice/notebook-best-practices/notebooks',
'/Workspace/Users/takaaki.yayoi@databricks.com/20240507_notebook_best_practice/notebook-best-practices']
なお、Reposで作業している場合には、Reposフォルダー(Gitフォルダー)のルートパスは最初から追加されています。詳細はこちらをご覧ください。
データの取得と変換
data_path = 'https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv'
print(f'Data path: {data_path}')
Data path: https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv
from covid_analysis.transforms import *
import pandas as pd
df = pd.read_csv(data_path)
df = filter_country(df, country='JPN')
df = pivot_and_clean(df, fillna=0)
df = clean_spark_cols(df)
df = index_to_col(df, colname='date')
# PandasをSparkデータフレームに変換
df = spark.createDataFrame(df)
display(df)
Delta Lakeに保存
現在のスキーマにはカラム名にスペースが入っており、Delta Lakeと互換性がありません。データをテーブルとして保存するために、スペースをアンダースコアに置換します。また、日付のインデックスを自身のカラムとして追加し、他の方がこのテーブルをクエリーした際に利用できるようにします。
# Delta Lakeに書き込み
df.write.mode('overwrite').saveAsTable('users.takaaki_yayoi.covid_stats')
可視化
# Databricksのビジュアライゼーションとデータプロファイリングの活用
display(spark.table('users.takaaki_yayoi.covid_stats'))