こちらのノートブックをウォークスルーします。
日本語に翻訳したものはこちらです。
Databricks Feature Storeとは
特徴量の一元管理を行うためのリポジトリです。似たような取り組みを行っているにも関わらず、複数人が特徴量を準備するのは無駄となります。このような無駄を排除し、特徴量の再利用を促進することができます。
DatabricksのFeature Storeを活用することで、以下のメリットを享受することができます。
- 検索容易性: Feature StoreのUIを通じて特徴量の検索、参照が可能となります。
- リネージ: Databricksで特徴量を作成すると、特徴量テーブルの作成で使用されたデータソースが保存され、容易に確認することができるようになります。また、使用されたノートブック、ジョブ、エンドポイントなどを確認することができます。
- モデルのスコアリングとサービングとの統合: Feature Storeの特徴量を使用してモデルをトレーニングすると、モデルには特徴量のメタデータも一緒にパッケージングされます。モデルをバッチ推論やオンライン推論で使用する際には、自動的にFeature Storeから特徴量が取得されます。
Unity Catakigにおける特徴量エンジニアリングの基本的なサンプル
こちらのノートブックではUnity Catalogに統合されたFeature Storeとモデルの管理機能もウォークスルーします。
このノートブックでは、MLモデルのトレーニングとバッチ推論を行うために、推論時にのみ利用できる特徴量を含むUnity Catalogの特徴量の作成、格納、管理を行うためのUnity Catalogの特徴量エンジニアリングの使い方を説明します。このサンプルでは、さまざまな静的なワインの特徴量やリアルタイムのインプットとMLモデルを用いてワインの品質を予測することがゴールとなります。
このノートブックでは以下の方法を説明します:
- 機械学習モデル向けのトレーニングデータセットを構築するための特徴量テーブルの作成
- 新バージョンのモデルを作成するために、特徴量テーブルを編集し、アップデートされたテーブルを使用
- 特徴量とモデルがどのような関係にあるのかを特定するためにDatabricksの特徴量UIを使用
- 自動特徴量検索を用いたバッチスコアリングの実行
要件
- Databricks機械学習ランタイム13.2以降
- Databricks機械学習ランタイムにアクセスできない場合には、Databricksランタイム13.2以降でこのノートブックを実行することができます。この際には、ノートブックの最初で
%pip install databricks-feature-engineering
を実行します。
- Databricks機械学習ランタイムにアクセスできない場合には、Databricksランタイム13.2以降でこのノートブックを実行することができます。この際には、ノートブックの最初で
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id, expr, rand
import uuid
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
import mlflow
import mlflow.sklearn
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
データセットのロード
以下のセルのコードでは、データセットをロードして少々のデータ準備を行います: それぞれの観測値に対してユニークなIDを作成し、カラム名から空白を除外します。ユニークIDのカラム(wine_id
)は特徴量テーブルの主キーとなり、特徴量の検索に使用されます。
raw_data = spark.read.load("/databricks-datasets/wine-quality/winequality-red.csv",format="csv",sep=";",inferSchema="true",header="true" )
def addIdColumn(dataframe, id_column_name):
"""データフレームに id カラムを追加"""
columns = dataframe.columns
new_df = dataframe.withColumn(id_column_name, monotonically_increasing_id())
return new_df[[id_column_name] + columns]
def renameColumns(df):
"""UCのFeature Engineeringと互換性を持つようにカラム名を変更"""
renamed_df = df
for column in df.columns:
renamed_df = renamed_df.withColumnRenamed(column, column.replace(' ', '_'))
return renamed_df
# 関数の実行
renamed_df = renameColumns(raw_data)
df = addIdColumn(renamed_df, 'wine_id')
# 特徴量テーブルに含めないターゲットカラム ('quality') を削除します
features_df = df.drop('quality')
display(features_df)
新規のカタログの作成、あるいは既存カタログの再利用
新規にカタログを作成するには、メタストアに対するCREATE CATALOG
権限が必要です。既存のカタログを使用する場合には、カタログに対するUSE CATALOG
権限が必要です。
catalog_name = "takaakiyayoi_catalog"
# 新規カタログを作成:
# spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
# spark.sql(f"USE CATALOG {catalog_name}")
# あるいは、既存カタログを再利用:
spark.sql(f"USE CATALOG {catalog_name}")
カタログに新規スキーマを作成
カタログに新規スキーマを作成するには、カタログに対するCREATE SCHEMA
権限が必要です。
spark.sql("CREATE SCHEMA IF NOT EXISTS wine_db")
spark.sql("USE SCHEMA wine_db")
# それぞれの実行ごとにユニークなテーブル名を作成。複数回ノートブックを実行する際のエラーを回避します。
table_name = f"{catalog_name}.wine_db.wine_db_" + str(uuid.uuid4())[:6]
print(table_name)
takaakiyayoi_catalog.wine_db.wine_db_807ef4
特徴量テーブルの作成
最初のステップではFeatureEngineeringClient
を作成します。
fe = FeatureEngineeringClient()
# ノートブックでfeature engineering client APIの関数のヘルプを取得できます :
# help(fe.<function_name>)
# 例:
# help(fe.create_table)
特徴量テーブルを作成します。完全なAPIリファレンスについては(AWS|Azure|GCP)をご覧ください。
fe.create_table(
name=table_name,
primary_keys=["wine_id"],
df=features_df,
schema=features_df.schema,
description="ワインの特徴量"
)
Unity Catalogの特徴量エンジニアリングを用いたモデルのトレーニング
特徴量テーブルには予測ターゲットは含まれません。しかし、トレーニングデータセットには予測ターゲットの値が必要です。また、モデルが推論で使用されるまで利用できない特徴量が存在する場合があります。
この例では、推論時にのみ観測できるワインの特性を表現する特徴量 real_time_measurement
を使用します。この特徴量はトレーニングで使用され、推論時にはワインの特徴量の値として提供されます。
以下の例ではreal_time_measurement
としてダミーデータを追加してます。
## inference_data_df には、 wine_id (主キー)、quality (予測ターゲット)、リアルタイムの特徴量が含まれます
inference_data_df = df.select("wine_id", "quality", (10 * rand()).alias("real_time_measurement"))
display(inference_data_df)
特徴量テーブルから特徴量を検索するために指定されたlookup_key
とオンライン特徴量real_time_measurement
を使用するトレーニングデータセットを構築するためにFeatureLookup
を使用します。feature_names
パラメータを指定しない場合には、主キーを除くすべての特徴量が返却されます。
def load_data(table_name, lookup_key):
# FeatureLookupで`feature_names`パラメータを指定しない場合、主キーを除くすべての特徴量が返却されます
model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]
# fe.create_training_setはinference_data_dfと主キーがマッチするmodel_feature_lookupsの特徴量を検索します
training_set = fe.create_training_set(df=inference_data_df, feature_lookups=model_feature_lookups, label="quality", exclude_columns="wine_id")
training_pd = training_set.load_df().toPandas()
# トレーニングデータセット、テストデータセットを作成
X = training_pd.drop("quality", axis=1)
y = training_pd["quality"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
return X_train, X_test, y_train, y_test, training_set
# トレーニングデータセットとテストデータセットを作成
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()
fe.create_training_set
のdf
に指定されたinference_data_df
とmodel_feature_lookups
に指定された検索条件に従って取得された特徴量テーブルが結合されたトレーニングデータセットを取得することができます。
トレーニングしたモデルがUnity Catalog配下で管理されるように設定します。
from mlflow.tracking.client import MlflowClient
# Unity CatalogのモデルにアクセスするようにMLflowクライアントを設定
mlflow.set_registry_uri("databricks-uc")
model_name = f"{catalog_name}.wine_db.wine_model"
client = MlflowClient()
try:
client.delete_registered_model(model_name) # 作成済みの場合にはモデルを削除
except:
None
次のセルのコードはscikit-learnのRandomForestRegressorモデルをトレーニングし、UCのFeature Engineeringを用いてモデルを記録します。
このコードはトレーニングのパラメータと結果を追跡するためのMLflowエクスペリメントをスタートします。モデルのオートロギングをオフ(mlflow.sklearn.autolog(log_models=False)
)にしていることに注意してください。これは、モデルはfe.log_model
を用いて記録されるためです。
# MLflowオートロギングを無効化して、UCのFeature Engineeringを用いてモデルを記録
mlflow.sklearn.autolog(log_models=False)
def train_model(X_train, X_test, y_train, y_test, training_set, fe):
## モデルのフィッティングと記録
with mlflow.start_run() as run:
rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))
fe.log_model(
model=rf,
artifact_path="wine_quality_prediction",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=model_name,
)
train_model(X_train, X_test, y_train, y_test, training_set, fe)
記録されたモデルを参照するには、このノートブックのMLflowエクスペリメントページに移動します。エクスペリメントページにアクセスするには、左のナビゲーションバーのエクスペリメントアイコンをクリックします:
リストからノートブックのエクスペリメントを探します。ノートブックと同じ名前になっており、この場合feature-store-with-uc-basic-example
となります。
エクスペリメントページを表示するにはエクスペリメント名をクリックします。このページのArtifactsセクションには、fe.log_model
を呼び出した際に作成された、パッケージングされたUCのFeature Engineeringモデルが表示されます。
また、このモデルは自動的にUnity Catalogに登録されます。
バッチスコアリング
推論において新規データに対して、パッケージングされたFeature Engineering in UCモデルを適用するには、score_batch
を使用します。入力データには主キーのカラムwine_id
とリアルタイムの特徴量であるreal_time_measurement
のみが必要となります。モデルは自動で特徴量テーブルからすべてのその他の特徴量を検索します。
# ヘルパー関数
def get_latest_model_version(model_name):
latest_version = 1
mlflow_client = MlflowClient()
for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
version_int = int(mv.version)
if version_int > latest_version:
latest_version = version_int
return latest_version
## シンプルにするために、この例では予測の入力データとしてinference_data_dfを使います
batch_input_df = inference_data_df.drop("quality") # ラベルカラムの削除
latest_model_version = get_latest_model_version(model_name)
predictions_df = fe.score_batch(model_uri=f"models:/{model_name}/{latest_model_version}", df=batch_input_df)
display(predictions_df["wine_id", "prediction"])
特徴量テーブルの修正
新たな特徴量を追加することでデータフレームを修正したとします。特徴量テーブルを更新するにはmode="merge"
でfe.write_table
を使用します。
## 特徴量を保持するデータフレームの修正
so2_cols = ["free_sulfur_dioxide", "total_sulfur_dioxide"]
new_features_df = (features_df.withColumn("average_so2", expr("+".join(so2_cols)) / 2))
display(new_features_df)
fe.write_table
でmode="merge"
を指定して特徴量テーブルを更新します。
fe.write_table(
name=table_name,
df=new_features_df,
mode="merge"
)
特徴量テーブルから特徴量を読み込むにはfe.read_table()
を使用します。
# 最新バージョンの特徴量テーブルを表示します
# 現行バージョンで削除された特徴量は表示されますが、値はnullとなります
display(fe.read_table(name=table_name))
更新された特徴量テーブルを用いた新たなモデルバージョンのトレーニング
def load_data(table_name, lookup_key):
model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]
# fe.create_training_set は inference_data_df とキーがマッチする model_feature_lookups 特徴量を検索します
training_set = fe.create_training_set(df=inference_data_df, feature_lookups=model_feature_lookups, label="quality", exclude_columns="wine_id")
training_pd = training_set.load_df().toPandas()
# トレーニングデータセットとテストデータセットの作成
X = training_pd.drop("quality", axis=1)
y = training_pd["quality"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
return X_train, X_test, y_train, y_test, training_set
X_train, X_test, y_train, y_test, training_set = load_data(table_name, "wine_id")
X_train.head()
トレーニングデータセットを構築する際、特徴量を検索するために指定された key
を使用します。
def train_model(X_train, X_test, y_train, y_test, training_set, fe):
## モデルのフィッティングと記録
with mlflow.start_run() as run:
rf = RandomForestRegressor(max_depth=3, n_estimators=20, random_state=42)
rf.fit(X_train, y_train)
y_pred = rf.predict(X_test)
mlflow.log_metric("test_mse", mean_squared_error(y_test, y_pred))
mlflow.log_metric("test_r2_score", r2_score(y_test, y_pred))
fe.log_model(
model=rf,
artifact_path="feature-store-model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name=model_name,
)
train_model(X_train, X_test, y_train, y_test, training_set, fe)
score_batch
を用いて特徴量に最新バージョンの登録MLflowモデルを適用します。
## シンプルにするために、この例では予測の入力データとしてinference_data_dfを使います
batch_input_df = inference_data_df.drop("quality") # ラベルカラムの削除
latest_model_version = get_latest_model_version(model_name)
predictions_df = fe.score_batch(model_uri=f"models:/{model_name}/{latest_model_version}", df=batch_input_df)
display(predictions_df["wine_id","prediction"])
特徴量テーブルの権限コントロールと削除
- Unity Catalogの特徴量テーブルに誰がアクセスできるのかをコントロールするには、カタログエクスプローラのテーブル詳細ページにあるPermissionsボタンを使います。
- Unity Catalog特徴量テーブルを削除するには、カタログエクスプローラのテーブル詳細ページにあるケバブメニューをクリックし、Deleteを選択します。UIを用いてUnity Catalog特徴量テーブルを削除すると、対応するDeltaテーブルも削除されます。
カタログエクスプローラによるリネージの確認
特徴量テーブルと機械学習モデルの両方がUnity Catalogで管理されることで、リネージも追跡されます。特徴量テーブルの依存関係を開くと、この特徴量テーブルがどのモデルによって使用されているのかを確認することができます。
これまでは、特徴量テーブルも機械学習モデルも各自の環境で開発して終わりのケースが多かったと思うのですが、このように、これらを一元管理することでモデルのトレーニングで使用された特徴量の追跡や、特徴量テーブルのコンシューマーを容易に特定できるようになります。是非ご活用ください!