Train models using the Databricks Feature Store | Databricks on AWS [2022/9/23時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書では、Databricks Feature Storeの特徴量を用いてどの様にモデルをトレーニングするのかを説明します。最初に、どの様に特徴量を使用し、joinするのかを定義するトレーニングデータセットを作成する必要があります。そして、モデルをトレーニングを行う際にモデルは特徴量へのリファレンスを保持します。
推論にモデルを使う際、Feature Storeから特徴量を主翼させる様に選択することができます。また、モデルをサーバレスリアルタイム推論あるいは、DatabricksのクラシックMLflowモデルサービングでサーブすることもでき、その際には、オンラインストアに公開されている特徴量を自動で検索します。
注意
現時点では、Feature StoreはUnity Catalogメタストアをサポートしていません。Unity Catalogが有効化されたワークスペースでは、特徴量テーブルはデフォルトのHiveメタストアにのみ書き込むことができます。
トレーニングデータセットの作成
モデルとレーングで特徴量テーブルから特定の特徴量を選択するには、FeatureStoreClient.create_training_setAPIとFeatureLookupというオブジェクトを用いてトレーニングデータセットを作成します。FeatureLookupは、トレーニングセットで使用するそれぞれの特徴量を、特徴量テーブルの名前、特徴量の名前、FeatureStoreClient.create_training_setに渡されるデータフレームと特徴量テーブルをjoinする際のキーを用いて指定します。
FeatureLookupを作成する際にはfeature_namesパラメーターを使用します。feature_namesは、トレーニングセットを作成する時点での特徴量テーブルを検索するために、単一の特徴量の名前、特徴量名のリスト、None(主キーを除くすべての特徴量)を受け取ります。
本書には、両方のバージョンの構文に対するコードサンプルが含まれています。
この例では、trainingSet.load_dfによって返却されるデータフレームには、feature_lookupsに含まれているそれぞれの特徴量のカラムが含まれています。FeatureStoreClient.create_training_setに渡されるデータフレームに対して、exclude_columnsを用いて除外されたカラムを除くすべてのカラムを保持します。
from databricks.feature_store import FeatureLookup
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d', 'total_purchases_7d'],
lookup_key = 'customer_id'
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['category'],
lookup_key = 'product_id'
)
]
fs = FeatureStoreClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
df=training_df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id', 'product_id']
)
training_df = training_set.load_df()
検索キーが主キーにマッチしない際にトレーニングデータセットを作成
トレーニングセットのカラム名のFeatureLookupで引数lookup_keyを使います。FeatureStoreClient.create_training_setは、特徴量テーブルが作成された際に指定された主キーの順序を用いて、lookup_key引数で指定されたトレーニングセットのカラム間の順序ありjoinを行います。
この例では、recommender_system.customer_featuresには以下の主キーがあります: customer_id, dt。
recommender_system.product_features特徴量テーブルには、主キーproduct_idがあります。
training_dfに以下のカラムがある場合、
cidtransaction_dtproduct_idrating
以下のコードは、TrainingSetに対して適切な特徴量検索を行います。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d', 'total_purchases_7d'],
lookup_key = ['cid', 'transaction_dt']
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['category'],
lookup_key = 'product_id'
)
]
FeatureStoreClient.create_training_setが呼び出されると、left joinを行い、recommender_system.customer_featuresとtraining_dfとを、以下のコードの様に(transaction_dt,cid)に対応するキー(customer_id,dt)を用いてテーブルをjoinします。
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")
training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)
異なる特徴量テーブルから同じ名前を持つ二つの特徴量を含むトレーニングデータセットを作成
FeatureLookupでオプションの引数output_nameを使います。指定される名前がTrainingSet.load_dfで返却されるデータフレームの特徴量名として使用されます。例えば、以下のコードでは、training_set.load_dfによって返却されるデータフレームには、カラムcustomer_heightとproduct_heightが含まれています。
注意
子もデータフレームにおけるlookup_keyカラムの型は、参照する特徴量テーブルの主キーの型と一致する必要があります。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['height'],
lookup_key = 'customer_id',
output_name = 'customer_height',
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['height'],
lookup_key = 'product_id',
output_name = 'product_height'
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id']
)
training_df = training_set.load_df()
教師なし機械学習モデルのトレーニングデータセットを作成
教師なし機械学習モデルのトレーニングセットを作成する際には、label=Noneを設定します。例えば、以下のトレーニングセットは異なる顧客を彼らの興味に基づいてグルーピングするために使用することができます。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['interests'],
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = None,
exclude_columns = ['customer_id']
)
training_df = training_set.load_df()
特徴量テーブルを用いたモデルのトレーニングとバッチ推論の実行
Feature Storeの特徴量を用いてモデルをトレーニングする際、モデルは特徴量へのリファレンスを保持します。推論にモデルを使う際、Feature Storeから特徴量を取得する様にすることができます。モデルで使用される特徴量の主キーを指定する必要があります。モデルはワークスペースのFeature Storeから必要な特徴量を取得します。スコアリングの過程で必要に応じて特徴量をjoinします。
推論の際に特徴量検索をサポートするためには:
-
FeatureStoreClient.log_modelを用いてモデルを記録する必要があります。 -
モデルをトレーニングするために、
TrainingSet.load_dfで返却されるデータフレームを使う必要があります。モデルトレーニングを行う前に、何かしらの方法でこのデータフレームを編集した場合には、推論にモデルを使う際にその変更は適用されません。これは、モデルのパフォーマンスを引き下げます。 -
モデルの型は対応するMLflowの
python_flavorが存在する必要があります。MLflowでは、以下の様に大部分のPythonモデルトレーニングフレームワークをサポートしています。- scikit-learn
- keras
- PyTorch
- SparkML
- LightGBM
- XGBoost
- TensorFlow Keras (
python_flavor mlflow.kerasを使用)
-
カスタムのMLflow pyfuncモデル
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d'],
lookup_key = 'customer_id',
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['category'],
lookup_key = 'product_id'
)
]
fs = FeatureStoreClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fs.log_model(
model,
"recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
model_uri,
batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
特徴量メタデータがパッケージングされたモデルのスコアリングの際にカスタム特徴量を使用
デフォルトでは、特徴量メタデータがパッケージングされたモデルは、推論時にFeature Storeから特徴量を取得します。スコアリングでカスタムの特徴量を使う際には、FeatureStoreClient.score_batch()に渡すデータフレームにそれらを含めます。
例えば、モデルにこれらの2つの特徴量をパッケージングするとします。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['account_creation_date', 'num_lifetime_purchases'],
lookup_key = 'customer_id',
),
]
推論時には、account_creation_dateというカラムを含むデータフレームに対してFeatureStoreClient.score_batchを呼び出すことで、特徴量account_creation_dateに対するカスタムの値を指定することができます。この場合、APIはFeature Storeから特徴量num_lifetime_purchasesのみを検索し、モデルスコアリングには提供されたカスタムのaccount_creation_dateも使用します。
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
'models:/ban_prediction_model/1',
batch_df
)
Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニング、スコアリングする
Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニングすることができます。特徴量のメタデータをモデルにパッケージングすると、モデルは推論時にFeature Storeから特徴量を取得します。
モデルをトレーニングするには、FeatureStoreClient.create_training_setに渡すデータフレームにカラムとして追加のデータを含めます。このサンプルでは、Feature Storeからの特徴量total_purchases_30dと、追加のカラムbrowserを使用します。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d'],
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id'] # 'browser' is not excluded
)
推論の際、FeatureStoreClient.score_batchで使用されるデータフレームにはカラムbrowserを含める必要があります。
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
model_uri,
batch_df
)