Track ML Model training data with Delta Lake | Databricks on AWSのノートブックをウォークスルーします。
ノートブックの翻訳版はこちらです。
機械学習(ML)モデルのトレーニングに使ったデータをトラッキングし、再現できる様にし、特定のデータセットを用いたMLモデルを特定できる様にするために、どのようにDelta LakeとMLflowを活用するのかを説明します。
MLflowとは?
機械学習モデルのライフサイクル管理のためのフレームワークを提供するソフトウェアです。機械学習のトラッキング、集中管理のためのモデルレジストリといった機能を提供します。Databricksでは、マネージドサービスとしてMLflowを利用できる様になっていますので、Databricksノートブック上でトレーニングした機械学習は自動でトラッキングされます。
Delta Lakeとは?
データレイクに格納されているデータに対して高速なデータ処理、強力なデータガバナンスを提供するストレージレイヤーソフトウェアです。ACIDトランザクションやデータのバージョン管理、インデックス作成機能などを提供します。機械学習の文脈ではデータのバージョン管理が重要な意味を持つことになります。
ノートブックのウォークスルー
データチームがモデルをトレーニングし、それをプロダクションにデプロイして、しばらくの間は上手くいくと言うのはよくある話です。そして、モデルが変な予測をし始めると、モデルを調査し、デバッグしなくてはなりません。
このノートブックでは、デバッグを容易にするためにモデルのトレーニングを容易に追跡、可視化、再現するために、どのようにMLflowとDelta Lakeを活用するのかをデモンストレーションします。
- MLパイプラインの構築に使用したデータの正確なスナップショットを追跡し、再現する。
- 特定のデータのスナップショットでトレーニングを行ったモデルを特定する。
- (例:古いモデルを再現するために)過去のデータのスナップショットに対してトレーニングを再実行する。
このノートブックでは、データのバージョン管理と「タイムトラベル」機能を提供するDelta Lakeを使用し、データを追跡し、特定のデータセットを使用したランをクエリーするためにMLflowを活用します。
問題定義: 貸し手向け「悪いローン」の分類
このノートブックでは、クレジットのスコア、クレジット履歴などその他の特徴量に基づいて、「悪いローン」(利益を産まない可能性があるローン)の特定をゴールとして、Lending Clubデータセットにおける分類問題に取り組みます。
最終的なゴールは、ローンを承認するかどうかを決定する前に、ローンの係員が使用する解釈可能なモデルを生成することです。この様なモデルは貸し手に対して、情報を提供するビューとなり、見込みのある借り手を即座に評価し、レスポンスできる様にします。
データ
使用するデータはLending Clubの公開データです。これには、2012年から2017年に融資されたローンが含まれています。それぞれのローンには、申請者によって提供された申込者情報と、現在のローンのステータス(遅延なし、遅延、完済など)、最新の支払い情報が含まれています。データに対する完全なビューはデータ辞書をご覧ください。
セットアップ: DBFSにDeltaテーブルを作成
DBFS(Databricksファイルシステム)に格納されている既存のParquetテーブルを変換することで、Delta Lakeフォーマットでいくつかのサンプルデータを生成します。
from pyspark.sql.functions import *
# テーブルが存在する場合には削除
DELTA_TABLE_DEFAULT_PATH = "/ml/loan_stats.delta" # 適宜変更してください
dbutils.fs.rm(DELTA_TABLE_DEFAULT_PATH, recurse=True)
# Lending Clubデータをロード&加工し、Delta LakeフォーマットでDBFSに保存します
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"
data = spark.read.parquet(lspq_path)
# 必要なカラムを選択し、他の前処理を適用
features = ["loan_amnt", "annual_inc", "dti", "delinq_2yrs","total_acc", "total_pymnt", "issue_d", "earliest_cr_line"]
raw_label = "loan_status"
loan_stats_ce = data.select(*(features + [raw_label]))
print("------------------------------------------------------------------------------------------------")
print("悪いローンのラベルを作成、これにはチャージオフ、デフォルト、ローンの支払い遅延が含まれます...")
loan_stats_ce = loan_stats_ce.filter(loan_stats_ce.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))\
.withColumn("bad_loan", (~(loan_stats_ce.loan_status == "Fully Paid")).cast("string"))
loan_stats_ce = loan_stats_ce.orderBy(rand()).limit(10000) # Community Editionでも実行できる様にロードする行を限定
print("------------------------------------------------------------------------------------------------")
print("数値のカラムを適切な型にキャスト...")
loan_stats_ce = loan_stats_ce.withColumn('issue_year', substring(loan_stats_ce.issue_d, 5, 4).cast('double')) \
.withColumn('earliest_year', substring(loan_stats_ce.earliest_cr_line, 5, 4).cast('double')) \
.withColumn('total_pymnt', loan_stats_ce.total_pymnt.cast('double'))
loan_stats_ce = loan_stats_ce.withColumn('credit_length_in_years', (loan_stats_ce.issue_year - loan_stats_ce.earliest_year))
# Delta Lakeフォーマットでテーブルを保存
loan_stats_ce.write.format("delta").mode("overwrite").save(DELTA_TABLE_DEFAULT_PATH)
1. 再現性確保のためにデータバージョンとロケーションをトラッキング
このノートブックではウィジェット経由でデータのバージョンとパスを受け入れ、将来的に明示的にデータバージョンとパスを指定することでノートブックの実行を再現できる様になっています。データバージョンを指定できることはDelta Lakeを活用することのメリットであり、後ほどレストアできる様に以前のバージョンのデータセットを保持します。
# ノートブックのパラメーターからデータのパスとバージョンを取得
dbutils.widgets.text(name="deltaVersion", defaultValue="1", label="テーブルのバージョン、デフォルトは最新")
dbutils.widgets.text(name="deltaPath", defaultValue="", label="テーブルのパス")
data_version = None if dbutils.widgets.get("deltaVersion") == "" else int(dbutils.widgets.get("deltaVersion"))
DELTA_TABLE_DEFAULT_PATH = "/ml/loan_stats.delta" # 適宜変更してください
data_path = DELTA_TABLE_DEFAULT_PATH if dbutils.widgets.get("deltaPath") == "" else dbutils.widgets.get("deltaPath")
print("テーブルのバージョン:", data_version)
print("テーブルのパス:", data_path)
上のコマンドを実行すると、ノートブック上部にウィジェットが作成されます。
Deltaテーブルからデータをロード
ウィジェットで指定されたデータパスとバージョンを用いて、Delta Lakeフォーマットでデータをロードします。
# バージョンパラメーターが明示的に指定されていない場合、デフォルトでは最新バージョンのテーブルを使用します
if data_version is None:
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, data_path)
version_to_load = delta_table.history(1).select("version").collect()[0].version
else:
version_to_load = data_version
loan_stats = spark.read.format("delta").option("versionAsOf", version_to_load).load(data_path)
# データの確認
display(loan_stats)
Deltaテーブルの履歴を確認
初期状態のデータ追加、アップデート、削除、マージ、追加を含むこのテーブルに対するすべてのトランザクションはテーブルに記録されます。
SQLからアクセスできる様に、Deltaファイルからテーブルを作成します。これをDeltaテーブルと呼びます。
spark.sql("DROP TABLE IF EXISTS loan_stats")
spark.sql("CREATE TABLE loan_stats USING DELTA LOCATION '" + DELTA_TABLE_DEFAULT_PATH + "'")
Databricksノートブックでは、言語マジックコマンド%sql
を指定することで、特定のセルの言語を切り替えることができます。SQLコマンドDESCRIBE HISTORY
でテーブルの変更履歴を確認します。
%sql
DESCRIBE HISTORY loan_stats
テーブルの作成直後なので、バージョン0しか存在していませんが、今後この履歴が更新されていくことになります。
ハイパーパラメーターチューニングのために交差検証を用いたモデルのトレーニング
Spark MLlibを用いてMLパイプラインをトレーニングします。後で調査できる様に、チューニングの実行におけるメトリクスとパラメーターは、自動でMLflowによってトラッキングされます。
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow.spark
from pyspark.sql import SparkSession
# data_version, data_pathを含むパラメーターを自動で記録するためにautolog APIを使います
mlflow.spark.autolog()
def _fit_crossvalidator(train, features, target):
"""
`features`のカラムを用いて、渡されたトレーニング用データフレームの`target`の2値ラベルを予測するために
CrossValidatorモデルをフィッティングするヘルパー関数
:param: train: トレーニングデータを格納するSparkデータフレーム
:param: features: `train`から特徴量として使用するカラム名を含む文字列のリスト
:param: target: 予測する`train`の2値ターゲットカラムの名前
"""
train = train.select(features + [target])
model_matrix_stages = [
Imputer(inputCols = features, outputCols = features),
VectorAssembler(inputCols=features, outputCol="features"),
StringIndexer(inputCol="bad_loan", outputCol="label")
]
lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "features")
pipeline = Pipeline(stages=model_matrix_stages + [lr])
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=5)
cvModel = crossval.fit(train)
return cvModel.bestModel
# モデルのフィッティングを行いROCを表示します
features = ["loan_amnt", "annual_inc", "dti", "delinq_2yrs","total_acc", "credit_length_in_years"]
glm_model = _fit_crossvalidator(loan_stats, features, target="bad_loan")
lr_summary = glm_model.stages[len(glm_model.stages)-1].summary
display(lr_summary.roc)
なお、Databricksノートブックでは簡単にグラフを表示することができます。テーブルの右にある + をクリックし、 ビジュアライゼーションを選択します。
ビジュアライゼーションの設定画面が表示されるので、表示したい内容に設定を変更して保存をクリックすると、ノートブック上にビジュアライゼーションが追加されます。
print("MLパイプラインの精度: %s" % lr_summary.accuracy)
MLflowエクスペリメントランサイドバーでトレーニング結果を参照
上のモデルトレーニングコードは、MLflowのランの中にメトリクスやパラメーターを自動で記録し、MLflowランサイドバーで参照することができます。エクスペリメントランサイドバーを表示するには右上のフラスコアイコンをクリックします。
画面右側にサイドバーが表示され、トラッキングされたモデルを確認することができます。
ここでさらにモデル名をクリックすると、詳細画面に移動できます。この時点で、sparkDatasourceInfo
というタグに、トレーニングに使用したデータのパスとバージョンが記録されていることがわかります。
もちろんですが、モデル本体も記録されていますので、簡単に他のノートブックから活用することも可能です。
特徴量エンジニアリング: データスキーマを進化
データセットの過去のバージョンを追跡するDelta Lakeを用いて、モデルパフォーマンスを改善するためにいくつかの特徴量エンジニアリングを行うことができます。最初に、ローンごとに支払い金額とローン金額の合計を捉える特徴量を追加します。
print("------------------------------------------------------------------------------------------------")
print("ローンごとの支払い、ローン金額の合計を計算...")
loan_stats_new = loan_stats.withColumn('net', round( loan_stats.total_pymnt - loan_stats.loan_amnt, 2))
スキーマを安全に進化させるように、mergeSchema
を指定して更新したテーブルを保存します。
loan_stats_new.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(DELTA_TABLE_DEFAULT_PATH)
# オリジナルのスキーマと更新したスキーマの違いを確認します
set(loan_stats_new.schema.fields) - set(loan_stats.schema.fields)
更新したデータでモデルを再トレーニングし、オリジナルのモデルとパフォーマンスを比較します。
新しいモデルの精度を確認します。
print("MLパイプラインの精度: %s" % lr_summary_new.accuracy)
このように、特徴量エンジニアリングを行うことで精度が改善されました。しかし、適切に情報を追跡する仕組みが無いと、「このモデルのトレーニングに使ったデータはどれだったっけ?」ですとか「そもそもベストなパフォーマンスのモデルはどこにいった?」ということになりかねません。こういった事態を回避し、データサイエンスの助けになるのがMLflowとDelta Lakeという強力な組み合わせなのです。
ここまでの取り組み内容はすべて自動でMLflowによってトラッキングされています。
2. オリジナルのデータバージョンを使用したランの特定
特徴量エンジニアリングのステップを経て、モデルの精度は ~80% から改善しました。この様に思うかもしれません: オリジナルのデータセットで構築したすべてのモデルを特徴量エンジニアリングしたデータセットで再トレーニングしたらどうなるのだろう?モデルパフォーマンスに動揺の改善が見られるのだろうか?
オリジナルデータセットに対して行われた他のランを特定するには、MLflowのmlflow.search_runs
APIを使います。
mlflow.search_runs(filter_string="tags.sparkDatasourceInfo LIKE '%path=dbfs:{path}%'".format(path=data_path, version=0))
3. データのスナップショットをロードし、ランを再現
最後に、モデルの再トレーニングに使うデータの特定のバージョンをロードすることができます。これを行うには、シンプルに上のウィジェットでデータバージョン1(特徴量エンジニアリングをおこなったデータに対応)を指定し、ノートブックのセクション1を再実行します。
まとめ
このように、機械学習モデルをトレーニングする際に記録すべきモデル本体はもちろんのこと、その際に指定したハイパーパラメーター、モデルのパフォーマンスを示すメトリクス、さらにはトレーニングに使用したデータまでもが追跡できていることを体験いただけたかと思います。
是非、サンプルノートブックを試していただければと思います。これらの機能を活用いただくことで、皆様のデータサイエンスの取り組みにおける生産性向上に貢献できたらと考えています。
この他、DatabricksがAIプロジェクトでどの様にお役に立つのかを漫画シリーズでまとめています。こちらもご覧いただければと思います。