0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks Unity Catalogにおける特徴量エンジニアリングの高度なサンプル

Last updated at Posted at 2025-03-05

こちらの続編です。

こちらのタクシーサンプルノートブックをウォークスルーします。

翻訳し、カタログとスキーマ名を設定できるようにしたノートブックがこちらです。

このノートブックは、Unity Catalogで特徴量エンジニアリングを使用してNYCイエローキャブの運賃を予測するモデルを作成する方法を示しています。以下のステップが含まれます:

  • Unity Catalogで直接時系列特徴量を計算して書き込む。
  • これらの特徴量を使用して運賃を予測するモデルをトレーニングする。
  • 既存の特徴量を使用して新しいデータバッチでそのモデルを評価する。

要件

  • Databricks Runtime 13.3 LTS for Machine Learning以上
    • Databricks Runtime for Machine Learningにアクセスできない場合は、このノートブックをDatabricks Runtime 13.3 LTS以上で実行できます。その場合、このノートブックの最初に%pip install databricks-feature-engineeringを実行してください。

特徴両テーブルやモデルを格納するカタログ、スキーマを定義します。

CATALOG_NAME = "users"
SCHEMA_NAME = "takaaki_yayoi"

特徴量の計算

特徴量の計算に使用する生データの読み込み

nyc-taxi-tiny データセットを読み込みます。これは、以下の変換を適用して、完全な NYC Taxi Data から /databricks-datasets/nyctaxi-with-zipcodes/subsampled に生成されました。

  1. 緯度と経度の座標を ZIP コードに変換する UDF を適用し、DataFrame に ZIP コード列を追加します。
  2. Spark DataFrame API の .sample() メソッドを使用して、日付範囲クエリに基づいてデータセットを小さなデータセットにサブサンプリングします。
  3. 特定の列の名前を変更し、不要な列を削除します。
raw_data = spark.read.format("delta").load("/databricks-datasets/nyctaxi-with-zipcodes/subsampled")
display(raw_data)
tpep_pickup_datetime tpep_dropoff_datetime trip_distance fare_amount pickup_zip dropoff_zip
2016-02-16T22:40:45.000+00:00 2016-02-16T22:59:25.000+00:00 5.35 18.5 10003 11238
2016-02-05T16:06:44.000+00:00 2016-02-05T16:26:03.000+00:00 6.5 21.5 10282 10001
2016-02-08T07:39:25.000+00:00 2016-02-08T07:44:14.000+00:00 0.9 5.5 10119 10003
2016-02-29T22:25:33.000+00:00 2016-02-29T22:38:09.000+00:00 3.5 13.5 10001 11222

タクシー料金の取引データから、乗車および降車の ZIP コードに基づいて2つのグループの特徴量を計算します。

乗車特徴量

  1. 旅行回数(時間ウィンドウ = 1時間、スライディングウィンドウ = 15分)
  2. 平均運賃額(時間ウィンドウ = 1時間、スライディングウィンドウ = 15分)

降車特徴量

  1. 旅行回数(時間ウィンドウ = 30分)
  2. 旅行が週末に終了するかどうか(Pythonコードを使用したカスタム特徴量)

ヘルパー関数

from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, IntegerType, StringType
from pytz import timezone


@udf(returnType=IntegerType())
def is_weekend(dt):
    tz = "America/New_York"
    return int(dt.astimezone(timezone(tz)).weekday() >= 5)  # 5 = 土曜日, 6 = 日曜日


def filter_df_by_ts(df, ts_column, start_date, end_date):
    if ts_column and start_date:
        df = df.filter(col(ts_column) >= start_date)
    if ts_column and end_date:
        df = df.filter(col(ts_column) < end_date)
    return df

データサイエンティストのカスタムコードによる特徴量の計算

def pickup_features_fn(df, ts_column, start_date, end_date):
    """
    pickup_features フィーチャーグループを計算します。
    フィーチャーを特定の時間範囲に制限するには、ts_column、start_date、および/または end_date を kwargs として渡します。
    """
    df = filter_df_by_ts(df, ts_column, start_date, end_date)
    pickupzip_features = (
        df.groupBy(
            "pickup_zip", window("tpep_pickup_datetime", "1 hour", "15 minutes")
        )  # 1時間のウィンドウ、15分ごとにスライド
        .agg(
            mean("fare_amount").alias("mean_fare_window_1h_pickup_zip"),
            count("*").alias("count_trips_window_1h_pickup_zip"),
        )
        .select(
            col("pickup_zip").alias("zip"),
            unix_timestamp(col("window.end")).cast("timestamp").alias("ts"),
            col("mean_fare_window_1h_pickup_zip").cast(FloatType()),
            col("count_trips_window_1h_pickup_zip").cast(IntegerType()),
        )
    )
    return pickupzip_features


def dropoff_features_fn(df, ts_column, start_date, end_date):
    """
    dropoff_features フィーチャーグループを計算します。
    フィーチャーを特定の時間範囲に制限するには、ts_column、start_date、および/または end_date を kwargs として渡します。
    """
    df = filter_df_by_ts(df, ts_column, start_date, end_date)
    dropoffzip_features = (
        df.groupBy("dropoff_zip", window("tpep_dropoff_datetime", "30 minute"))
        .agg(count("*").alias("count_trips_window_30m_dropoff_zip"))
        .select(
            col("dropoff_zip").alias("zip"),
            unix_timestamp(col("window.end")).cast("timestamp").alias("ts"),
            col("count_trips_window_30m_dropoff_zip").cast(IntegerType()),
            is_weekend(col("window.end")).alias("dropoff_is_weekend"),
        )
    )
    return dropoffzip_features
from datetime import datetime

pickup_features = pickup_features_fn(
    df=raw_data,
    ts_column="tpep_pickup_datetime",
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 31),
)
dropoff_features = dropoff_features_fn(
    df=raw_data,
    ts_column="tpep_dropoff_datetime",
    start_date=datetime(2016, 1, 1),
    end_date=datetime(2016, 1, 31),
)
display(pickup_features)
zip ts mean_fare_window_1h_pickup_zip count_trips_window_1h_pickup_zip
10119 2016-01-28T11:00:00.000+00:00 10.5 2
10199 2016-01-18T17:30:00.000+00:00 6 1
10022 2016-01-28T07:00:00.000+00:00 10.5 1
10282 2016-01-06T15:30:00.000+00:00 6.5 1
10011 2016-01-14T14:00:00.000+00:00 5 1
10110 2016-01-17T22:45:00.000+00:00 8 1
10007 2016-01-29T02:45:00.000+00:00 26.5 1
10011 2016-01-29T02:00:00.000+00:00 3.5 1
display(dropoff_features)
zip ts count_trips_window_30m_dropoff_zip dropoff_is_weekend
10020 2016-01-09T14:00:00.000+00:00 1 1
10011 2016-01-22T21:00:00.000+00:00 2 0
10018 2016-01-09T15:00:00.000+00:00 1 1
10020 2016-01-07T22:30:00.000+00:00 1 0
10032 2016-01-04T13:30:00.000+00:00 1 0
10019 2016-01-18T22:30:00.000+00:00 2 0

Unity Catalogで新しい時系列特徴量テーブルを作成する

まず、新しいカタログを作成するか、既存のカタログを再利用して、特徴量テーブルを格納するスキーマを作成します。

  • 新しいカタログを作成するには、メタストアに対する CREATE CATALOG 権限が必要です。
  • 既存のカタログを使用するには、カタログに対する USE CATALOG 権限が必要です。
  • カタログに新しいスキーマを作成するには、カタログに対する CREATE SCHEMA 権限が必要です。
# カタログとスキーマを指定。存在しない場合には作成してください。
spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"USE SCHEMA {SCHEMA_NAME}")

次に、主キー制約を使用してUnity Catalogに時系列特徴量テーブルを作成します。

CREATE TABLE SQL構文を使用してUnity Catalogに直接テーブルを作成できます。主キー制約を使用して主キー列を指定します。時系列テーブルの場合、TIMESERIESを使用して時系列列を注釈します(AWSAzureGCP)。

時系列の列はTimestampTypeまたはDateTypeでなければなりません。

%sql
CREATE TABLE IF NOT EXISTS trip_pickup_time_series_features(
  zip INT NOT NULL,
  ts TIMESTAMP NOT NULL,
  mean_fare_window_1h_pickup_zip FLOAT,
  count_trips_window_1h_pickup_zip INT,
  CONSTRAINT trip_pickup_time_series_features_pk PRIMARY KEY (zip, ts TIMESERIES)
)
COMMENT "タクシー料金。乗車時系列の特徴量。";
%sql
CREATE TABLE IF NOT EXISTS trip_dropoff_time_series_features(
  zip INT NOT NULL,
  ts TIMESTAMP NOT NULL,
  count_trips_window_30m_dropoff_zip INT,
  dropoff_is_weekend INT,
  CONSTRAINT trip_dropoff_time_series_features_pk PRIMARY KEY (zip, ts TIMESERIES)
)
COMMENT "タクシー料金。降車時系列の特徴量。";

初期特徴量をUnity Catalogの特徴量テーブルに書き込む

Feature Engineeringクライアントのインスタンスを作成します。

from databricks.feature_engineering import FeatureEngineeringClient
fe = FeatureEngineeringClient()

write_table APIを使用して、特徴量をUnity Catalogの特徴量テーブルに書き込みます。

時系列特徴量テーブルに書き込むには、DataFrameに時系列列として指定する列が含まれている必要があります。

spark.conf.set("spark.sql.shuffle.partitions", "5")
fe.write_table(
    name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_pickup_time_series_features",
    df=pickup_features
)
fe.write_table(
    name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_dropoff_time_series_features",
    df=dropoff_features
)

これで、特徴両テーブルが作成され、特徴量が登録されました。

Screenshot 2025-03-05 at 14.42.25.png
Screenshot 2025-03-05 at 14.43.06.png

特徴量の更新

write_table 関数を使用して特徴量テーブルの値を更新します。

display(raw_data)
tpep_pickup_datetime tpep_dropoff_datetime trip_distance fare_amount pickup_zip dropoff_zip
2016-02-16T22:40:45.000+00:00 2016-02-16T22:59:25.000+00:00 5.35 18.5 10003 11238
2016-02-05T16:06:44.000+00:00 2016-02-05T16:26:03.000+00:00 6.5 21.5 10282 10001
2016-02-08T07:39:25.000+00:00 2016-02-08T07:44:14.000+00:00 0.9 5.5 10119 10003
2016-02-29T22:25:33.000+00:00 2016-02-29T22:38:09.000+00:00 3.5 13.5 10001 11222
2016-02-03T17:21:02.000+00:00 2016-02-03T17:23:24.000+00:00 0.3 3.5 10028 10028
# 新しいバッチのpickup_featuresフィーチャーグループを計算する。
new_pickup_features = pickup_features_fn(
    df=raw_data,
    ts_column="tpep_pickup_datetime",
    start_date=datetime(2016, 2, 1),
    end_date=datetime(2016, 2, 29),
)
# 新しいpickup features DataFrameをフィーチャーテーブルに書き込む
fe.write_table(
    name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_pickup_time_series_features",
    df=new_pickup_features,
    mode="merge",
)

# 新しいバッチのdropoff_featuresフィーチャーグループを計算する。
new_dropoff_features = dropoff_features_fn(
    df=raw_data,
    ts_column="tpep_dropoff_datetime",
    start_date=datetime(2016, 2, 1),
    end_date=datetime(2016, 2, 29),
)
# 新しいdropoff features DataFrameをフィーチャーテーブルに書き込む
fe.write_table(
    name=f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_dropoff_time_series_features",
    df=new_dropoff_features,
    mode="merge",
)

書き込み時には、merge モードがサポートされています。

    fe.write_table(
      name="ml.taxi_example.trip_pickup_time_series_features",
      df=new_pickup_features,
      mode="merge",
    )

データは、df.isStreamingTrue に設定されているデータフレームを渡すことで、特徴量テーブルにストリーミングすることもできます:

    fe.write_table(
      name="ml.taxi_example.trip_pickup_time_series_features",
      df=streaming_pickup_features,
      mode="merge",
    )

Databricks Jobsを使用してノートブックを定期的にスケジュールし、特徴量を更新することができます (AWS|Azure|GCP)。

アナリストは、例えば次のようにSQLを使用してUnity Catalogの特徴量テーブルと対話できます:

%sql
SELECT
  SUM(count_trips_window_30m_dropoff_zip) AS num_rides,
  dropoff_is_weekend
FROM
  trip_dropoff_time_series_features
WHERE
  dropoff_is_weekend IS NOT NULL
GROUP BY
  dropoff_is_weekend;
num_rides dropoff_is_weekend
15564 0
5670 1

特徴量の検索と発見

Unity CatalogのFeatures UIで特徴量テーブルを発見できます。"trip_pickup_time_series_features"または"trip_dropoff_time_series_features"で検索し、テーブル名をクリックして、カタログエクスプローラーUIでテーブルスキーマ、メタデータ、系統などの詳細を確認できます。特徴量テーブルの説明も編集できます。特徴量の発見と系統追跡の詳細については、(AWS|Azure|GCP)を参照してください。

カタログエクスプローラーUIで特徴量テーブルの権限も設定できます。詳細については、(AWS|Azure|GCP)を参照してください。

Screenshot 2025-03-05 at 14.47.23.png

モデルのトレーニング

このセクションでは、ポイントインタイムルックアップを使用して時系列のpickupおよびdropoff特徴量テーブルを使用してトレーニングセットを作成し、トレーニングセットを使用してモデルをトレーニングする方法を説明します。タクシー料金を予測するためにLightGBMモデルをトレーニングします。

ヘルパー関数

import mlflow.pyfunc

def get_latest_model_version(model_name):
    latest_version = 1
    mlflow_client = MlflowClient()
    for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
        version_int = int(mv.version)
        if version_int > latest_version:
            latest_version = version_int
    return latest_version

トレーニングデータセットの作成方法の理解

モデルをトレーニングするためには、トレーニングデータセットを作成する必要があります。トレーニングデータセットは以下で構成されます:

  1. 生の入力データ
  2. Unity Catalogの特徴量テーブルからの特徴量

生の入力データが必要な理由は以下の通りです:

  1. 主キーおよび時系列カラムは、ポイントインタイムの正確性を持つ特徴量と結合するために使用されます (AWS|Azure|GCP)。
  2. 特徴量テーブルに含まれていないtrip_distanceのような生の特徴量。
  3. モデルのトレーニングに必要なfareのような予測ターゲット。

以下のビジュアル概要は、生の入力データがUnity Catalogの特徴量と組み合わされてトレーニングデータセットを生成する様子を示しています:

これらの概念は、トレーニングデータセット作成のドキュメントでさらに説明されています (AWS|Azure|GCP)。

次のセルでは、必要な特徴量ごとにFeatureLookupを作成して、Unity Catalogからモデルトレーニング用の特徴量をロードします。

時系列特徴量テーブルから特徴量値をポイントインタイムでルックアップするには、特徴量のFeatureLookuptimestamp_lookup_keyを指定する必要があります。これは、時系列特徴量をルックアップするためのタイムスタンプを含むDataFrameカラムの名前を示します。DataFrameの各行に対して、取得される特徴量値は、DataFrameのtimestamp_lookup_keyカラムに指定されたタイムスタンプより前の最新の特徴量値であり、主キーがDataFrameのlookup_keyカラムの値と一致するものです。一致する特徴量値が存在しない場合はnullが返されます。

from databricks.feature_engineering import FeatureLookup
import mlflow

pickup_features_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_pickup_time_series_features"
dropoff_features_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.trip_dropoff_time_series_features"

pickup_feature_lookups = [
    FeatureLookup(
        table_name=pickup_features_table,
        feature_names=[
            "mean_fare_window_1h_pickup_zip",
            "count_trips_window_1h_pickup_zip",
        ],
        lookup_key=["pickup_zip"],
        timestamp_lookup_key="tpep_pickup_datetime",
    ),
]

dropoff_feature_lookups = [
    FeatureLookup(
        table_name=dropoff_features_table,
        feature_names=["count_trips_window_30m_dropoff_zip", "dropoff_is_weekend"],
        lookup_key=["dropoff_zip"],
        timestamp_lookup_key="tpep_dropoff_datetime",
    ),
]

MLflowクライアントを設定してUnity Catalogのモデルにアクセスする

import mlflow
mlflow.set_registry_uri("databricks-uc")

fe.create_training_set(..)が呼び出されると、以下のステップが実行されます:

  1. TrainingSetオブジェクトが作成され、モデルのトレーニングに使用する特定の特徴量が特徴量テーブルから選択されます。各特徴量は、以前に作成されたFeatureLookupによって指定されます。

  2. 特徴量は各FeatureLookuplookup_keyに従って生の入力データと結合されます。

  3. データリーク問題を回避するためにポイントインタイムルックアップが適用されます。timestamp_lookup_keyに基づいて、最新の特徴量値のみが結合されます。

その後、TrainingSetはトレーニング用のDataFrameに変換されます。このDataFrameには、taxi_dataのカラムと、FeatureLookupsで指定された特徴量が含まれます。

# 既存のランを終了します(このノートブックが2回目以降に実行されている場合)
mlflow.end_run()

# モデルをログに記録するために必要なmlflowランを開始します
mlflow.start_run()

# タイムスタンプ列は追加の特徴量エンジニアリングが行われない限り、モデルがデータに過剰適合する可能性が高いため、
# それらを除外してトレーニングを行わないようにします。
exclude_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# 生データと両方の特徴量テーブルからの対応する特徴量をマージしたトレーニングセットを作成します
training_set = fe.create_training_set(
    df=raw_data,
    feature_lookups=pickup_feature_lookups + dropoff_feature_lookups,
    label="fare_amount",
    exclude_columns=exclude_columns,
)

# sklearnでモデルをトレーニングするために渡すことができるデータフレームにTrainingSetをロードします
training_df = training_set.load_df()
# トレーニングデータフレームを表示します。これには、生の入力データと特徴量テーブルからの特徴量(例:`dropoff_is_weekend`)の両方が含まれています。
display(training_df)
trip_distance pickup_zip dropoff_zip mean_fare_window_1h_pickup_zip count_trips_window_1h_pickup_zip count_trips_window_30m_dropoff_zip dropoff_is_weekend fare_amount
0.89 7002 7002 null null null null 8.5
2.3 7002 7002 8.5 1 1 0 12.5
20.85 10013 7008 6.5 1 null null 275
7.88 10028 7024 10 1 null null 38
9.1 10103 7024 12.5 1 1 0 58

TrainingSet.to_dfで返されたデータに対してLightGBMモデルをトレーニングし、FeatureEngineeringClient.log_modelでモデルをログします。モデルは特徴量メタデータと共にパッケージ化されます。

from sklearn.model_selection import train_test_split
from mlflow.tracking import MlflowClient
import lightgbm as lgb
import mlflow.lightgbm
from mlflow.models.signature import infer_signature

# 特徴量とラベルを含む列を取得します
features_and_label = training_df.columns

# トレーニングのためにデータをPandas配列に収集します
data = training_df.toPandas()[features_and_label]

# データをトレーニングセットとテストセットに分割します
train, test = train_test_split(data, random_state=123)
X_train = train.drop(["fare_amount"], axis=1)
X_test = test.drop(["fare_amount"], axis=1)
y_train = train.fare_amount
y_test = test.fare_amount

# MLflowの自動ロギングを有効にします
mlflow.lightgbm.autolog()
train_lgb_dataset = lgb.Dataset(X_train, label=y_train.values)
test_lgb_dataset = lgb.Dataset(X_test, label=y_test.values)

# LightGBMのパラメータを設定します
param = {"num_leaves": 32, "objective": "regression", "metric": "rmse"}
num_rounds = 100

# LightGBMモデルをトレーニングします
model = lgb.train(param, train_lgb_dataset, num_rounds)
# トレーニング済みモデルをMLflowでログし、特徴量ルックアップ情報と共にパッケージ化します。
fe.log_model(
    model=model,
    artifact_path="model_packaged",
    flavor=mlflow.lightgbm,
    training_set=training_set,
    registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.taxi_example_fare_time_series_packaged",
)

トレーニングデータセット、ノートブックなどの情報とともにモデルが記録されます。

Screenshot 2025-03-05 at 14.51.34.png

カタログエクスプローラーでモデルのリネージを確認

カタログエクスプローラーのテーブル詳細ページにアクセスします。「依存関係」タブに移動し、「リネージグラフを見る」をクリックします。特徴量テーブルに下流のモデルリネージが表示されます。

Screenshot 2025-03-05 at 14.57.48.png

カスタムPyFuncモデルの構築とログ

モデルに前処理や後処理のコードを追加し、バッチ推論で処理された予測を生成するには、これらのメソッドをカプセル化するカスタムPyFunc MLflowモデルを構築できます。次のセルは、モデルからの数値予測に基づいて文字列出力を返す例を示しています。

class fareClassifier(mlflow.pyfunc.PythonModel):
    def __init__(self, trained_model):
        self.model = trained_model

    def preprocess_result(self, model_input):
        return model_input

    def postprocess_result(self, results):
        """予測結果を後処理します。
        運賃の範囲を作成し、予測された範囲を返します。"""

        return [
            "$0 - $9.99" if result < 10 else "$10 - $19.99" if result < 20 else " > $20"
            for result in results
        ]

    def predict(self, context, model_input):
        processed_df = self.preprocess_result(model_input.copy())
        results = self.model.predict(processed_df)
        return self.postprocess_result(results)


pyfunc_model = fareClassifier(model)

# 現在のMLflowランを終了し、新しいpyfuncモデルをログするために新しいランを開始します
mlflow.end_run()

with mlflow.start_run() as run:
    fe.log_model(
        model=pyfunc_model,
        artifact_path="pyfunc_packaged_model",
        flavor=mlflow.pyfunc,
        training_set=training_set,
        registered_model_name=f"{CATALOG_NAME}.{SCHEMA_NAME}.pyfunc_taxi_fare_time_series_packaged",
    )

スコアリング: バッチ推論

別のデータサイエンティストがこのモデルを別のデータバッチに適用したいとします。

推論に使用するデータを表示し、予測ターゲットである fare_amount 列を強調表示するように並べ替えます。

cols = [
    "fare_amount",
    "trip_distance",
    "pickup_zip",
    "dropoff_zip",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
]
new_taxi_data = raw_data.select(cols)
display(new_taxi_data)
fare_amount trip_distance pickup_zip dropoff_zip tpep_pickup_datetime tpep_dropoff_datetime
18.5 5.35 10003 11238 2016-02-16T22:40:45.000+00:00 2016-02-16T22:59:25.000+00:00
21.5 6.5 10282 10001 2016-02-05T16:06:44.000+00:00 2016-02-05T16:26:03.000+00:00
5.5 0.9 10119 10003 2016-02-08T07:39:25.000+00:00 2016-02-08T07:44:14.000+00:00
13.5 3.5 10001 11222 2016-02-29T22:25:33.000+00:00 2016-02-29T22:38:09.000+00:00
3.5 0.3 10028 10028 2016-02-03T17:21:02.000+00:00 2016-02-03T17:23:24.000+00:00

score_batch API を使用して、Unity Catalog の Feature Engineering から必要な特徴量を取得し、データのバッチでモデルを評価します。

時系列特徴量テーブルからの特徴量でトレーニングされたモデルをスコアリングする場合、トレーニング中にモデルにパッケージ化されたメタデータを使用して、適切な特徴量がポイントインタイムルックアップで取得されます。FeatureEngineeringClient.score_batch に提供する DataFrame には、FeatureEngineeringClient.create_training_set に提供された FeatureLookup の timestamp_lookup_key と同じ名前とデータ型のタイムスタンプ列が含まれている必要があります。

# モデルURIを取得
latest_model_version = get_latest_model_version(
    f"{CATALOG_NAME}.{SCHEMA_NAME}.taxi_example_fare_time_series_packaged"
)
model_uri = f"models:/{CATALOG_NAME}.{SCHEMA_NAME}.taxi_example_fare_time_series_packaged/{latest_model_version}"

# score_batchを呼び出してモデルから予測を取得
with_predictions = fe.score_batch(model_uri=model_uri, df=new_taxi_data)

記録された PyFunc モデルを使用してスコアリングするには:

latest_pyfunc_version = get_latest_model_version(
    f"{CATALOG_NAME}.{SCHEMA_NAME}.pyfunc_taxi_fare_time_series_packaged"
)
pyfunc_model_uri = (
    f"models:/{CATALOG_NAME}.{SCHEMA_NAME}.pyfunc_taxi_fare_time_series_packaged/{latest_pyfunc_version}"
)

pyfunc_predictions = fe.score_batch(
    model_uri=pyfunc_model_uri, df=new_taxi_data, result_type="string"
)

タクシー料金予測を表示

このコードは列を並べ替えて、タクシー料金の予測を最初の列に表示します。predicted_fare_amount が実際の fare_amount と大まかに一致することに注意してください。ただし、モデルの精度を向上させるには、より多くのデータと特徴エンジニアリングが必要です。

import pyspark.sql.functions as func

cols = [
    "prediction",
    "fare_amount",
    "trip_distance",
    "pickup_zip",
    "dropoff_zip",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "mean_fare_window_1h_pickup_zip",
    "count_trips_window_1h_pickup_zip",
    "count_trips_window_30m_dropoff_zip",
    "dropoff_is_weekend",
]

with_predictions_reordered = (
    with_predictions.select(
        cols,
    )
    .withColumnRenamed(
        "prediction",
        "predicted_fare_amount",
    )
    .withColumn(
        "predicted_fare_amount",
        func.round("predicted_fare_amount", 2),
    )
)

display(with_predictions_reordered)
predicted_fare_amount fare_amount trip_distance pickup_zip dropoff_zip tpep_pickup_datetime tpep_dropoff_datetime mean_fare_window_1h_pickup_zip count_trips_window_1h_pickup_zip count_trips_window_30m_dropoff_zip dropoff_is_weekend
27.77 8.5 0.89 7002 7002 2016-01-22T13:42:38.000+00:00 2016-01-22T13:54:09.000+00:00 null null null null
16.36 12.5 2.3 7002 7002 2016-02-26T11:11:35.000+00:00 2016-02-26T11:27:32.000+00:00 8.5 1 1 0
121.96 275 20.85 10013 7008 2016-02-12T20:55:19.000+00:00 2016-02-12T21:52:38.000+00:00 6.5 1 null null
39.34 38 7.88 10028 7024 2016-02-09T00:08:12.000+00:00 2016-02-09T00:20:06.000+00:00 10 1 null null
35.13 58 9.1 10103 7024 2016-02-22T18:13:00.000+00:00 2016-02-22T18:38:14.000+00:00 12.5 1 1 0

PyFunc 予測を表示

display(pyfunc_predictions.select("fare_amount", "prediction"))
fare_amount prediction
8.5 > $20
12.5 $10 - $19.99
275 > $20
38 > $20
58 > $20

次のステップ

  1. Features UI でこの例で作成された特徴量テーブルを探索します。
  2. 特徴テーブルをオンラインストアに公開します (AWS|Azure)。
  3. Unity Catalog でモデルを Model Serving にデプロイします (AWS|Azure)。
  4. このノートブックを自分のデータに適用し、独自の特徴量テーブルを作成します。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?