Train models using the Databricks Feature Store | Databricks on AWS [2022/9/23時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書では、Databricks Feature Storeの特徴量を用いてどの様にモデルをトレーニングするのかを説明します。最初に、どの様に特徴量を使用し、joinするのかを定義するトレーニングデータセットを作成する必要があります。そして、モデルをトレーニングを行う際にモデルは特徴量へのリファレンスを保持します。
推論にモデルを使う際、Feature Storeから特徴量を主翼させる様に選択することができます。また、モデルをサーバレスリアルタイム推論あるいは、DatabricksのクラシックMLflowモデルサービングでサーブすることもでき、その際には、オンラインストアに公開されている特徴量を自動で検索します。
注意
現時点では、Feature StoreはUnity Catalogメタストアをサポートしていません。Unity Catalogが有効化されたワークスペースでは、特徴量テーブルはデフォルトのHiveメタストアにのみ書き込むことができます。
トレーニングデータセットの作成
モデルとレーングで特徴量テーブルから特定の特徴量を選択するには、FeatureStoreClient.create_training_set
APIとFeatureLookup
というオブジェクトを用いてトレーニングデータセットを作成します。FeatureLookup
は、トレーニングセットで使用するそれぞれの特徴量を、特徴量テーブルの名前、特徴量の名前、FeatureStoreClient.create_training_set
に渡されるデータフレームと特徴量テーブルをjoinする際のキーを用いて指定します。
FeatureLookup
を作成する際にはfeature_names
パラメーターを使用します。feature_names
は、トレーニングセットを作成する時点での特徴量テーブルを検索するために、単一の特徴量の名前、特徴量名のリスト、None(主キーを除くすべての特徴量)を受け取ります。
本書には、両方のバージョンの構文に対するコードサンプルが含まれています。
この例では、trainingSet.load_df
によって返却されるデータフレームには、feature_lookups
に含まれているそれぞれの特徴量のカラムが含まれています。FeatureStoreClient.create_training_set
に渡されるデータフレームに対して、exclude_columns
を用いて除外されたカラムを除くすべてのカラムを保持します。
from databricks.feature_store import FeatureLookup
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d', 'total_purchases_7d'],
lookup_key = 'customer_id'
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['category'],
lookup_key = 'product_id'
)
]
fs = FeatureStoreClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
df=training_df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id', 'product_id']
)
training_df = training_set.load_df()
検索キーが主キーにマッチしない際にトレーニングデータセットを作成
トレーニングセットのカラム名のFeatureLookup
で引数lookup_key
を使います。FeatureStoreClient.create_training_set
は、特徴量テーブルが作成された際に指定された主キーの順序を用いて、lookup_key
引数で指定されたトレーニングセットのカラム間の順序ありjoinを行います。
この例では、recommender_system.customer_features
には以下の主キーがあります: customer_id
, dt
。
recommender_system.product_features
特徴量テーブルには、主キーproduct_id
があります。
training_df
に以下のカラムがある場合、
cid
transaction_dt
product_id
rating
以下のコードは、TrainingSet
に対して適切な特徴量検索を行います。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d', 'total_purchases_7d'],
lookup_key = ['cid', 'transaction_dt']
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['category'],
lookup_key = 'product_id'
)
]
FeatureStoreClient.create_training_set
が呼び出されると、left joinを行い、recommender_system.customer_features
とtraining_df
とを、以下のコードの様に(transaction_dt,cid
)に対応するキー(customer_id,dt
)を用いてテーブルをjoinします。
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")
training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)
異なる特徴量テーブルから同じ名前を持つ二つの特徴量を含むトレーニングデータセットを作成
FeatureLookup
でオプションの引数output_name
を使います。指定される名前がTrainingSet.load_df
で返却されるデータフレームの特徴量名として使用されます。例えば、以下のコードでは、training_set.load_df
によって返却されるデータフレームには、カラムcustomer_height
とproduct_height
が含まれています。
注意
子もデータフレームにおけるlookup_key
カラムの型は、参照する特徴量テーブルの主キーの型と一致する必要があります。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['height'],
lookup_key = 'customer_id',
output_name = 'customer_height',
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['height'],
lookup_key = 'product_id',
output_name = 'product_height'
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id']
)
training_df = training_set.load_df()
教師なし機械学習モデルのトレーニングデータセットを作成
教師なし機械学習モデルのトレーニングセットを作成する際には、label=None
を設定します。例えば、以下のトレーニングセットは異なる顧客を彼らの興味に基づいてグルーピングするために使用することができます。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['interests'],
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = None,
exclude_columns = ['customer_id']
)
training_df = training_set.load_df()
特徴量テーブルを用いたモデルのトレーニングとバッチ推論の実行
Feature Storeの特徴量を用いてモデルをトレーニングする際、モデルは特徴量へのリファレンスを保持します。推論にモデルを使う際、Feature Storeから特徴量を取得する様にすることができます。モデルで使用される特徴量の主キーを指定する必要があります。モデルはワークスペースのFeature Storeから必要な特徴量を取得します。スコアリングの過程で必要に応じて特徴量をjoinします。
推論の際に特徴量検索をサポートするためには:
-
FeatureStoreClient.log_model
を用いてモデルを記録する必要があります。 -
モデルをトレーニングするために、
TrainingSet.load_df
で返却されるデータフレームを使う必要があります。モデルトレーニングを行う前に、何かしらの方法でこのデータフレームを編集した場合には、推論にモデルを使う際にその変更は適用されません。これは、モデルのパフォーマンスを引き下げます。 -
モデルの型は対応するMLflowの
python_flavor
が存在する必要があります。MLflowでは、以下の様に大部分のPythonモデルトレーニングフレームワークをサポートしています。- scikit-learn
- keras
- PyTorch
- SparkML
- LightGBM
- XGBoost
- TensorFlow Keras (
python_flavor mlflow.keras
を使用)
-
カスタムのMLflow pyfuncモデル
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d'],
lookup_key = 'customer_id',
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['category'],
lookup_key = 'product_id'
)
]
fs = FeatureStoreClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fs.log_model(
model,
"recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
model_uri,
batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
特徴量メタデータがパッケージングされたモデルのスコアリングの際にカスタム特徴量を使用
デフォルトでは、特徴量メタデータがパッケージングされたモデルは、推論時にFeature Storeから特徴量を取得します。スコアリングでカスタムの特徴量を使う際には、FeatureStoreClient.score_batch()
に渡すデータフレームにそれらを含めます。
例えば、モデルにこれらの2つの特徴量をパッケージングするとします。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['account_creation_date', 'num_lifetime_purchases'],
lookup_key = 'customer_id',
),
]
推論時には、account_creation_date
というカラムを含むデータフレームに対してFeatureStoreClient.score_batch
を呼び出すことで、特徴量account_creation_date
に対するカスタムの値を指定することができます。この場合、APIはFeature Storeから特徴量num_lifetime_purchases
のみを検索し、モデルスコアリングには提供されたカスタムのaccount_creation_date
も使用します。
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
'models:/ban_prediction_model/1',
batch_df
)
Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニング、スコアリングする
Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニングすることができます。特徴量のメタデータをモデルにパッケージングすると、モデルは推論時にFeature Storeから特徴量を取得します。
モデルをトレーニングするには、FeatureStoreClient.create_training_set
に渡すデータフレームにカラムとして追加のデータを含めます。このサンプルでは、Feature Storeからの特徴量total_purchases_30d
と、追加のカラムbrowser
を使用します。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d'],
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id'] # 'browser' is not excluded
)
推論の際、FeatureStoreClient.score_batch
で使用されるデータフレームにはカラムbrowser
を含める必要があります。
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
model_uri,
batch_df
)