1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Databricks Feature Storeで特徴量テーブルを操作する

Last updated at Posted at 2021-12-08

Work with feature tables | Databricks on AWS [2022/2/3時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

特徴量テーブルの検索、特徴量リネージュ、鮮度のトラッキング、特徴量テーブルの削除に関しては、Use the Feature Store UIを参照ください。

注意
データベースと特徴量テーブル名にハイフン(-)を含めることはできません。

特徴量テーブルのためのデータベースを作成する

あらゆる特徴量テーブルを作成する前に、テーブルを格納するためのデータベースを作成する必要があります。

%sql CREATE DATABASE IF NOT EXISTS <database_name>

特徴量テーブルはDeltaテーブルとして格納されます。create_table(Databricks ランタイム10.2 ML以降)、create_feature_table(Databricksランタイム10.1 ML以前)を用いて特徴量テーブルを作成する際、データベース名を指定する必要があります。例えば、以下の引数はデータベースrecommender_systemにテーブルcustomer_featuresを作成します。

name='recommender_system.customer_features'

オンラインストアに特徴量テーブルを公開する際、デフォルトのテーブル名、データベース名はテーブルを作成した際に指定したものが使用されます。publish_tableメソッドで別の名前を指定することもできます。

Databricks Feature StoreのUIでは、他のメタデータと共にオンラインストアにおけるテーブル、データベース名を表示します。

Databricks Feature Storeで特徴量テーブルを作成する

特徴量テーブルを作成する基本的なステップは以下の通りとなります。

  1. 特徴量を計算するPython関数を記述します。それぞれの関数のアウトプットは、ユニークな主キーを持つApache Sparkデータフレームである必要があります。主キーは1つ以上のカラムで構成することができます。
  2. FeatureStoreClientのインスタンスを作成し、create_table(Databricks ランタイム10.2 ML以降)、create_feature_table(Databricksランタイム10.1 ML以前)を用いて特徴量テーブルを作成します。
  3. write_tableを用いて特徴量テーブルにデータを追加します。

Databricks ランタイム10.2 ML以降

python
from databricks.feature_store import feature_table

def compute_customer_features(data):
  ''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
  pass

# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient

customer_features_df = compute_customer_features(df)

fs = FeatureStoreClient()

customer_feature_table = fs.create_table(
  name='recommender_system.customer_features',
  primary_keys='customer_id',
  schema=customer_features_df.schema,
  description='Customer features'
)

# An alternative is to use `create_table` and specify the `df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fs.create_table(
#  ...
#  df=customer_features_df,
#  ...
# )

# To use a composite key, pass all keys in the create_table call

# customer_feature_table = fs.create_table(
#   ...
#   primary_keys=['customer_id', 'date'],
#   ...
# )

# Use write_table to write data to the feature table
# Overwrite mode does a full refresh of the feature table

fs.write_table(
  name='recommender_system.customer_features',
  df = customer_features_df,
  mode = 'overwrite'
)

Databricksランタイム10.1 ML以前

Python
from databricks.feature_store import feature_table

def compute_customer_features(data):
  ''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
  pass

# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient

customer_features_df = compute_customer_features(df)

fs = FeatureStoreClient()

customer_feature_table = fs.create_feature_table(
  name='recommender_system.customer_features',
  keys='customer_id',
  schema=customer_features_df.schema,
  description='Customer features'
)

# An alternative is to use `create_feature_table` and specify the `features_df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fs.create_feature_table(
#  ...
#  features_df=customer_features_df,
#  ...
# )

# To use a composite key, pass all keys in the create_feature_table call

# customer_feature_table = fs.create_feature_table(
#   ...
#   keys=['customer_id', 'date'],
#   ...
# )

# Use write_table to write data to the feature table
# Overwrite mode does a full refresh of the feature table

fs.write_table(
  name='recommender_system.customer_features',
  df = customer_features_df,
  mode = 'overwrite'
)from databricks.feature_store import feature_table

def compute_customer_features(data):
  ''' Feature computation code returns a DataFrame with 'customer_id' as primary key'''
  pass

# create feature table keyed by customer_id
# take schema from DataFrame output by compute_customer_features
from databricks.feature_store import FeatureStoreClient

customer_features_df = compute_customer_features(df)

fs = FeatureStoreClient()

customer_feature_table = fs.create_feature_table(
  name='recommender_system.customer_features',
  keys='customer_id',
  schema=customer_features_df.schema,
  description='Customer features'
)

# An alternative is to use `create_feature_table` and specify the `features_df` argument.
# This code automatically saves the features to the underlying Delta table.

# customer_feature_table = fs.create_feature_table(
#  ...
#  features_df=customer_features_df,
#  ...
# )

# To use a composite key, pass all keys in the create_feature_table call

# customer_feature_table = fs.create_feature_table(
#   ...
#   keys=['customer_id', 'date'],
#   ...
# )

# Use write_table to write data to the feature table
# Overwrite mode does a full refresh of the feature table

fs.write_table(
  name='recommender_system.customer_features',
  df = customer_features_df,
  mode = 'overwrite'
)

特徴量テーブルを更新する

新たな特徴量を追加するか、主キーに基づいて特定の行を更新することで特徴量テーブルを更新することができます。

以下の特徴量テーブルのメタデータを更新することはできません。

  • 主キー
  • パーティションキー
  • 既存特徴量の名称、型

既存の特徴量テーブルに新規特徴量を追加

2つの方法のいずれかで既存特徴量テーブルに新規特徴量を追加することができます。

  • 既存の特徴量計算関数を更新し、返却されるデータフレームを用いてwrite_table を実行します。これにより、特徴量テーブルのスキーマが更新され、主キーに基づいて新規特徴量がマージされます。
  • 新規の特徴量を計算するために、新たな特徴量計算関数を作成します。この新規の特徴量計算関数から返却されるデータフレームには、特徴量テーブルの主キーと(定義されている場合には)パーティションキーを含める必要があります。既存特徴量テーブルに新規特徴量を書き込むために、同じ主キーを持つデータフレームを用いてwrite_tableを実行します。

特徴量テーブルの特定行のみを更新

write_tablemode = "merge"を使用します。write_tableの呼び出しで送信されるデータフレームに存在しない主キーを持つ行は変更されません。

Python
fs.write_table(
  name='recommender.customer_features',
  df = customer_features_df,
  mode = 'merge'
)

特徴量テーブルを更新するジョブをスケジュールする

特徴量テーブルの特徴量が常に最新の値になるように、日次など一定周期で特徴量テーブルを更新するノートブックを実行するジョブを作成することをお勧めします。スケジュールされていないジョブを既に作成されているのであれば、特徴量が常に最新の状態になるようにスケジュールジョブに変換することができます。

以下の例に示すように、特徴量を更新するコードではmode='merge'を使用してください。

Python
fs = FeatureStoreClient()

customer_features_df = compute_customer_features(data)

fs.write_table(
  df=customer_features_df,
  name='recommender_system.customer_features',
  mode='merge'
)

日次特徴量の過去の値を格納する

複合主キーを用いて特徴量テーブルを定義します。主キーには日付を含めてください。例えば、特徴量テーブルstore_purchasesに対しては、複合主キー(date, user_id)を使用し、効率的に読み込みを行うためにパーティションキーdateを指定します。

特徴量テーブルを興味のある期間dateでフィルタリングするコードを作成します。特徴量テーブルを最新の状態に保つために、定期的に特徴量を書き込むスケジュールジョブをセットアップするか、特徴量テーブルに新規特徴量をストリームで書き込むようにセットアップします。

特徴量を更新するためのストリーミング特徴量計算パイプラインを作成する

ストリーミング特徴量計算パイプラインを作成するために、write_tableの引数としてストリーミングのDataFrameを引き渡します。このメソッドはStreamingQueryオブジェクトを返却します。

Python
def compute_additional_customer_features(data):
  ''' Returns Streaming DataFrame
  '''
  pass  # not shown

customer_transactions = spark.readStream.load("dbfs:/events/customer_transactions")
stream_df = compute_additional_customer_features(customer_transactions)

fs.write_table(
  df=stream_df,
  name='recommender_system.customer_features',
  mode='merge'
)

特徴量テーブルから読み込む

特徴量を読み込むにはread_tableを使用します。

特徴量テーブルのメタデータを読み込むAPIは、お使いのDatabricksランタイムバージョンに依存します。Databricksランタイム10.2 ML以降ではget_tableを使用してください。Databricks Runtime 10.1 ML以前ではget_feature_tableを使用してください。

特定のタイムスタンプのデータを読み込む

Databricksの特徴量テーブルはDeltaテーブルですので、任意のタイムスタンプの特徴量を読み込むことができます。

Python
import datetime
yesterday = datetime.date.today() - datetime.timedelta(days=1)

# read customer_features values from 1 day ago
customer_features_df = fs.read_table(
  name='recommender_system.customer_features',
  as_of_delta_timestamp=str(yesterday)
)

トレーニングデータセットを作成する

モデルトレーニングのために特徴量テーブルから特定の特徴量を選択するために、トレーニングデータセットを作成します。

トレーニングデータセットを作成するには以下の手順を踏みます。

  1. トレーニングデータセットで使用したい特徴量を指定するために、FeatureLookupを作成します。引数lookup_keyは、特徴量テーブルの主キーとジョインを行うために、training_dfデータにおける結合キーとなるカラム名を指定します。
  2. トレーニングデータセットを定義するために、create_training_setをコールします。

この例では、trainingSet.load_dfで返却されるデータフレームには、feature_lookupsで指定される特徴量カラムが含まれています。これにより、exclude_columnsを用いて除外されるカラムを除いて、FeatureStoreClient.create_training_setに渡されるデータフレームの全てのカラムが保持されます。

Python
from databricks.feature_store import FeatureLookup

# Model training flow uses these features from Feature Store
# 'total_purchases_30d' feature table 'recommender_system.customer_features' and
# 'category' from 'recommender_system.product_features'
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id'
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# Training DataFrame must have lookup keys 'customer_id' and 'product_id' and label 'rating'
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id', 'product_id']
)

training_df = training_set.load_df()

検索キーが主キーとマッチしない際のTrainingSetを作成する

トレーニングセットにおけるカラム名に対して、FeatureLookupで引数lookup_keyを使用します。FeatureStoreClient.create_training_setは、特徴量テーブルが作成された際に指定された主キーの順序を用いて、lookup_key で指定されたトレーニングセットのカラム間の順序つきジョインを実行します。

この例では、recommender_system.customer_featuresには以下の主キーがあります。

  1. customer_id
  2. dt

そして、recommender_system.product_features特徴量テーブルには主キーproduct_idがあります。

training_dfに以下のカラムがある場合、

  • cid
  • transaction_dt
  • product_id
  • rating

以下のコードはTrainingSetに対して、適切な特徴量ルックアップを作成します。

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = ['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]

FeatureStoreClient.create_training_setを呼び出す際、以下のコードに示すように(transaction_dt,cid)に対応する(customer_id,dt)のキーを用いて、recommender_system.customer_featurestraining_dfを結合するレフトジョインを実行します。

Python
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

異なる特徴量テーブルで同じ名称を持つ二つの特徴量を含むTrainingSetを作成する

FeatureLookupで、オプションの引数output_nameを使用します。指定された名称はTrainingSet.load_dfから返却されるデータフレームにおける特徴量名として使用されます。例えば、以下のコードを用いることで、training_set.load_dfで返却されるデータフレームには、カラムcustomer_heightproduct_height が含まれます。

注意
データフレームにおけるlookup_keyカラムの型は、参照する特徴量テーブルの主キーの型と一致する必要があります。

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'height',
      lookup_key = 'customer_id',
      output_name = 'customer_height',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'height',
      lookup_key = 'product_id',
      output_name = 'product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id']
  )
  training_df = training_set.load_df()

教師なし機械学習モデルに対するTrainingSetを作成する

教師なし学習モデルに対するトレーニングセットを作成する際に、label=Noneを設定します。例えば、以下のトレーニングセットは異なる顧客を彼らの興味に基づいてグループにクラスタリングするために使用することができます。

Databricks Runtime 9.0 ML以前

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'interests',
      lookup_key = 'customer_id',
    ),
  ]
fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = None,
    exclude_columns = ['customer_id']
  )
  training_df = training_set.load_df()

Databricks Runtime 9.1 ML LTS以降

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['interests'],
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = None,
    exclude_columns = ['customer_id']
  )

  training_df = training_set.load_df()

特徴量テーブルを用いてモデルをトレーニングし、バッチ推論を実行する

Feature Storeの特徴量を用いてモデルをトレーニングする際、モデルは特徴量への参照を保持します。推論にモデルを使用する際、Feature Storeから取得する特徴量を選択することができます。モデルで使用する特徴量の主キーを指定する必要があります。モデルはワークスペースにおけるFeature Storeから必要とする特徴量を収集します。そして、スコアリングの過程で必要となる特徴量を結合します。

推論時の特徴量検索をサポートするには以下の手順を踏みます。

  • FeatureStoreClient.log_modelを用いてモデルをロギングします。
  • モデルをトレーニングするためにTrainingSet.load_dfから返却されるデータwフレームを使用する必要があります。モデルのトレーニングの前にこのデータフレームを変更した場合、モデルを推論に使う際にはこの変更は適用されません。これによって、モデルのパフォーマンスが悪化します。
  • モデルのタイプには、MLflowで対応するpython_flavorを指定する必要があります。MLflowでは以下のように多くのPythonモデルトレーニングフレームワークをサポートしています。
    • scikit-learn
    • keras
    • PyTorch
    • SparkML
    • LightGBM
    • XGBoost
    • TensorFlow Keras (python_flavor mlflow.kerasを使用)
    • カスタムMLflow pyfuncモデル

Databricks Runtime 9.0 ML以前

Python
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_name = 'category',
      lookup_key = 'product_id'
    )
  ]


fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model,
    "recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
    model_uri,
    batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

Databricks Runtime 9.1 ML LTS以降

Python
# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d'],
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.product_features',
      feature_names = ['category'],
      lookup_key = 'product_id'
    )
  ]


fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df,
    feature_lookups = feature_lookups,
    label = 'rating',
    exclude_columns = ['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model,
    "recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
    model_uri,
    batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

特徴量メタデータでパッケージされたモデルをスコアリングする際にカスタムの特徴量を使用する

デフォルトでは、特徴量メタデータでパッケージされたモデルは、推論時にFeature Storeから特徴量を検索します。スコアリング時にカスタムの特徴量を使用するには、FeatureStoreClient.score_batch()に渡すデータフレームにカスタム特徴量を含めます。

例えば、これら2つの特徴量でモデルをパッケージするとします。

Databricks Runtime 9.0 ML以前

Python
# This syntax is deprecated with Databricks Runtime for Machine Learning 9.1 ML and above.
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'account_creation_date',
      lookup_key = 'customer_id',
    ),
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'num_lifetime_purchases',
      lookup_key = 'customer_id'
    ),
  ]

Databricks Runtime 9.1 ML LTS以降

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['account_creation_date', 'num_lifetime_purchases'],
      lookup_key = 'customer_id',
    ),
  ]

推論時には、account_creation_dateというカラムを含むデータフレームを用いてFeatureStoreClient.score_batchをコールすることで、特徴量account_creation_dateを含めることができます。この場合、APIはモデルスコアリングの際、Feature Storeから特徴量num_lifetime_purchasesのみを検索し、カスタムのaccount_creation_dateカラムの値とともに使用します。

Python
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  'models:/ban_prediction_model/1',
  batch_df
)

Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルのトレーニング、スコアリングを行う

Feature Storeの特徴量とFeature Store外のデータを組み合わせてモデルをトレーニングすることができます。特徴量メタデータでモデルをパッケージすると、推論の際モデルはFeature Storeから特徴量を取得します。

モデルをトレーニングするために、FeatureStoreClient.create_training_setに渡すデータフレームのカラムとして、追加のデータを含めます。この例では、Feature Storeからのtotal_purchases_30dと、外部カラムbrowserを使用します。

Databricks Runtime 9.0 ML以前

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_name = 'total_purchases_30d',
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id']  # 'browser' is not excluded
)

Databricks Runtime 9.1 ML LTS以降

Python
feature_lookups = [
    FeatureLookup(
      table_name = 'recommender_system.customer_features',
      feature_names = ['total_purchases_30d'],
      lookup_key = 'customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df,
  feature_lookups = feature_lookups,
  label = 'rating',
  exclude_columns = ['customer_id']  # 'browser' is not excluded
)

推論の際には、FeatureStoreClient.score_batchで使用するデータフレームにbrowserカラムが含まれている必要があります。

Python
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri,
  batch_df
)

オンライン特徴量ストアに特徴量を公開する

リアルタイムのサービングのためにオンラインストアに特徴量を公開するためには、publish_tableを使用します。サポートされるオンラインストアに関してはオンラインストアを参照ください。

Databricksシークレットを用いてオンラインストアに認証情報を提供する

オンラインストアに特徴量テーブルを公開する際には認証情報を提供する必要があります。Databricksのシークレットにこれらの認証情報を格納し、公開時にwrite_secret_prefixを用いてこれらを参照することをお勧めします。

また、DatabricksがホストするMLflowモデルがオンラインストアに接続し、特徴量を検索できるようにするために認証情報を提供する必要があります。これらの認証情報はDatabricksシークレットに格納し、公開時にread_secret_prefixを引き渡す必要があります。

  1. オンラインストアの認証情報を含む2つのシークレットスコープを作成します:1つは読み取り専用アクセスのため(ここでは<read_only_scope>とします)、もう1つは読み書きアクセスのためのもの(ここでは<read_write_scope>とします)です。あるいは、既存のシークレットスコープを使用することもできます。
  2. ターゲットオンラインストアに対するユニークな名称を選択します。ここでは<prefix>とします。以下のシークレットを作成します。
    • ターゲットのオンライストアに対して読み取りアクセスを行うユーザー: databricks secrets put --scope <read_only_scope> --key <prefix>-user
    • ターゲットのオンライストアに対して読み取りアクセスを行うユーザーのパスワード: databricks secrets put --scope <read_only_scope> --key <prefix>-password
    • ターゲットのオンラインストアに対して読み書きアクセスを行うユーザー: databricks secrets put --scope <read_write_scope> --key <prefix>-user
    • ターゲットのオンラインストアに対して読み書きアクセスを行うユーザーのパスワード: databricks secrets put --scope <read_write_scope> --key <prefix>-password

注意
ワークスペースごとにシークレットスコープの最大数の制限があることに注意してください。制限値を超過しないようにするために、すべてのオンラインストアにアクセスするための一つのシークレットを定義して共有することができます。

オンラインストアにバッチ計算の特徴量を公開する

更新された特徴量を定期的に公開するために、Databricksジョブを作成、スケジュールすることができます。このジョブには、更新特徴量を計算するためのコードを含めることができますし、特徴量の更新を計算し公開する別のジョブを作成、実行することもできます。

以下のコードでは、名前が「recommender_system」であるオンラインデータベースが既にオンラインストアに存在しており、オフラインストアの名前と一致していることを想定しています。データベースに「customer_features」というテーブルが存在しない場合、このコードはテーブルを作成します。また、特徴量は日次で計算され、パーティションカラム_dtとして保存されます。

注意
以下のコードでは、認証情報を格納するためにDatabricksのシークレットを設定済みであることを想定しています。

Python
import datetime

from databricks.feature_store.online_store_spec import AmazonRdsMySqlSpec

def getSecret(key, scope="feature-store"):
   return dbutils.secrets.get(scope, key)

hostname = getSecret("hostname")
port = int(getSecret("port"))
user = getSecret(key="user")
password = getSecret(key="password")

online_store = AmazonRdsMySqlSpec(hostname, port, user, password)

fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  filter_condition=f"_dt = '{str(datetime.date.today())}'",
  mode='merge'
)

オンラインストアにストリーミングの特徴量を公開する

オンラインストアに継続的に特徴量をストリーミングするには、streaming=Trueを設定します。

Python
fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  streaming=True
)

オンラインストアに選択した特徴量を公開する

特定の特徴量のみをオンラインストアに公開するには、公開する特徴量の名前を指定するために引数featuresを使用します。主キーとタイムスタンプキーは常に公開されます。引数featuresを指定しない場合、あるいは値がNoneの場合、オフラインの特徴量テーブルの全ての特徴量が公開されます。

Python
fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  features=["total_purchases_30d"]
)

特定のデータベースに特徴量テーブルを公開する

オンラインストアの指定においては、データベース名(db)、テーブル名(table)を指定します。これらのパラメーターを指定しない場合、オフラインの特徴量テーブルの名称とデータベース名称が使用されます。

Python
db = new_db
table = new_table

online_store = AmazonRdsMySqlSpec(hostname, port, user, password, db, table)

指定した名称のデータベースはオンラインストアで作成済みである必要があります。

既存のオンライン特徴量テーブル、特定の行を上書きする

publish_tableのコールでmode='overwrite'を使用します。オフラインテーブルのデータを用いて、オンラインテーブルは完全に上書きされます。

Python
online_store = AmazonRdsMySqlSpec(hostname, port, user, password)

fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  mode='overwrite'
)

特定の行のみを上書きするには、引数filter_conditionを使用します。

Python
fs.publish_table(
  name='recommender_system.customer_features',
  online_store=online_store,
  filter_condition=f"_dt = '{str(datetime.date.today())}'",
  mode='merge'
)

DatabricksがホストするMLflwoモデルがオンラインストアから特徴量を検索できるようにする

実験段階
このサービングされているMLflowモデルがオンラインストアから特徴量を検索する機能は実験的なものです。

要件

  • モデルはDatabricksランタイム10.1 ML以降を用いてFeatureStoreClient.log_modelでロギングされる必要があります。
  • オンラインストアは読み取り専用認証情報を用いて公開される必要があります。

注意
モデルトレーニングの後を含め、モデルデプロイメントの前の任意のタイミングで特徴量テーブルを公開することができます。

自動特徴量検索

Databricksモデルサービングの自動特徴量検索は、以下のオンラインストアをサポートしています。

  • Amazon Aurora (MySQL-compatible)
  • Amazon RDS MySQL

自動特徴量検索は、以下のシンプルなデータタイプでのみサポートされています。

  • IntegerType
  • FloatType
  • BooleanType
  • StringType
  • DoubleType
  • LongType
  • TimestampType
  • DateType
  • ShortType

オンラインモデルスコアリングにおける特徴量の上書き

(FeatureStoreClient.log_modelでロギングされた)モデルが必要とする全ての特徴量は、モデルスコアリングのために自動でオンラインストアから検索されます。REST APIを用いてモデルのスコアリングを行う際に特徴量を上書きするには、APIのペイロードの一部に特徴量を含めるようにしてください。

注意
新たな特徴量の値は、背後にあるモデルが期待する特徴量のデータタイプに準拠する必要があります。

サポートされるデータタイプ

Feature Storeでは以下のPySpark data typesをサポートしています。

  • IntegerType
  • FloatType
  • BooleanType
  • StringType
  • DoubleType
  • LongType
  • TimestampType
  • DateType
  • ShortType (Databricks Runtime 9.1 LTS ML以降)
  • BinaryType (Databricks Runtime 10.1 ML以降)
  • DecimalType (Databricks Runtime 10.1 ML以降)
  • ArrayType (Databricks Runtime 9.1 LTS ML以降)
  • MapType (Databricks Runtime 10.1 ML以降)

Feature StoreのUIでは特徴量のデータ型に関するメタデータが表示されます。

オンラインストアに公開する際、ArrayTypeMapTypeはJSONフォーマットとして格納されます。

Databricks 無料トライアル

Databricks 無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?