3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

MLflowとDelta Lakeを用いた機械学習トレーニング

Last updated at Posted at 2022-11-20

Track ML Model training data with Delta Lake | Databricks on AWSのノートブックをウォークスルーします。

ノートブックの翻訳版はこちらです。

機械学習(ML)モデルのトレーニングに使ったデータをトラッキングし、再現できる様にし、特定のデータセットを用いたMLモデルを特定できる様にするために、どのようにDelta LakeMLflowを活用するのかを説明します。

MLflowとは?

機械学習モデルのライフサイクル管理のためのフレームワークを提供するソフトウェアです。機械学習のトラッキング、集中管理のためのモデルレジストリといった機能を提供します。Databricksでは、マネージドサービスとしてMLflowを利用できる様になっていますので、Databricksノートブック上でトレーニングした機械学習は自動でトラッキングされます。

Delta Lakeとは?

データレイクに格納されているデータに対して高速なデータ処理、強力なデータガバナンスを提供するストレージレイヤーソフトウェアです。ACIDトランザクションやデータのバージョン管理、インデックス作成機能などを提供します。機械学習の文脈ではデータのバージョン管理が重要な意味を持つことになります。

ノートブックのウォークスルー

データチームがモデルをトレーニングし、それをプロダクションにデプロイして、しばらくの間は上手くいくと言うのはよくある話です。そして、モデルが変な予測をし始めると、モデルを調査し、デバッグしなくてはなりません。

このノートブックでは、デバッグを容易にするためにモデルのトレーニングを容易に追跡、可視化、再現するために、どのようにMLflowDelta Lakeを活用するのかをデモンストレーションします。

  1. MLパイプラインの構築に使用したデータの正確なスナップショットを追跡し、再現する。
  2. 特定のデータのスナップショットでトレーニングを行ったモデルを特定する。
  3. (例:古いモデルを再現するために)過去のデータのスナップショットに対してトレーニングを再実行する。

このノートブックでは、データのバージョン管理と「タイムトラベル」機能を提供するDelta Lakeを使用し、データを追跡し、特定のデータセットを使用したランをクエリーするためにMLflowを活用します。

問題定義: 貸し手向け「悪いローン」の分類

このノートブックでは、クレジットのスコア、クレジット履歴などその他の特徴量に基づいて、「悪いローン」(利益を産まない可能性があるローン)の特定をゴールとして、Lending Clubデータセットにおける分類問題に取り組みます。

最終的なゴールは、ローンを承認するかどうかを決定する前に、ローンの係員が使用する解釈可能なモデルを生成することです。この様なモデルは貸し手に対して、情報を提供するビューとなり、見込みのある借り手を即座に評価し、レスポンスできる様にします。

データ

使用するデータはLending Clubの公開データです。これには、2012年から2017年に融資されたローンが含まれています。それぞれのローンには、申請者によって提供された申込者情報と、現在のローンのステータス(遅延なし、遅延、完済など)、最新の支払い情報が含まれています。データに対する完全なビューはデータ辞書をご覧ください。

Loan_Data

セットアップ: DBFSにDeltaテーブルを作成

DBFS(Databricksファイルシステム)に格納されている既存のParquetテーブルを変換することで、Delta Lakeフォーマットでいくつかのサンプルデータを生成します。

Python
from pyspark.sql.functions import *

# テーブルが存在する場合には削除
DELTA_TABLE_DEFAULT_PATH = "/ml/loan_stats.delta" # 適宜変更してください
dbutils.fs.rm(DELTA_TABLE_DEFAULT_PATH, recurse=True)

# Lending Clubデータをロード&加工し、Delta LakeフォーマットでDBFSに保存します
lspq_path = "/databricks-datasets/samples/lending_club/parquet/"
data = spark.read.parquet(lspq_path)

# 必要なカラムを選択し、他の前処理を適用
features = ["loan_amnt",  "annual_inc", "dti", "delinq_2yrs","total_acc", "total_pymnt", "issue_d", "earliest_cr_line"]
raw_label = "loan_status"
loan_stats_ce = data.select(*(features + [raw_label]))
print("------------------------------------------------------------------------------------------------")
print("悪いローンのラベルを作成、これにはチャージオフ、デフォルト、ローンの支払い遅延が含まれます...")
loan_stats_ce = loan_stats_ce.filter(loan_stats_ce.loan_status.isin(["Default", "Charged Off", "Fully Paid"]))\
                       .withColumn("bad_loan", (~(loan_stats_ce.loan_status == "Fully Paid")).cast("string"))
loan_stats_ce = loan_stats_ce.orderBy(rand()).limit(10000) # Community Editionでも実行できる様にロードする行を限定
print("------------------------------------------------------------------------------------------------")
print("数値のカラムを適切な型にキャスト...")
loan_stats_ce = loan_stats_ce.withColumn('issue_year',  substring(loan_stats_ce.issue_d, 5, 4).cast('double')) \
                       .withColumn('earliest_year', substring(loan_stats_ce.earliest_cr_line, 5, 4).cast('double')) \
                       .withColumn('total_pymnt', loan_stats_ce.total_pymnt.cast('double'))
loan_stats_ce = loan_stats_ce.withColumn('credit_length_in_years', (loan_stats_ce.issue_year - loan_stats_ce.earliest_year))   
# Delta Lakeフォーマットでテーブルを保存
loan_stats_ce.write.format("delta").mode("overwrite").save(DELTA_TABLE_DEFAULT_PATH)

1. 再現性確保のためにデータバージョンとロケーションをトラッキング

このノートブックではウィジェット経由でデータのバージョンとパスを受け入れ、将来的に明示的にデータバージョンとパスを指定することでノートブックの実行を再現できる様になっています。データバージョンを指定できることはDelta Lakeを活用することのメリットであり、後ほどレストアできる様に以前のバージョンのデータセットを保持します。

Python
# ノートブックのパラメーターからデータのパスとバージョンを取得
dbutils.widgets.text(name="deltaVersion", defaultValue="1", label="テーブルのバージョン、デフォルトは最新")
dbutils.widgets.text(name="deltaPath", defaultValue="", label="テーブルのパス")

data_version = None if dbutils.widgets.get("deltaVersion") == "" else int(dbutils.widgets.get("deltaVersion"))
DELTA_TABLE_DEFAULT_PATH = "/ml/loan_stats.delta" # 適宜変更してください
data_path = DELTA_TABLE_DEFAULT_PATH if dbutils.widgets.get("deltaPath")  == "" else dbutils.widgets.get("deltaPath")

print("テーブルのバージョン:", data_version)
print("テーブルのパス:", data_path)

上のコマンドを実行すると、ノートブック上部にウィジェットが作成されます。
Screen Shot 2022-11-20 at 10.26.30.png
Screen Shot 2022-11-20 at 10.26.20.png

Deltaテーブルからデータをロード

ウィジェットで指定されたデータパスとバージョンを用いて、Delta Lakeフォーマットでデータをロードします。

Python
# バージョンパラメーターが明示的に指定されていない場合、デフォルトでは最新バージョンのテーブルを使用します
if data_version is None:
  from delta.tables import DeltaTable  
  delta_table = DeltaTable.forPath(spark, data_path)
  version_to_load = delta_table.history(1).select("version").collect()[0].version  
else:
  version_to_load = data_version

loan_stats = spark.read.format("delta").option("versionAsOf", version_to_load).load(data_path)  

# データの確認
display(loan_stats)

Screen Shot 2022-11-20 at 10.27.36.png

Deltaテーブルの履歴を確認

初期状態のデータ追加、アップデート、削除、マージ、追加を含むこのテーブルに対するすべてのトランザクションはテーブルに記録されます。

SQLからアクセスできる様に、Deltaファイルからテーブルを作成します。これをDeltaテーブルと呼びます。

Python
spark.sql("DROP TABLE IF EXISTS loan_stats")
spark.sql("CREATE TABLE loan_stats USING DELTA LOCATION '" + DELTA_TABLE_DEFAULT_PATH + "'")

Databricksノートブックでは、言語マジックコマンド%sqlを指定することで、特定のセルの言語を切り替えることができます。SQLコマンドDESCRIBE HISTORYでテーブルの変更履歴を確認します。

SQL
%sql
DESCRIBE HISTORY loan_stats

テーブルの作成直後なので、バージョン0しか存在していませんが、今後この履歴が更新されていくことになります。
Screen Shot 2022-11-20 at 10.29.47.png

ハイパーパラメーターチューニングのために交差検証を用いたモデルのトレーニング

Spark MLlibを用いてMLパイプラインをトレーニングします。後で調査できる様に、チューニングの実行におけるメトリクスとパラメーターは、自動でMLflowによってトラッキングされます。

Python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler, Imputer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import mlflow.spark
from pyspark.sql import SparkSession

# data_version, data_pathを含むパラメーターを自動で記録するためにautolog APIを使います
mlflow.spark.autolog()

def _fit_crossvalidator(train, features, target):
  """
  `features`のカラムを用いて、渡されたトレーニング用データフレームの`target`の2値ラベルを予測するために
  CrossValidatorモデルをフィッティングするヘルパー関数
  :param: train: トレーニングデータを格納するSparkデータフレーム
  :param: features: `train`から特徴量として使用するカラム名を含む文字列のリスト
  :param: target: 予測する`train`の2値ターゲットカラムの名前
  """
  train = train.select(features + [target])
  model_matrix_stages = [
    Imputer(inputCols = features, outputCols = features),
    VectorAssembler(inputCols=features, outputCol="features"),
    StringIndexer(inputCol="bad_loan", outputCol="label")
  ]
  lr = LogisticRegression(maxIter=10, elasticNetParam=0.5, featuresCol = "features")
  pipeline = Pipeline(stages=model_matrix_stages + [lr])
  paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
  crossval = CrossValidator(estimator=pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=BinaryClassificationEvaluator(),
                            numFolds=5)

  cvModel = crossval.fit(train)
  return cvModel.bestModel
Python
# モデルのフィッティングを行いROCを表示します
features = ["loan_amnt",  "annual_inc", "dti", "delinq_2yrs","total_acc", "credit_length_in_years"]
glm_model = _fit_crossvalidator(loan_stats, features, target="bad_loan")
lr_summary = glm_model.stages[len(glm_model.stages)-1].summary
display(lr_summary.roc)

MLflowによってモデルが記録されます。
Screen Shot 2022-11-20 at 10.31.18.png

ROCがテーブル形式で表示されます。
Screen Shot 2022-11-20 at 10.31.52.png

なお、Databricksノートブックでは簡単にグラフを表示することができます。テーブルの右にある + をクリックし、 ビジュアライゼーションを選択します。
Screen Shot 2022-11-20 at 10.32.48.png

ビジュアライゼーションの設定画面が表示されるので、表示したい内容に設定を変更して保存をクリックすると、ノートブック上にビジュアライゼーションが追加されます。
Screen Shot 2022-11-20 at 10.32.54.png
Screen Shot 2022-11-20 at 10.33.30.png

Python
print("MLパイプラインの精度: %s" % lr_summary.accuracy)

Screen Shot 2022-11-20 at 10.35.38.png

MLflowエクスペリメントランサイドバーでトレーニング結果を参照

上のモデルトレーニングコードは、MLflowのランの中にメトリクスやパラメーターを自動で記録し、MLflowランサイドバーで参照することができます。エクスペリメントランサイドバーを表示するには右上のフラスコアイコンをクリックします。
Screen Shot 2022-11-20 at 10.36.10.png

画面右側にサイドバーが表示され、トラッキングされたモデルを確認することができます。
Screen Shot 2022-11-20 at 10.52.18.png

ここでさらにモデル名をクリックすると、詳細画面に移動できます。この時点で、sparkDatasourceInfoというタグに、トレーニングに使用したデータのパスとバージョンが記録されていることがわかります。
Screen Shot 2022-11-20 at 10.52.29.png

もちろんですが、モデル本体も記録されていますので、簡単に他のノートブックから活用することも可能です。
Screen Shot 2022-11-20 at 11.01.23.png

特徴量エンジニアリング: データスキーマを進化

データセットの過去のバージョンを追跡するDelta Lakeを用いて、モデルパフォーマンスを改善するためにいくつかの特徴量エンジニアリングを行うことができます。最初に、ローンごとに支払い金額とローン金額の合計を捉える特徴量を追加します。

Python
print("------------------------------------------------------------------------------------------------")
print("ローンごとの支払い、ローン金額の合計を計算...")
loan_stats_new = loan_stats.withColumn('net', round( loan_stats.total_pymnt - loan_stats.loan_amnt, 2))

スキーマを安全に進化させるように、mergeSchemaを指定して更新したテーブルを保存します。

Python
loan_stats_new.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(DELTA_TABLE_DEFAULT_PATH)
Python
# オリジナルのスキーマと更新したスキーマの違いを確認します
set(loan_stats_new.schema.fields) - set(loan_stats.schema.fields)

カラムnetが追加されています。
Screen Shot 2022-11-20 at 10.38.21.png

更新したデータでモデルを再トレーニングし、オリジナルのモデルとパフォーマンスを比較します。
Screen Shot 2022-11-20 at 10.40.17.png

新しいモデルの精度を確認します。

Python
print("MLパイプラインの精度: %s" % lr_summary_new.accuracy)

Screen Shot 2022-11-20 at 10.41.00.png

このように、特徴量エンジニアリングを行うことで精度が改善されました。しかし、適切に情報を追跡する仕組みが無いと、「このモデルのトレーニングに使ったデータはどれだったっけ?」ですとか「そもそもベストなパフォーマンスのモデルはどこにいった?」ということになりかねません。こういった事態を回避し、データサイエンスの助けになるのがMLflowとDelta Lakeという強力な組み合わせなのです。

ここまでの取り組み内容はすべて自動でMLflowによってトラッキングされています。
Screen Shot 2022-11-20 at 11.06.06.png

2. オリジナルのデータバージョンを使用したランの特定

特徴量エンジニアリングのステップを経て、モデルの精度は ~80% から改善しました。この様に思うかもしれません: オリジナルのデータセットで構築したすべてのモデルを特徴量エンジニアリングしたデータセットで再トレーニングしたらどうなるのだろう?モデルパフォーマンスに動揺の改善が見られるのだろうか?

オリジナルデータセットに対して行われた他のランを特定するには、MLflowのmlflow.search_runs APIを使います。

Python
mlflow.search_runs(filter_string="tags.sparkDatasourceInfo LIKE '%path=dbfs:{path}%'".format(path=data_path, version=0))

Screen Shot 2022-11-20 at 11.00.02.png

3. データのスナップショットをロードし、ランを再現

最後に、モデルの再トレーニングに使うデータの特定のバージョンをロードすることができます。これを行うには、シンプルに上のウィジェットでデータバージョン1(特徴量エンジニアリングをおこなったデータに対応)を指定し、ノートブックのセクション1を再実行します。

まとめ

このように、機械学習モデルをトレーニングする際に記録すべきモデル本体はもちろんのこと、その際に指定したハイパーパラメーター、モデルのパフォーマンスを示すメトリクス、さらにはトレーニングに使用したデータまでもが追跡できていることを体験いただけたかと思います。

是非、サンプルノートブックを試していただければと思います。これらの機能を活用いただくことで、皆様のデータサイエンスの取り組みにおける生産性向上に貢献できたらと考えています。

この他、DatabricksがAIプロジェクトでどの様にお役に立つのかを漫画シリーズでまとめています。こちらもご覧いただければと思います。

Databricks 無料トライアル

Databricks 無料トライアル

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?