こちらの続編です。
こちらのタクシーサンプルノートブックをウォークスルーします。
翻訳し、カタログとスキーマ名を設定できるようにしたノートブックがこちらです。
このノートブックは、Unity Catalogで特徴量エンジニアリングを使用してNYCイエローキャブの運賃を予測するモデルを作成する方法を示しています。以下のステップが含まれます:
- Unity Catalogで直接時系列特徴量を計算して書き込む。
- これらの特徴量を使用して運賃を予測するモデルをトレーニングする。
- 既存の特徴量を使用して新しいデータバッチでそのモデルを評価する。
要件
- Databricks Runtime 13.3 LTS for Machine Learning以上
- Databricks Runtime for Machine Learningにアクセスできない場合は、このノートブックをDatabricks Runtime 13.3 LTS以上で実行できます。その場合、このノートブックの最初に
%pip install databricks-feature-engineering
を実行してください。
- Databricks Runtime for Machine Learningにアクセスできない場合は、このノートブックをDatabricks Runtime 13.3 LTS以上で実行できます。その場合、このノートブックの最初に

特徴両テーブルやモデルを格納するカタログ、スキーマを定義します。
CATALOG_NAME = "users"
SCHEMA_NAME = "takaaki_yayoi"
特徴量の計算
特徴量の計算に使用する生データの読み込み
nyc-taxi-tiny
データセットを読み込みます。これは、以下の変換を適用して、完全な NYC Taxi Data から /databricks-datasets/nyctaxi-with-zipcodes/subsampled
に生成されました。
- 緯度と経度の座標を ZIP コードに変換する UDF を適用し、DataFrame に ZIP コード列を追加します。
- Spark
DataFrame
API の.sample()
メソッドを使用して、日付範囲クエリに基づいてデータセットを小さなデータセットにサブサンプリングします。 - 特定の列の名前を変更し、不要な列を削除します。
raw_data = spark.read.format("delta").load("/databricks-datasets/nyctaxi-with-zipcodes/subsampled")
display(raw_data)
tpep_pickup_datetime | tpep_dropoff_datetime | trip_distance | fare_amount | pickup_zip | dropoff_zip |
---|---|---|---|---|---|
2016-02-16T22:40:45.000+00:00 | 2016-02-16T22:59:25.000+00:00 | 5.35 | 18.5 | 10003 | 11238 |
2016-02-05T16:06:44.000+00:00 | 2016-02-05T16:26:03.000+00:00 | 6.5 | 21.5 | 10282 | 10001 |
2016-02-08T07:39:25.000+00:00 | 2016-02-08T07:44:14.000+00:00 | 0.9 | 5.5 | 10119 | 10003 |
2016-02-29T22:25:33.000+00:00 | 2016-02-29T22:38:09.000+00:00 | 3.5 | 13.5 | 10001 | 11222 |
タクシー料金の取引データから、乗車および降車の ZIP コードに基づいて2つのグループの特徴量を計算します。
乗車特徴量
- 旅行回数(時間ウィンドウ = 1時間、スライディングウィンドウ = 15分)
- 平均運賃額(時間ウィンドウ = 1時間、スライディングウィンドウ = 15分)
降車特徴量
- 旅行回数(時間ウィンドウ = 30分)
- 旅行が週末に終了するかどうか(Pythonコードを使用したカスタム特徴量)

ヘルパー関数
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, IntegerType, StringType
from pytz import timezone
@udf(returnType=IntegerType())
def is_weekend(dt):
tz = "America/New_York"
return int(dt.astimezone(timezone(tz)).weekday() >= 5) # 5 = 土曜日, 6 = 日曜日
def filter_df_by_ts(df, ts_column, start_date, end_date):
if ts_column and start_date:
df = df.filter(col(ts_column) >= start_date)
if ts_column and end_date:
df = df.filter(col(ts_column) < end_date)
return df
データサイエンティストのカスタムコードによる特徴量の計算
def pickup_features_fn(df, ts_column, start_date, end_date):
"""
pickup_features フィーチャーグループを計算します。
フィーチャーを特定の時間範囲に制限するには、ts_column、start_date、および/または end_date を kwargs として渡します。
"""
df = filter_df_by_ts(df, ts_column, start_date, end_date)
pickupzip_features = (
df.groupBy(
"pickup_zip", window("tpep_pickup_datetime", "1 hour", "15 minutes")
) # 1時間のウィンドウ、15分ごとにスライド
.agg(
mean("fare_amount").alias("mean_fare_window_1h_pickup_zip"),
count("*").alias("count_trips_window_1h_pickup_zip"),
)
.select(
col("pickup_zip").alias("zip"),
unix_timestamp(col("window.end")).cast("timestamp").alias("ts"),
col("mean_fare_window_1h_pickup_zip").cast(FloatType()),
col("count_trips_window_1h_pickup_zip").cast(IntegerType()),
)
)
return pickupzip_features
def dropoff_features_fn(df, ts_column, start_date, end_date):
"""
dropoff_features フィーチャーグループを計算します。
フィーチャーを特定の時間範囲に制限するには、ts_column、start_date、および/または end_date を kwargs として渡します。
"""
df = filter_df_by_ts(df, ts_column, start_date, end_date)
dropoffzip_features = (
df.groupBy("dropoff_zip", window("tpep_dropoff_datetime", "30 minute"))
.agg(count("*").alias("count_trips_window_30m_dropoff_zip"))
.select(
col("dropoff_zip").alias("zip"),
unix_timestamp(col("window.end")).cast("timestamp").alias("ts"),
col("count_trips_window_30m_dropoff_zip").cast(IntegerType()),
is_weekend(col("window.end")).alias("dropoff_is_weekend"),
)
)
return dropoffzip_features
from datetime import datetime
pickup_features = pickup_features_fn(
df=raw_data,
ts_column="tpep_pickup_datetime",
start_date=datetime(2016, 1, 1),
end_date=datetime(2016, 1, 31),
)
dropoff_features = dropoff_features_fn(
df=raw_data,
ts_column="tpep_dropoff_datetime",
start_date=datetime(2016, 1, 1),
end_date=datetime(2016, 1, 31),
)
display(pickup_features)
zip | ts | mean_fare_window_1h_pickup_zip | count_trips_window_1h_pickup_zip |
---|---|---|---|
10119 | 2016-01-28T11:00:00.000+00:00 | 10.5 | 2 |
10199 | 2016-01-18T17:30:00.000+00:00 | 6 | 1 |
10022 | 2016-01-28T07:00:00.000+00:00 | 10.5 | 1 |
10282 | 2016-01-06T15:30:00.000+00:00 | 6.5 | 1 |
10011 | 2016-01-14T14:00:00.000+00:00 | 5 | 1 |
10110 | 2016-01-17T22:45:00.000+00:00 | 8 | 1 |
10007 | 2016-01-29T02:45:00.000+00:00 | 26.5 | 1 |
10011 | 2016-01-29T02:00:00.000+00:00 | 3.5 | 1 |
display(dropoff_features)
zip | ts | count_trips_window_30m_dropoff_zip | dropoff_is_weekend |
---|---|---|---|
10020 | 2016-01-09T14:00:00.000+00:00 | 1 | 1 |
10011 | 2016-01-22T21:00:00.000+00:00 | 2 | 0 |
10018 | 2016-01-09T15:00:00.000+00:00 | 1 | 1 |
10020 | 2016-01-07T22:30:00.000+00:00 | 1 | 0 |
10032 | 2016-01-04T13:30:00.000+00:00 | 1 | 0 |
10019 | 2016-01-18T22:30:00.000+00:00 | 2 | 0 |
Unity Catalogで新しい時系列特徴量テーブルを作成する
まず、新しいカタログを作成するか、既存のカタログを再利用して、特徴量テーブルを格納するスキーマを作成します。
- 新しいカタログを作成するには、メタストアに対する
CREATE CATALOG
権限が必要です。 - 既存のカタログを使用するには、カタログに対する
USE CATALOG
権限が必要です。 - カタログに新しいスキーマを作成するには、カタログに対する
CREATE SCHEMA
権限が必要です。
# カタログとスキーマを指定。存在しない場合には作成してください。
spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"USE SCHEMA {SCHEMA_NAME}")
次に、主キー制約を使用してUnity Catalogに時系列特徴量テーブルを作成します。
CREATE TABLE
SQL構文を使用してUnity Catalogに直接テーブルを作成できます。主キー制約を使用して主キー列を指定します。時系列テーブルの場合、TIMESERIES
を使用して時系列列を注釈します(AWS|Azure|GCP)。
時系列の列はTimestampType
またはDateType
でなければなりません。
%sql
CREATE TABLE IF NOT EXISTS trip_pickup_time_series_features(
zip INT NOT NULL,
ts TIMESTAMP NOT NULL,
mean_fare_window_1h_pickup_zip FLOAT,
count_trips_window_1h_pickup_zip INT,
CONSTRAINT trip_pickup_time_series_features_pk PRIMARY KEY (zip, ts TIMESERIES)
)
COMMENT "タクシー料金。乗車時系列の特徴量。";
%sql
CREATE TABLE IF NOT EXISTS trip_dropoff_time_series_features(
zip INT NOT NULL,
ts TIMESTAMP NOT NULL,
count_trips_window_30m_dropoff_zip INT,
dropoff_is_weekend INT,
CONSTRAINT trip_dropoff_time_series_features_pk PRIMARY KEY (zip, ts TIMESERIES)
)
COMMENT "タクシー料金。降車時系列の特徴量。";
初期特徴量をUnity Catalogの特徴量テーブルに書き込む
Feature Engineeringクライアントのインスタンスを作成します。
from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()
write_table
APIを使用して、特徴量をUnity Catalogの特徴量テーブルに書き込みます。
時系列特徴量テーブルに書き込むには、DataFrameに時系列列として指定する列が含まれている必要があります。
spark.conf.set("spark.sql.shuffle.partitions", "5")
fe.write_table(
name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_pickup_time_series_features",
df=pickup_features
)
fe.write_table(
name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_dropoff_time_series_features",
df=dropoff_features
)
これで、特徴両テーブルが作成され、特徴量が登録されました。
特徴量の更新
write_table
関数を使用して特徴量テーブルの値を更新します。
display(raw_data)
tpep_pickup_datetime | tpep_dropoff_datetime | trip_distance | fare_amount | pickup_zip | dropoff_zip |
---|---|---|---|---|---|
2016-02-16T22:40:45.000+00:00 | 2016-02-16T22:59:25.000+00:00 | 5.35 | 18.5 | 10003 | 11238 |
2016-02-05T16:06:44.000+00:00 | 2016-02-05T16:26:03.000+00:00 | 6.5 | 21.5 | 10282 | 10001 |
2016-02-08T07:39:25.000+00:00 | 2016-02-08T07:44:14.000+00:00 | 0.9 | 5.5 | 10119 | 10003 |
2016-02-29T22:25:33.000+00:00 | 2016-02-29T22:38:09.000+00:00 | 3.5 | 13.5 | 10001 | 11222 |
2016-02-03T17:21:02.000+00:00 | 2016-02-03T17:23:24.000+00:00 | 0.3 | 3.5 | 10028 | 10028 |
# 新しいバッチのpickup_featuresフィーチャーグループを計算する。
new_pickup_features = pickup_features_fn(
df=raw_data,
ts_column="tpep_pickup_datetime",
start_date=datetime(2016, 2, 1),
end_date=datetime(2016, 2, 29),
)
# 新しいpickup features DataFrameをフィーチャーテーブルに書き込む
fe.write_table(
name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_pickup_time_series_features",
df=new_pickup_features,
mode="merge",
)
# 新しいバッチのdropoff_featuresフィーチャーグループを計算する。
new_dropoff_features = dropoff_features_fn(
df=raw_data,
ts_column="tpep_dropoff_datetime",
start_date=datetime(2016, 2, 1),
end_date=datetime(2016, 2, 29),
)
# 新しいdropoff features DataFrameをフィーチャーテーブルに書き込む
fe.write_table(
name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_dropoff_time_series_features",
df=new_dropoff_features,
mode="merge",
)
書き込み時には、merge
モードがサポートされています。
fe.write_table(
name="ml.taxi_example.trip_pickup_time_series_features",
df=new_pickup_features,
mode="merge",
)
データは、df.isStreaming
が True
に設定されているデータフレームを渡すことで、特徴量テーブルにストリーミングすることもできます:
fe.write_table(
name="ml.taxi_example.trip_pickup_time_series_features",
df=streaming_pickup_features,
mode="merge",
)
Databricks Jobsを使用してノートブックを定期的にスケジュールし、特徴量を更新することができます (AWS|Azure|GCP)。
アナリストは、例えば次のようにSQLを使用してUnity Catalogの特徴量テーブルと対話できます:
%sql
SELECT
SUM(count_trips_window_30m_dropoff_zip) AS num_rides,
dropoff_is_weekend
FROM
trip_dropoff_time_series_features
WHERE
dropoff_is_weekend IS NOT NULL
GROUP BY
dropoff_is_weekend;
num_rides | dropoff_is_weekend |
---|---|
15564 | 0 |
5670 | 1 |
特徴量の検索と発見
Unity CatalogのFeatures UIで特徴量テーブルを発見できます。"trip_pickup_time_series_features"または"trip_dropoff_time_series_features"で検索し、テーブル名をクリックして、カタログエクスプローラーUIでテーブルスキーマ、メタデータ、系統などの詳細を確認できます。特徴量テーブルの説明も編集できます。特徴量の発見と系統追跡の詳細については、(AWS|Azure|GCP)を参照してください。
カタログエクスプローラーUIで特徴量テーブルの権限も設定できます。詳細については、(AWS|Azure|GCP)を参照してください。
モデルのトレーニング
このセクションでは、ポイントインタイムルックアップを使用して時系列のpickupおよびdropoff特徴量テーブルを使用してトレーニングセットを作成し、トレーニングセットを使用してモデルをトレーニングする方法を説明します。タクシー料金を予測するためにLightGBMモデルをトレーニングします。
ヘルパー関数
import mlflow.pyfunc
def get_latest_model_version(model_name):
latest_version = 1
mlflow_client = MlflowClient()
for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
version_int = int(mv.version)
if version_int > latest_version:
latest_version = version_int
return latest_version
トレーニングデータセットの作成方法の理解
モデルをトレーニングするためには、トレーニングデータセットを作成する必要があります。トレーニングデータセットは以下で構成されます:
- 生の入力データ
- Unity Catalogの特徴量テーブルからの特徴量
生の入力データが必要な理由は以下の通りです:
- 主キーおよび時系列カラムは、ポイントインタイムの正確性を持つ特徴量と結合するために使用されます (AWS|Azure|GCP)。
- 特徴量テーブルに含まれていない
trip_distance
のような生の特徴量。 - モデルのトレーニングに必要な
fare
のような予測ターゲット。
以下のビジュアル概要は、生の入力データがUnity Catalogの特徴量と組み合わされてトレーニングデータセットを生成する様子を示しています:

これらの概念は、トレーニングデータセット作成のドキュメントでさらに説明されています (AWS|Azure|GCP)。
次のセルでは、必要な特徴量ごとにFeatureLookup
を作成して、Unity Catalogからモデルトレーニング用の特徴量をロードします。
時系列特徴量テーブルから特徴量値をポイントインタイムでルックアップするには、特徴量のFeatureLookup
でtimestamp_lookup_key
を指定する必要があります。これは、時系列特徴量をルックアップするためのタイムスタンプを含むDataFrameカラムの名前を示します。DataFrameの各行に対して、取得される特徴量値は、DataFrameのtimestamp_lookup_key
カラムに指定されたタイムスタンプより前の最新の特徴量値であり、主キーがDataFrameのlookup_key
カラムの値と一致するものです。一致する特徴量値が存在しない場合はnull
が返されます。
from databricks.feature_engineering import FeatureLookup
import mlflow
pickup_features_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_pickup_time_series_features"
dropoff_features_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_dropoff_time_series_features"
pickup_feature_lookups = [
FeatureLookup(
table_name=pickup_features_table,
feature_names=[
"mean_fare_window_1h_pickup_zip",
"count_trips_window_1h_pickup_zip",
],
lookup_key=["pickup_zip"],
timestamp_lookup_key="tpep_pickup_datetime",
),
]
dropoff_feature_lookups = [
FeatureLookup(
table_name=dropoff_features_table,
feature_names=["count_trips_window_30m_dropoff_zip", "dropoff_is_weekend"],
lookup_key=["dropoff_zip"],
timestamp_lookup_key="tpep_dropoff_datetime",
),
]
MLflowクライアントを設定してUnity Catalogのモデルにアクセスする
import mlflow
mlflow.set_registry_uri("databricks-uc")
fe.create_training_set(..)
が呼び出されると、以下のステップが実行されます:
-
TrainingSet
オブジェクトが作成され、モデルのトレーニングに使用する特定の特徴量が特徴量テーブルから選択されます。各特徴量は、以前に作成されたFeatureLookup
によって指定されます。 -
特徴量は各
FeatureLookup
のlookup_key
に従って生の入力データと結合されます。 -
データリーク問題を回避するためにポイントインタイムルックアップが適用されます。
timestamp_lookup_key
に基づいて、最新の特徴量値のみが結合されます。
その後、TrainingSet
はトレーニング用のDataFrameに変換されます。このDataFrameには、taxi_dataのカラムと、FeatureLookups
で指定された特徴量が含まれます。
# 既存のランを終了します(このノートブックが2回目以降に実行されている場合)
mlflow.end_run()
# モデルをログに記録するために必要なmlflowランを開始します
mlflow.start_run()
# タイムスタンプ列は追加の特徴量エンジニアリングが行われない限り、モデルがデータに過剰適合する可能性が高いため、
# それらを除外してトレーニングを行わないようにします。
exclude_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
# 生データと両方の特徴量テーブルからの対応する特徴量をマージしたトレーニングセットを作成します
training_set = fe.create_training_set(
df=raw_data,
feature_lookups=pickup_feature_lookups + dropoff_feature_lookups,
label="fare_amount",
exclude_columns=exclude_columns,
)
# sklearnでモデルをトレーニングするために渡すことができるデータフレームにTrainingSetをロードします
training_df = training_set.load_df()
# トレーニングデータフレームを表示します。これには、生の入力データと特徴量テーブルからの特徴量(例:`dropoff_is_weekend`)の両方が含まれています。
display(training_df)
trip_distance | pickup_zip | dropoff_zip | mean_fare_window_1h_pickup_zip | count_trips_window_1h_pickup_zip | count_trips_window_30m_dropoff_zip | dropoff_is_weekend | fare_amount |
---|---|---|---|---|---|---|---|
0.89 | 7002 | 7002 | null | null | null | null | 8.5 |
2.3 | 7002 | 7002 | 8.5 | 1 | 1 | 0 | 12.5 |
20.85 | 10013 | 7008 | 6.5 | 1 | null | null | 275 |
7.88 | 10028 | 7024 | 10 | 1 | null | null | 38 |
9.1 | 10103 | 7024 | 12.5 | 1 | 1 | 0 | 58 |
TrainingSet.to_df
で返されたデータに対してLightGBMモデルをトレーニングし、FeatureEngineeringClient.log_model
でモデルをログします。モデルは特徴量メタデータと共にパッケージ化されます。
from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient
import lightgbm as lgb
import mlflow.lightgbm
from mlflow.models.signature import infer_signature
# 特徴量とラベルを含む列を取得します
features_and_label = training_df.columns
# トレーニングのためにデータをPandas配列に収集します
data = training_df.toPandas()[features_and_label]
# データをトレーニングセットとテストセットに分割します
train, test = train_test_split(data, random_state=123)
X_train = train.drop(["fare_amount"], axis=1)
X_test = test.drop(["fare_amount"], axis=1)
y_train = train.fare_amount
y_test = test.fare_amount
# MLflowの自動ロギングを有効にします
mlflow.lightgbm.autolog()
train_lgb_dataset = lgb.Dataset(X_train, label=y_train.values)
test_lgb_dataset = lgb.Dataset(X_test, label=y_test.values)
# LightGBMのパラメータを設定します
param = {"num_leaves": 32, "objective": "regression", "metric": "rmse"}
num_rounds = 100
# LightGBMモデルをトレーニングします
model = lgb.train(param, train_lgb_dataset, num_rounds)
# トレーニング済みモデルをMLflowでログし、特徴量ルックアップ情報と共にパッケージ化します。
fe.log_model(
model=model,
artifact_path="model_packaged",
flavor=mlflow.lightgbm,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.taxi_example_fare_time_series_packaged",
)
トレーニングデータセット、ノートブックなどの情報とともにモデルが記録されます。
カタログエクスプローラーでモデルのリネージを確認
カタログエクスプローラーのテーブル詳細ページにアクセスします。「依存関係」タブに移動し、「リネージグラフを見る」をクリックします。特徴量テーブルに下流のモデルリネージが表示されます。
カスタムPyFuncモデルの構築とログ
モデルに前処理や後処理のコードを追加し、バッチ推論で処理された予測を生成するには、これらのメソッドをカプセル化するカスタムPyFunc MLflowモデルを構築できます。次のセルは、モデルからの数値予測に基づいて文字列出力を返す例を示しています。
class fareClassifier(mlflow.pyfunc.PythonModel):
def __init__(self, trained_model):
self.model = trained_model
def preprocess_result(self, model_input):
return model_input
def postprocess_result(self, results):
"""予測結果を後処理します。
運賃の範囲を作成し、予測された範囲を返します。"""
return [
"$0 - $9.99" if result < 10 else "$10 - $19.99" if result < 20 else " > $20"
for result in results
]
def predict(self, context, model_input):
processed_df = self.preprocess_result(model_input.copy())
results = self.model.predict(processed_df)
return self.postprocess_result(results)
pyfunc_model = fareClassifier(model)
# 現在のMLflowランを終了し、新しいpyfuncモデルをログするために新しいランを開始します
mlflow.end_run()
with mlflow.start_run() as run:
fe.log_model(
model=pyfunc_model,
artifact_path="pyfunc_packaged_model",
flavor=mlflow.pyfunc,
training_set=training_set,
registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.pyfunc_taxi_fare_time_series_packaged",
)
スコアリング: バッチ推論
別のデータサイエンティストがこのモデルを別のデータバッチに適用したいとします。
推論に使用するデータを表示し、予測ターゲットである fare_amount
列を強調表示するように並べ替えます。
cols = [
"fare_amount",
"trip_distance",
"pickup_zip",
"dropoff_zip",
"tpep_pickup_datetime",
"tpep_dropoff_datetime",
]
new_taxi_data = raw_data.select(cols)
display(new_taxi_data)
fare_amount | trip_distance | pickup_zip | dropoff_zip | tpep_pickup_datetime | tpep_dropoff_datetime |
---|---|---|---|---|---|
18.5 | 5.35 | 10003 | 11238 | 2016-02-16T22:40:45.000+00:00 | 2016-02-16T22:59:25.000+00:00 |
21.5 | 6.5 | 10282 | 10001 | 2016-02-05T16:06:44.000+00:00 | 2016-02-05T16:26:03.000+00:00 |
5.5 | 0.9 | 10119 | 10003 | 2016-02-08T07:39:25.000+00:00 | 2016-02-08T07:44:14.000+00:00 |
13.5 | 3.5 | 10001 | 11222 | 2016-02-29T22:25:33.000+00:00 | 2016-02-29T22:38:09.000+00:00 |
3.5 | 0.3 | 10028 | 10028 | 2016-02-03T17:21:02.000+00:00 | 2016-02-03T17:23:24.000+00:00 |
score_batch
API を使用して、Unity Catalog の Feature Engineering から必要な特徴量を取得し、データのバッチでモデルを評価します。
時系列特徴量テーブルからの特徴量でトレーニングされたモデルをスコアリングする場合、トレーニング中にモデルにパッケージ化されたメタデータを使用して、適切な特徴量がポイントインタイムルックアップで取得されます。FeatureEngineeringClient.score_batch
に提供する DataFrame には、FeatureEngineeringClient.create_training_set
に提供された FeatureLookup の timestamp_lookup_key
と同じ名前とデータ型のタイムスタンプ列が含まれている必要があります。
# モデルURIを取得
latest_model_version = get_latest_model_version(
f"{CATALOG_NAME}.{SCHEMA_NAME}.taxi_example_fare_time_series_packaged"
)
model_uri = f"models:/{CATALOG_NAME}.{SCHEMA_NAME}.taxi_example_fare_time_series_packaged/{latest_model_version}"
# score_batchを呼び出してモデルから予測を取得
with_predictions = fe.score_batch(model_uri=model_uri, df=new_taxi_data)
記録された PyFunc モデルを使用してスコアリングするには:
latest_pyfunc_version = get_latest_model_version(
f"{CATALOG_NAME}.{SCHEMA_NAME}.pyfunc_taxi_fare_time_series_packaged"
)
pyfunc_model_uri = (
f"models:/{CATALOG_NAME}.{SCHEMA_NAME}.pyfunc_taxi_fare_time_series_packaged/{latest_pyfunc_version}"
)
pyfunc_predictions = fe.score_batch(
model_uri=pyfunc_model_uri, df=new_taxi_data, result_type="string"
)

タクシー料金予測を表示
このコードは列を並べ替えて、タクシー料金の予測を最初の列に表示します。predicted_fare_amount
が実際の fare_amount
と大まかに一致することに注意してください。ただし、モデルの精度を向上させるには、より多くのデータと特徴エンジニアリングが必要です。
import pyspark.sql.functions as func
cols = [
"prediction",
"fare_amount",
"trip_distance",
"pickup_zip",
"dropoff_zip",
"tpep_pickup_datetime",
"tpep_dropoff_datetime",
"mean_fare_window_1h_pickup_zip",
"count_trips_window_1h_pickup_zip",
"count_trips_window_30m_dropoff_zip",
"dropoff_is_weekend",
]
with_predictions_reordered = (
with_predictions.select(
cols,
)
.withColumnRenamed(
"prediction",
"predicted_fare_amount",
)
.withColumn(
"predicted_fare_amount",
func.round("predicted_fare_amount", 2),
)
)
display(with_predictions_reordered)
predicted_fare_amount | fare_amount | trip_distance | pickup_zip | dropoff_zip | tpep_pickup_datetime | tpep_dropoff_datetime | mean_fare_window_1h_pickup_zip | count_trips_window_1h_pickup_zip | count_trips_window_30m_dropoff_zip | dropoff_is_weekend |
---|---|---|---|---|---|---|---|---|---|---|
27.77 | 8.5 | 0.89 | 7002 | 7002 | 2016-01-22T13:42:38.000+00:00 | 2016-01-22T13:54:09.000+00:00 | null | null | null | null |
16.36 | 12.5 | 2.3 | 7002 | 7002 | 2016-02-26T11:11:35.000+00:00 | 2016-02-26T11:27:32.000+00:00 | 8.5 | 1 | 1 | 0 |
121.96 | 275 | 20.85 | 10013 | 7008 | 2016-02-12T20:55:19.000+00:00 | 2016-02-12T21:52:38.000+00:00 | 6.5 | 1 | null | null |
39.34 | 38 | 7.88 | 10028 | 7024 | 2016-02-09T00:08:12.000+00:00 | 2016-02-09T00:20:06.000+00:00 | 10 | 1 | null | null |
35.13 | 58 | 9.1 | 10103 | 7024 | 2016-02-22T18:13:00.000+00:00 | 2016-02-22T18:38:14.000+00:00 | 12.5 | 1 | 1 | 0 |
PyFunc 予測を表示
display(pyfunc_predictions.select("fare_amount", "prediction"))
fare_amount | prediction |
---|---|
8.5 | > $20 |
12.5 | $10 - $19.99 |
275 | > $20 |
38 | > $20 |
58 | > $20 |
次のステップ
- Features UI でこの例で作成された特徴量テーブルを探索します。
- 特徴テーブルをオンラインストアに公開します (AWS|Azure)。
- Unity Catalog でモデルを Model Serving にデプロイします (AWS|Azure)。
- このノートブックを自分のデータに適用し、独自の特徴量テーブルを作成します。