LoginSignup
0
0

More than 1 year has passed since last update.

Databricks Feature Storeを用いたモデルのトレーニング

Posted at

Train models using the Databricks Feature Store | Databricks on AWS [2022/9/23時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

本書では、Databricks Feature Storeの特徴量を用いてどの様にモデルをトレーニングするのかを説明します。最初に、どの様に特徴量を使用し、joinするのかを定義するトレーニングデータセットを作成する必要があります。そして、モデルをトレーニングを行う際にモデルは特徴量へのリファレンスを保持します。

推論にモデルを使う際、Feature Storeから特徴量を主翼させる様に選択することができます。また、モデルをサーバレスリアルタイム推論あるいは、DatabricksのクラシックMLflowモデルサービングでサーブすることもでき、その際には、オンラインストアに公開されている特徴量を自動で検索します。

注意
現時点では、Feature StoreはUnity Catalogメタストアをサポートしていません。Unity Catalogが有効化されたワークスペースでは、特徴量テーブルはデフォルトのHiveメタストアにのみ書き込むことができます。

トレーニングデータセットの作成

モデルとレーングで特徴量テーブルから特定の特徴量を選択するには、FeatureStoreClient.create_training_setAPIとFeatureLookupというオブジェクトを用いてトレーニングデータセットを作成します。FeatureLookupは、トレーニングセットで使用するそれぞれの特徴量を、特徴量テーブルの名前、特徴量の名前、FeatureStoreClient.create_training_setに渡されるデータフレームと特徴量テーブルをjoinする際のキーを用いて指定します。

FeatureLookupを作成する際にはfeature_namesパラメーターを使用します。feature_namesは、トレーニングセットを作成する時点での特徴量テーブルを検索するために、単一の特徴量の名前、特徴量名のリスト、None(主キーを除くすべての特徴量)を受け取ります。

本書には、両方のバージョンの構文に対するコードサンプルが含まれています。

この例では、trainingSet.load_dfによって返却されるデータフレームには、feature_lookupsに含まれているそれぞれの特徴量のカラムが含まれています。FeatureStoreClient.create_training_setに渡されるデータフレームに対して、exclude_columnsを用いて除外されたカラムを除くすべてのカラムを保持します。

Python
from databricks.feature_store import FeatureLookup

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d', 'total_purchases_7d'],
      lookup_key = 'customer_id'
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['category'],
      lookup_key = 'product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups = feature_lookups,
  label = 'rating',
 exclude_columns = ['customer_id', 'product_id']
)

training_df = training_set.load_df()

検索キーが主キーにマッチしない際にトレーニングデータセットを作成

トレーニングセットのカラム名のFeatureLookupで引数lookup_keyを使います。FeatureStoreClient.create_training_setは、特徴量テーブルが作成された際に指定された主キーの順序を用いて、lookup_key引数で指定されたトレーニングセットのカラム間の順序ありjoinを行います。

この例では、recommender_system.customer_featuresには以下の主キーがあります: customer_id, dt

recommender_system.product_features特徴量テーブルには、主キーproduct_idがあります。

training_dfに以下のカラムがある場合、

  • cid
  • transaction_dt
  • product_id
  • rating

以下のコードは、TrainingSetに対して適切な特徴量検索を行います。

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d', 'total_purchases_7d'],
      lookup_key = ['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['category'],
      lookup_key = 'product_id'
    )
  ]

FeatureStoreClient.create_training_setが呼び出されると、left joinを行い、recommender_system.customer_featurestraining_dfとを、以下のコードの様に(transaction_dt,cid)に対応するキー(customer_id,dt)を用いてテーブルをjoinします。

Python
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

異なる特徴量テーブルから同じ名前を持つ二つの特徴量を含むトレーニングデータセットを作成

FeatureLookupでオプションの引数output_nameを使います。指定される名前がTrainingSet.load_dfで返却されるデータフレームの特徴量名として使用されます。例えば、以下のコードでは、training_set.load_dfによって返却されるデータフレームには、カラムcustomer_heightproduct_heightが含まれています。

注意
子もデータフレームにおけるlookup_keyカラムの型は、参照する特徴量テーブルの主キーの型と一致する必要があります。

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['height'],
      lookup_key = 'customer_id',
      output_name = 'customer_height',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['height'],
      lookup_key = 'product_id',
      output_name = 'product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id']
  )
  training_df = training_set.load_df()

教師なし機械学習モデルのトレーニングデータセットを作成

教師なし機械学習モデルのトレーニングセットを作成する際には、label=Noneを設定します。例えば、以下のトレーニングセットは異なる顧客を彼らの興味に基づいてグルーピングするために使用することができます。

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['interests'],
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = None,
    exclude_columns = ['customer_id']
  )

  training_df = training_set.load_df()

特徴量テーブルを用いたモデルのトレーニングとバッチ推論の実行

Feature Storeの特徴量を用いてモデルをトレーニングする際、モデルは特徴量へのリファレンスを保持します。推論にモデルを使う際、Feature Storeから特徴量を取得する様にすることができます。モデルで使用される特徴量の主キーを指定する必要があります。モデルはワークスペースのFeature Storeから必要な特徴量を取得します。スコアリングの過程で必要に応じて特徴量をjoinします。

推論の際に特徴量検索をサポートするためには:

  • FeatureStoreClient.log_modelを用いてモデルを記録する必要があります。

  • モデルをトレーニングするために、TrainingSet.load_dfで返却されるデータフレームを使う必要があります。モデルトレーニングを行う前に、何かしらの方法でこのデータフレームを編集した場合には、推論にモデルを使う際にその変更は適用されません。これは、モデルのパフォーマンスを引き下げます。

  • モデルの型は対応するMLflowのpython_flavorが存在する必要があります。MLflowでは、以下の様に大部分のPythonモデルトレーニングフレームワークをサポートしています。

    • scikit-learn
    • keras
    • PyTorch
    • SparkML
    • LightGBM
    • XGBoost
    • TensorFlow Keras (python_flavor mlflow.kerasを使用)
  • カスタムのMLflow pyfuncモデル

Python
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d'],
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['category'],
      lookup_key = 'product_id'
    )
  ]


fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model,
    "recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
    model_uri,
    batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

特徴量メタデータがパッケージングされたモデルのスコアリングの際にカスタム特徴量を使用

デフォルトでは、特徴量メタデータがパッケージングされたモデルは、推論時にFeature Storeから特徴量を取得します。スコアリングでカスタムの特徴量を使う際には、FeatureStoreClient.score_batch()に渡すデータフレームにそれらを含めます。

例えば、モデルにこれらの2つの特徴量をパッケージングするとします。

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['account_creation_date', 'num_lifetime_purchases'],
      lookup_key = 'customer_id',
    ),
  ]

推論時には、account_creation_dateというカラムを含むデータフレームに対してFeatureStoreClient.score_batchを呼び出すことで、特徴量account_creation_dateに対するカスタムの値を指定することができます。この場合、APIはFeature Storeから特徴量num_lifetime_purchasesのみを検索し、モデルスコアリングには提供されたカスタムのaccount_creation_dateも使用します。

Python
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  'models:/ban_prediction_model/1',
  batch_df
)

Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニング、スコアリングする

Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニングすることができます。特徴量のメタデータをモデルにパッケージングすると、モデルは推論時にFeature Storeから特徴量を取得します。

モデルをトレーニングするには、FeatureStoreClient.create_training_setに渡すデータフレームにカラムとして追加のデータを含めます。このサンプルでは、Feature Storeからの特徴量total_purchases_30dと、追加のカラムbrowserを使用します。

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d'],
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id']  # 'browser' is not excluded
)

推論の際、FeatureStoreClient.score_batchで使用されるデータフレームにはカラムbrowserを含める必要があります。

Python
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri,
  batch_df
)

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0