Work with feature tables | Databricks on AWS [2022/2/3時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
特徴量テーブルの検索、特徴量リネージュ、鮮度のトラッキング、特徴量テーブルの削除に関しては、Use the Feature Store UIを参照ください。
注意
データベースと特徴量テーブル名にハイフン(-)を含めることはできません。
特徴量テーブルのためのデータベースを作成する
あらゆる特徴量テーブルを作成する前に、テーブルを格納するためのデータベースを作成する必要があります。
%sql CREATE DATABASE IF NOT EXISTS <database_name>
特徴量テーブルはDeltaテーブルとして格納されます。create_table
(Databricks ランタイム10.2 ML以降)、create_feature_table
(Databricksランタイム10.1 ML以前)を用いて特徴量テーブルを作成する際、データベース名を指定する必要があります。例えば、以下の引数はデータベースrecommender_system
にテーブルcustomer_features
を作成します。
name='recommender_system.customer_features'
オンラインストアに特徴量テーブルを公開する際、デフォルトのテーブル名、データベース名はテーブルを作成した際に指定したものが使用されます。publish_table
メソッドで別の名前を指定することもできます。
Databricks Feature StoreのUIでは、他のメタデータと共にオンラインストアにおけるテーブル、データベース名を表示します。
Databricks Feature Storeで特徴量テーブルを作成する
特徴量テーブルを作成する基本的なステップは以下の通りとなります。
- 特徴量を計算するPython関数を記述します。それぞれの関数のアウトプットは、ユニークな主キーを持つApache Sparkデータフレームである必要があります。主キーは1つ以上のカラムで構成することができます。
-
FeatureStoreClient
のインスタンスを作成し、create_table
(Databricks ランタイム10.2 ML以降)、create_feature_table
(Databricksランタイム10.1 ML以前)を用いて特徴量テーブルを作成します。 -
write_table
を用いて特徴量テーブルにデータを追加します。
Databricks ランタイム10.2 ML以降
from databricks.feature_store import feature_table
def compute_customer_features(data):
''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
pass
# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient
customer_features_df = compute_customer_features(df)
fs = FeatureStoreClient()
customer_feature_table = fs.create_table(
name='recommender_system.customer_features',
primary_keys='customer_id',
schema=customer_features_df.schema,
description='Customer features'
)
# An alternative is to use `create_table` and specify the `df` argument.
# This code automatically saves the features to the underlying Delta table.
# customer_feature_table = fs.create_table(
# ...
# df=customer_features_df,
# ...
# )
# To use a composite key, pass all keys in the create_table call
# customer_feature_table = fs.create_table(
# ...
# primary_keys=['customer_id', 'date'],
# ...
# )
# Use write_table to write data to the feature table
# Overwrite mode does a full refresh of the feature table
fs.write_table(
name='recommender_system.customer_features',
df = customer_features_df,
mode = 'overwrite'
)
Databricksランタイム10.1 ML以前
from databricks.feature_store import feature_table
def compute_customer_features(data):
''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
pass
# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient
customer_features_df = compute_customer_features(df)
fs = FeatureStoreClient()
customer_feature_table = fs.create_feature_table(
name='recommender_system.customer_features',
keys='customer_id',
schema=customer_features_df.schema,
description='Customer features'
)
# An alternative is to use `create_feature_table` and specify the `features_df` argument.
# This code automatically saves the features to the underlying Delta table.
# customer_feature_table = fs.create_feature_table(
# ...
# features_df=customer_features_df,
# ...
# )
# To use a composite key, pass all keys in the create_feature_table call
# customer_feature_table = fs.create_feature_table(
# ...
# keys=['customer_id', 'date'],
# ...
# )
# Use write_table to write data to the feature table
# Overwrite mode does a full refresh of the feature table
fs.write_table(
name='recommender_system.customer_features',
df = customer_features_df,
mode = 'overwrite'
)from databricks.feature_store import feature_table
def compute_customer_features(data):
''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
pass
# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient
customer_features_df = compute_customer_features(df)
fs = FeatureStoreClient()
customer_feature_table = fs.create_feature_table(
name='recommender_system.customer_features',
keys='customer_id',
schema=customer_features_df.schema,
description='Customer features'
)
# An alternative is to use `create_feature_table` and specify the `features_df` argument.
# This code automatically saves the features to the underlying Delta table.
# customer_feature_table = fs.create_feature_table(
# ...
# features_df=customer_features_df,
# ...
# )
# To use a composite key, pass all keys in the create_feature_table call
# customer_feature_table = fs.create_feature_table(
# ...
# keys=['customer_id', 'date'],
# ...
# )
# Use write_table to write data to the feature table
# Overwrite mode does a full refresh of the feature table
fs.write_table(
name='recommender_system.customer_features',
df = customer_features_df,
mode = 'overwrite'
)
特徴量テーブルを更新する
新たな特徴量を追加するか、主キーに基づいて特定の行を更新することで特徴量テーブルを更新することができます。
以下の特徴量テーブルのメタデータを更新することはできません。
- 主キー
- パーティションキー
- 既存特徴量の名称、型
既存の特徴量テーブルに新規特徴量を追加
2つの方法のいずれかで既存特徴量テーブルに新規特徴量を追加することができます。
- 既存の特徴量計算関数を更新し、返却されるデータフレームを用いて
write_table
を実行します。これにより、特徴量テーブルのスキーマが更新され、主キーに基づいて新規特徴量がマージされます。 - 新規の特徴量を計算するために、新たな特徴量計算関数を作成します。この新規の特徴量計算関数から返却されるデータフレームには、特徴量テーブルの主キーと(定義されている場合には)パーティションキーを含める必要があります。既存特徴量テーブルに新規特徴量を書き込むために、同じ主キーを持つデータフレームを用いて
write_table
を実行します。
特徴量テーブルの特定行のみを更新
write_table
でmode = "merge"
を使用します。write_table
の呼び出しで送信されるデータフレームに存在しない主キーを持つ行は変更されません。
fs.write_table(
name='recommender.customer_features',
df = customer_features_df,
mode = 'merge'
)
特徴量テーブルを更新するジョブをスケジュールする
特徴量テーブルの特徴量が常に最新の値になるように、日次など一定周期で特徴量テーブルを更新するノートブックを実行するジョブを作成することをお勧めします。スケジュールされていないジョブを既に作成されているのであれば、特徴量が常に最新の状態になるようにスケジュールジョブに変換することができます。
以下の例に示すように、特徴量を更新するコードではmode='merge'
を使用してください。
fs = FeatureStoreClient()
customer_features_df = compute_customer_features(data)
fs.write_table(
df=customer_features_df,
name='recommender_system.customer_features',
mode='merge'
)
日次特徴量の過去の値を格納する
複合主キーを用いて特徴量テーブルを定義します。主キーには日付を含めてください。例えば、特徴量テーブルstore_purchases
に対しては、複合主キー(date, user_id
)を使用し、効率的に読み込みを行うためにパーティションキーdate
を指定します。
特徴量テーブルを興味のある期間date
でフィルタリングするコードを作成します。特徴量テーブルを最新の状態に保つために、定期的に特徴量を書き込むスケジュールジョブをセットアップするか、特徴量テーブルに新規特徴量をストリームで書き込むようにセットアップします。
特徴量を更新するためのストリーミング特徴量計算パイプラインを作成する
ストリーミング特徴量計算パイプラインを作成するために、write_table
の引数としてストリーミングのDataFrame
を引き渡します。このメソッドはStreamingQuery
オブジェクトを返却します。
def compute_additional_customer_features(data):
''' Returns Streaming DataFrame
'''
pass # not shown
customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)
fs.write_table(
df=stream_df,
name='recommender_system.customer_features',
mode='merge'
)
特徴量テーブルから読み込む
特徴量を読み込むにはread_table
を使用します。
特徴量テーブルのメタデータを読み込むAPIは、お使いのDatabricksランタイムバージョンに依存します。Databricksランタイム10.2 ML以降ではget_table
を使用してください。Databricks Runtime 10.1 ML以前ではget_feature_table
を使用してください。
特定のタイムスタンプのデータを読み込む
Databricksの特徴量テーブルはDeltaテーブルですので、任意のタイムスタンプの特徴量を読み込むことができます。
import datetime
yesterday = datetime.date.today() - datetime.timedelta(days=1)
# read customer_features values from 1 day ago
customer_features_df = fs.read_table(
name='recommender_system.customer_features',
as_of_delta_timestamp=str(yesterday)
)
トレーニングデータセットを作成する
モデルトレーニングのために特徴量テーブルから特定の特徴量を選択するために、トレーニングデータセットを作成します。
トレーニングデータセットを作成するには以下の手順を踏みます。
- トレーニングデータセットで使用したい特徴量を指定するために、
FeatureLookup
を作成します。引数lookup_key
は、特徴量テーブルの主キーとジョインを行うために、training_df
データにおける結合キーとなるカラム名を指定します。 - トレーニングデータセットを定義するために、
create_training_set
をコールします。
この例では、trainingSet.load_df
で返却されるデータフレームには、feature_lookups
で指定される特徴量カラムが含まれています。これにより、exclude_columns
を用いて除外されるカラムを除いて、FeatureStoreClient.create_training_set
に渡されるデータフレームの全てのカラムが保持されます。
from databricks.feature_store import FeatureLookup
# Model training flow uses these features from Feature Store
# 'total_purchases_30d' feature table 'recommender_system.customer_features' and
# 'category' from 'recommender_system.product_features'
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'total_purchases_30d',
lookup_key = 'customer_id'
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_name = 'category',
lookup_key = 'product_id'
)
]
fs = FeatureStoreClient()
# Create a training set using training DataFrame and features from Feature Store
# Training DataFrame must have lookup keys 'customer_id' and 'product_id' and label 'rating'
training_set = fs.create_training_set(
df=training_df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id', 'product_id']
)
training_df = training_set.load_df()
検索キーが主キーとマッチしない際のTrainingSetを作成する
トレーニングセットにおけるカラム名に対して、FeatureLookup
で引数lookup_key
を使用します。FeatureStoreClient.create_training_set
は、特徴量テーブルが作成された際に指定された主キーの順序を用いて、lookup_key
で指定されたトレーニングセットのカラム間の順序つきジョインを実行します。
この例では、recommender_system.customer_features
には以下の主キーがあります。
customer_id
dt
そして、recommender_system.product_features
特徴量テーブルには主キーproduct_id
があります。
training_df
に以下のカラムがある場合、
cid
transaction_dt
product_id
rating
以下のコードはTrainingSet
に対して、適切な特徴量ルックアップを作成します。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'total_purchases_30d',
lookup_key = ['cid', 'transaction_dt']
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_name = 'category',
lookup_key = 'product_id'
)
]
FeatureStoreClient.create_training_set
を呼び出す際、以下のコードに示すように(transaction_dt,cid
)に対応する(customer_id,dt
)のキーを用いて、recommender_system.customer_features
とtraining_df
を結合するレフトジョインを実行します。
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")
training_df.join(
customer_features_df,
on=[training_df.cid == customer_features_df.customer_id,
training_df.transaction_dt == customer_features_df.dt],
how="left"
).join(
product_features_df,
on="product_id",
how="left"
)
異なる特徴量テーブルで同じ名称を持つ二つの特徴量を含むTrainingSetを作成する
FeatureLookup
で、オプションの引数output_name
を使用します。指定された名称はTrainingSet.load_df
から返却されるデータフレームにおける特徴量名として使用されます。例えば、以下のコードを用いることで、training_set.load_df
で返却されるデータフレームには、カラムcustomer_height
とproduct_height
が含まれます。
注意
データフレームにおけるlookup_key
カラムの型は、参照する特徴量テーブルの主キーの型と一致する必要があります。
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'height',
lookup_key = 'customer_id',
output_name = 'customer_height',
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_name = 'height',
lookup_key = 'product_id',
output_name = 'product_height'
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id']
)
training_df = training_set.load_df()
教師なし機械学習モデルに対するTrainingSetを作成する
教師なし学習モデルに対するトレーニングセットを作成する際に、label=None
を設定します。例えば、以下のトレーニングセットは異なる顧客を彼らの興味に基づいてグループにクラスタリングするために使用することができます。
Databricks Runtime 9.0 ML以前
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'interests',
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = None,
exclude_columns = ['customer_id']
)
training_df = training_set.load_df()
Databricks Runtime 9.1 ML LTS以降
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['interests'],
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
with mlflow.start_run():
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = None,
exclude_columns = ['customer_id']
)
training_df = training_set.load_df()
特徴量テーブルを用いてモデルをトレーニングし、バッチ推論を実行する
Feature Storeの特徴量を用いてモデルをトレーニングする際、モデルは特徴量への参照を保持します。推論にモデルを使用する際、Feature Storeから取得する特徴量を選択することができます。モデルで使用する特徴量の主キーを指定する必要があります。モデルはワークスペースにおけるFeature Storeから必要とする特徴量を収集します。そして、スコアリングの過程で必要となる特徴量を結合します。
推論時の特徴量検索をサポートするには以下の手順を踏みます。
-
FeatureStoreClient.log_model
を用いてモデルをロギングします。 - モデルをトレーニングするために
TrainingSet.load_df
から返却されるデータwフレームを使用する必要があります。モデルのトレーニングの前にこのデータフレームを変更した場合、モデルを推論に使う際にはこの変更は適用されません。これによって、モデルのパフォーマンスが悪化します。 - モデルのタイプには、MLflowで対応する
python_flavor
を指定する必要があります。MLflowでは以下のように多くのPythonモデルトレーニングフレームワークをサポートしています。- scikit-learn
- keras
- PyTorch
- SparkML
- LightGBM
- XGBoost
- TensorFlow Keras (
python_flavor mlflow.keras
を使用) - カスタムMLflow pyfuncモデル
Databricks Runtime 9.0 ML以前
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'total_purchases_30d',
lookup_key = 'customer_id',
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_name = 'category',
lookup_key = 'product_id'
)
]
fs = FeatureStoreClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fs.log_model(
model,
"recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
model_uri,
batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
Databricks Runtime 9.1 ML LTS以降
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d'],
lookup_key = 'customer_id',
),
FeatureLookup(
table_name = 'recommender_system.product_features',
feature_names = ['category'],
lookup_key = 'product_id'
)
]
fs = FeatureStoreClient()
with mlflow.start_run():
# df has columns ['customer_id', 'product_id', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id', 'product_id']
)
training_df = training_set.load_df().toPandas()
# "training_df" columns ['total_purchases_30d', 'category', 'rating']
X_train = training_df.drop(['rating'], axis=1)
y_train = training_df.rating
model = linear_model.LinearRegression().fit(X_train, y_train)
fs.log_model(
model,
"recommendation_model",
flavor=mlflow.sklearn,
training_set=training_set,
registered_model_name="recommendation_model"
)
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
model_uri,
batch_df
)
# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’
特徴量メタデータでパッケージされたモデルをスコアリングする際にカスタムの特徴量を使用する
デフォルトでは、特徴量メタデータでパッケージされたモデルは、推論時にFeature Storeから特徴量を検索します。スコアリング時にカスタムの特徴量を使用するには、FeatureStoreClient.score_batch()に渡すデータフレームにカスタム特徴量を含めます。
例えば、これら2つの特徴量でモデルをパッケージするとします。
Databricks Runtime 9.0 ML以前
# This syntax is deprecated with Databricks Runtime for Machine Learning 9.1 ML and above.
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'account_creation_date',
lookup_key = 'customer_id',
),
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'num_lifetime_purchases',
lookup_key = 'customer_id'
),
]
Databricks Runtime 9.1 ML LTS以降
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['account_creation_date', 'num_lifetime_purchases'],
lookup_key = 'customer_id',
),
]
推論時には、account_creation_date
というカラムを含むデータフレームを用いてFeatureStoreClient.score_batch
をコールすることで、特徴量account_creation_date
を含めることができます。この場合、APIはモデルスコアリングの際、Feature Storeから特徴量num_lifetime_purchases
のみを検索し、カスタムのaccount_creation_date
カラムの値とともに使用します。
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
'models:/ban_prediction_model/1',
batch_df
)
Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルのトレーニング、スコアリングを行う
Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニングすることができます。特徴量メタデータでモデルをパッケージすると、推論の際モデルはFeature Storeから特徴量を取得します。
モデルをトレーニングするために、FeatureStoreClient.create_training_set
に渡すデータフレームのカラムとして、追加のデータを含めます。この例では、Feature Storeからのtotal_purchases_30d
と、外部カラムbrowser
を使用します。
Databricks Runtime 9.0 ML以前
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_name = 'total_purchases_30d',
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id'] # 'browser' is not excluded
)
Databricks Runtime 9.1 ML LTS以降
feature_lookups = [
FeatureLookup(
table_name = 'recommender_system.customer_features',
feature_names = ['total_purchases_30d'],
lookup_key = 'customer_id',
),
]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
df,
feature_lookups = feature_lookups,
label = 'rating',
exclude_columns = ['customer_id'] # 'browser' is not excluded
)
推論の際には、FeatureStoreClient.score_batch
で使用するデータフレームにbrowser
カラムが含まれている必要があります。
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
model_uri,
batch_df
)
オンライン特徴量ストアに特徴量を公開する
リアルタイムのサービングのためにオンラインストアに特徴量を公開するためには、publish_table
を使用します。サポートされるオンラインストアに関してはオンラインストアを参照ください。
Databricksシークレットを用いてオンラインストアに認証情報を提供する
オンラインストアに特徴量テーブルを公開する際には認証情報を提供する必要があります。Databricksのシークレットにこれらの認証情報を格納し、公開時にwrite_secret_prefix
を用いてこれらを参照することをお勧めします。
また、DatabricksがホストするMLflowモデルがオンラインストアに接続し、特徴量を検索できるようにするために認証情報を提供する必要があります。これらの認証情報はDatabricksシークレットに格納し、公開時にread_secret_prefix
を引き渡す必要があります。
- オンラインストアの認証情報を含む2つのシークレットスコープを作成します:1つは読み取り専用アクセスのため(ここでは
<read_only_scope>
とします)、もう1つは読み書きアクセスのためのもの(ここでは<read_write_scope>
とします)です。あるいは、既存のシークレットスコープを使用することもできます。 - ターゲットオンラインストアに対するユニークな名称を選択します。ここでは
<prefix>
とします。以下のシークレットを作成します。- ターゲットのオンライストアに対して読み取りアクセスを行うユーザー:
databricks secrets put --scope <read_only_scope> --key <prefix>-user
- ターゲットのオンライストアに対して読み取りアクセスを行うユーザーのパスワード:
databricks secrets put --scope <read_only_scope> --key <prefix>-password
- ターゲットのオンラインストアに対して読み書きアクセスを行うユーザー:
databricks secrets put --scope <read_write_scope> --key <prefix>-user
- ターゲットのオンラインストアに対して読み書きアクセスを行うユーザーのパスワード:
databricks secrets put --scope <read_write_scope> --key <prefix>-password
- ターゲットのオンライストアに対して読み取りアクセスを行うユーザー:
注意
ワークスペースごとにシークレットスコープの最大数の制限があることに注意してください。制限値を超過しないようにするために、すべてのオンラインストアにアクセスするための一つのシークレットを定義して共有することができます。
オンラインストアにバッチ計算の特徴量を公開する
更新された特徴量を定期的に公開するために、Databricksジョブを作成、スケジュールすることができます。このジョブには、更新特徴量を計算するためのコードを含めることができますし、特徴量の更新を計算し公開する別のジョブを作成、実行することもできます。
以下のコードでは、名前が「recommender_system」であるオンラインデータベースが既にオンラインストアに存在しており、オフラインストアの名前と一致していることを想定しています。データベースに「customer_features」というテーブルが存在しない場合、このコードはテーブルを作成します。また、特徴量は日次で計算され、パーティションカラム_dt
として保存されます。
注意
以下のコードでは、認証情報を格納するためにDatabricksのシークレットを設定済みであることを想定しています。
import datetime
from databricks.feature_store.online_store_spec import AmazonRdsMySqlSpec
def getSecret(key, scope="feature-store"):
return dbutils.secrets.get(scope, key)
hostname = getSecret("hostname")
port = int(getSecret("port"))
user = getSecret(key="user")
password = getSecret(key="password")
online_store = AmazonRdsMySqlSpec(hostname, port, user, password)
fs.publish_table(
name='recommender_system.customer_features',
online_store=online_store,
filter_condition=f"_dt = '{str(datetime.date.today())}'",
mode='merge'
)
オンラインストアにストリーミングの特徴量を公開する
オンラインストアに継続的に特徴量をストリーミングするには、streaming=True
を設定します。
fs.publish_table(
name='recommender_system.customer_features',
online_store=online_store,
streaming=True
)
オンラインストアに選択した特徴量を公開する
特定の特徴量のみをオンラインストアに公開するには、公開する特徴量の名前を指定するために引数features
を使用します。主キーとタイムスタンプキーは常に公開されます。引数features
を指定しない場合、あるいは値がNoneの場合、オフラインの特徴量テーブルの全ての特徴量が公開されます。
fs.publish_table(
name='recommender_system.customer_features',
online_store=online_store,
features=["total_purchases_30d"]
)
特定のデータベースに特徴量テーブルを公開する
オンラインストアの指定においては、データベース名(db
)、テーブル名(table
)を指定します。これらのパラメーターを指定しない場合、オフラインの特徴量テーブルの名称とデータベース名称が使用されます。
db = new_db
table = new_table
online_store = AmazonRdsMySqlSpec(hostname, port, user, password, db, table)
指定した名称のデータベースはオンラインストアで作成済みである必要があります。
既存のオンライン特徴量テーブル、特定の行を上書きする
publish_table
のコールでmode='overwrite'
を使用します。オフラインテーブルのデータを用いて、オンラインテーブルは完全に上書きされます。
online_store = AmazonRdsMySqlSpec(hostname, port, user, password)
fs.publish_table(
name='recommender_system.customer_features',
online_store=online_store,
mode='overwrite'
)
特定の行のみを上書きするには、引数filter_condition
を使用します。
fs.publish_table(
name='recommender_system.customer_features',
online_store=online_store,
filter_condition=f"_dt = '{str(datetime.date.today())}'",
mode='merge'
)
DatabricksがホストするMLflwoモデルがオンラインストアから特徴量を検索できるようにする
実験段階
このサービングされているMLflowモデルがオンラインストアから特徴量を検索する機能は実験的なものです。
要件
- モデルはDatabricksランタイム10.1 ML以降を用いて
FeatureStoreClient.log_model
でロギングされる必要があります。 - オンラインストアは読み取り専用認証情報を用いて公開される必要があります。
注意
モデルトレーニングの後を含め、モデルデプロイメントの前の任意のタイミングで特徴量テーブルを公開することができます。
自動特徴量検索
Databricksモデルサービングの自動特徴量検索は、以下のオンラインストアをサポートしています。
- Amazon Aurora (MySQL-compatible)
- Amazon RDS MySQL
自動特徴量検索は、以下のシンプルなデータタイプでのみサポートされています。
IntegerType
FloatType
BooleanType
StringType
DoubleType
LongType
TimestampType
DateType
ShortType
オンラインモデルスコアリングにおける特徴量の上書き
(FeatureStoreClient.log_model
でロギングされた)モデルが必要とする全ての特徴量は、モデルスコアリングのために自動でオンラインストアから検索されます。REST APIを用いてモデルのスコアリングを行う際に特徴量を上書きするには、APIのペイロードの一部に特徴量を含めるようにしてください。
注意
新たな特徴量の値は、背後にあるモデルが期待する特徴量のデータタイプに準拠する必要があります。
サポートされるデータタイプ
Feature Storeでは以下のPySpark data typesをサポートしています。
IntegerType
FloatType
BooleanType
StringType
DoubleType
LongType
TimestampType
DateType
-
ShortType
(Databricks Runtime 9.1 LTS ML以降) -
BinaryType
(Databricks Runtime 10.1 ML以降) -
DecimalType
(Databricks Runtime 10.1 ML以降) -
ArrayType
(Databricks Runtime 9.1 LTS ML以降) -
MapType
(Databricks Runtime 10.1 ML以降)
Feature StoreのUIでは特徴量のデータ型に関するメタデータが表示されます。
オンラインストアに公開する際、ArrayType
とMapType
はJSONフォーマットとして格納されます。