1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricksベストプラクティス:ノートブックのモジュール化

Last updated at Posted at 2024-05-07

以前こちらの記事を翻訳しましたが、2021年のことです。

当時はPythonのカスタムモジュールのインポートが一筋縄ではいきませんでした。Wheelを作成したり、Reposを使ったりしなくてはなりませんでした。今では、ワークスペースのどこでもワークスペースファイルが使えるので、簡単にカスタムモジュールを利用できます。

ここでは、マニュアルに記載されている手順にそって、こちらのリポジトリにあるノートブックをウォークスルーします。その過程でその他のDatabricksノートブックのTipsもご紹介します。

モジュール化しない場合

notebooks/covid_eda_raw.pyのノートブックではCOVID-19の入院者数データに対する分析をこなっていますが、モジュール化は行っていません。

以下のコマンドでは、クラスターのローカルストレージの/tmpにCSVファイルがダウンロードされます。

!wget -q https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv -O /tmp/covid-hospitalizations.csv

ポイント
Databricksでファイルを取り扱う際には、どこに保存するのかを意識することが重要です。詳細はこちらの記事をご覧ください。

変換処理

import pandas as pd

# /tmpから読み込み、JBNのサブセットを取得、ピボットを行い、欠損地を0に置換
df = pd.read_csv("/tmp/covid-hospitalizations.csv")
df = df[df.iso_code == 'JPN']\
     .pivot_table(values='value', columns='indicator', index='date')\
     .fillna(0)

df

Screenshot 2024-05-07 at 16.12.32.png

可視化

df.plot(figsize=(13,6), grid=True).legend(loc='upper left')

download (1).png

Delta Lakeに保存

現在のスキーマにはカラム名にスペースが入っており、Delta Lakeと互換性がありません。データをテーブルとして保存するために、スペースをアンダースコアに置換します。また、日付のインデックスを自身のカラムとして追加し、他の方がこのテーブルをクエリーした際に利用できるようにします。

import pyspark.pandas as ps

clean_cols = df.columns.str.replace(' ', '_')

# pandas on Sparkデータフレームの作成
psdf = ps.from_pandas(df)

psdf.columns = clean_cols
psdf['date'] = psdf.index

# 最新のデータを毎回上書きして、Delta tableに書き込み
psdf.to_table(name='users.takaaki_yayoi.dev_covid_analysis', mode='overwrite')

ポイント
ラップトップのメモリーやCPUを使うJuypter Notebookと違い、Databricksの計算資源はクラウド上の揮発性のクラスターです。つまり、クラスターが停止すると永続化していないデータは消えてしまいます。後でアクセスする可能性があるデータはテーブルやファイルとして永続化するようにしましょう。

永続化したテーブルはサイドメニューのカタログからアクセスできるカタログエクスプローラで確認できます。
Screenshot 2024-05-07 at 16.43.37.png

テーブルの参照

テーブルとして永続化すると、SQLでテーブルにアクセスできるようになります。

%sql
SELECT * FROM users.takaaki_yayoi.dev_covid_analysis

Screenshot 2024-05-07 at 16.14.42.png

ポイント
Databricksノートブックでは、セル単位で言語を切り替えることができます。上の例では%sqlマジックコマンドを指定して、SQLを実行しています。詳細はこちらをご覧ください。

ノートブックのモジュール化

ロジックを再利用するケースにおいてモジュール化は有効です。また、ノートブックの肥大化を避けることができ、コーディングやデバッグの効率が向上します。

notebooks/covid_eda_modular.pyではモジュール化を行っています。covid_analysis/transforms.pyにモジュール化した関数が格納されています。

covid_analysis/transforms.py
import pandas as pd


# 国コードでフィルタリング
def filter_country(pdf, country="USA"):
    pdf = pdf[pdf.iso_code == country]
    return pdf


# indicatorでピボットし、欠損値を補完
def pivot_and_clean(pdf, fillna):
    pdf["value"] = pd.to_numeric(pdf["value"])
    pdf = pdf.pivot_table(
        values="value", columns="indicator", index="date"
    ).fillna(fillna)
    return pdf


# Deltaテーブルと互換性のあるカラム名を作成
def clean_spark_cols(pdf):
    pdf.columns = pdf.columns.str.replace(" ", "_")
    return pdf


# インデックスをカラムに変換 (pandas API on Sparkでも動作します)
def index_to_col(df, colname):
    df[colname] = df.index
    return df

notebooks/covid_eda_modular.pyではモジュールを読み込んで利用します。

セットアップ

こちらを実行することで、モジュールで更新があった場合にそれらが反映されます。

%load_ext autoreload
%autoreload 2

また、モジュールを読み込む際にはパスに注意してください。

import sys
import os
print(sys.path)
[..., 
'/Workspace/Users/takaaki.yayoi@databricks.com/20240507_notebook_best_practice/notebook-best-practices/notebooks']

上の例では、ノートブックのカレントディレクトリはパスに含まれていますが、モジュールファイルのパスは含まれていません。以下を実行して親のディレクトリをパスに追加します。

# 親フォルダにあるパッケージをインポートできるようにパスを追加
sys.path.append(os.path.abspath('..'))
print(sys.path)
[...,
'/Workspace/Users/takaaki.yayoi@databricks.com/20240507_notebook_best_practice/notebook-best-practices/notebooks', 
'/Workspace/Users/takaaki.yayoi@databricks.com/20240507_notebook_best_practice/notebook-best-practices']

なお、Reposで作業している場合には、Reposフォルダー(Gitフォルダー)のルートパスは最初から追加されています。詳細はこちらをご覧ください。

データの取得と変換

data_path = 'https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv'
print(f'Data path: {data_path}')
Data path: https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/hospitalizations/covid-hospitalizations.csv
from covid_analysis.transforms import *
import pandas as pd

df = pd.read_csv(data_path)
df = filter_country(df, country='JPN')
df = pivot_and_clean(df, fillna=0)  
df = clean_spark_cols(df)
df = index_to_col(df, colname='date')
# PandasをSparkデータフレームに変換
df = spark.createDataFrame(df)

display(df)

Screenshot 2024-05-07 at 16.29.24.png

Delta Lakeに保存

現在のスキーマにはカラム名にスペースが入っており、Delta Lakeと互換性がありません。データをテーブルとして保存するために、スペースをアンダースコアに置換します。また、日付のインデックスを自身のカラムとして追加し、他の方がこのテーブルをクエリーした際に利用できるようにします。

# Delta Lakeに書き込み
df.write.mode('overwrite').saveAsTable('users.takaaki_yayoi.covid_stats')

可視化

# Databricksのビジュアライゼーションとデータプロファイリングの活用
display(spark.table('users.takaaki_yayoi.covid_stats'))

ポイント
Databricksノートブックには可視化の機能がビルトインされています。データフレームを表示した状態で + > 可視化 を選択します。
Screenshot 2024-05-07 at 16.30.45.png
プレビューを確認しながら、可視化の設定を行います。
Screenshot 2024-05-07 at 16.32.34.png
これで簡単に可視化を行うことができます。
Screenshot 2024-05-07 at 16.32.49.png

ポイント
さらに、 + > データプロファイルをクリックすると、データフレームの統計情報や傾向をクイックに確認することができます。
Screenshot 2024-05-07 at 16.34.47.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?