0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricksにおけるオンデマンド特徴量の計算

Posted at

こちらの機能のサンプルノートブックをウォークスルーします。

オンデマンド特徴量とは

「オンデマンド」とは、値が事前にわかっていないが、推論時に計算される特徴量を指します。 Databricksでは、Pythonユーザー定義関数 (UDF)を使用して、オンデマンド機能の計算方法を指定します。 これらの関数はUnity Catalogによって管理され、カタログエクスプローラーで検出できます。

オンデマンド特徴量 - 基本的なデモ

この例では、オンデマンド特徴量を使用してモデルをトレーニングし、スコアリングします。

この特徴量は、JSON文字列を解析してウェブページ上のホバー時間のリストを抽出します。これらの時間を平均し、その平均値をモデルの特徴量として渡します。

要件:

  • Databricks Runtime for ML 13.3 LTS以上を実行しているクラスター。
  • クラスターアクセスモデルはシングルユーザーである必要があります。

ヘルパー関数とノートブック変数

import json
import random
import time

import pandas as pd
import requests
from datetime import datetime
from databricks.feature_engineering import FeatureEngineeringClient, FeatureFunction
from pyspark.sql.types import IntegerType, BooleanType, StructField, StructType, StringType

import mlflow
from mlflow import MlflowClient
fe = FeatureEngineeringClient()
suffix = random.randint(1, 10000000000)
registered_model_name = f"takaakiyayoi_catalog.on_demand_demo.simple_model_{suffix}"
endpoint_name = f"on_demand_demo_simple_endpoint_{suffix}"
should_cleanup = True

セットアップ

%sql
-- デモ機能テーブルと関数を格納するスキーマを作成
CREATE SCHEMA IF NOT EXISTS takaakiyayoi_catalog.on_demand_demo
%sql
-- Python UDFを定義
CREATE OR REPLACE FUNCTION takaakiyayoi_catalog.on_demand_demo.avg_hover_time(blob STRING)
RETURNS FLOAT
LANGUAGE PYTHON
COMMENT "JSON blobからhover timeを抽出し、平均を計算"
AS $$
import json

def calculate_average_hover_time(json_blob):
    # JSON blobを解析
    data = json.loads(json_blob)

    # 'hover_time'リストが存在し、空でないことを確認
    hover_time_list = data.get('hover_time')
    if not hover_time_list:
        raise ValueError("hover_timeリストが見つからないか、リストが空です")

    # hover timeの合計時間を計算し、平均を算出
    total_duration = sum(hover_time_list)
    average_duration = total_duration / len(hover_time_list)

    return average_duration

return calculate_average_hover_time(blob)
$$

関数が定義されます。

Screenshot 2024-11-29 at 15.46.18.png

次のセルに示すように、SQLからPython UDFを呼び出すことができます。

%sql SELECT takaakiyayoi_catalog.on_demand_demo.avg_hover_time('{"hover_time": [5.0, 3.2, 4.1, 2.8, 6.7]}')

Screenshot 2024-11-29 at 15.46.59.png

トレーニングデータセットを準備します。

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("json_blob", StringType(), True),
    StructField("label", BooleanType(), True)
])
data = [
  (1, '{"hover_time": [3.5, 4.2, 5.1, 2.8, 3.3]}', True), 
  (2, '{"hover_time": [1.2, 2.3, 3.4, 4.5, 5.6, 6.7]}', False), 
  (3, '{"hover_time": [7.8, 8.9, 6.1, 4.0, 5.3]}', True)
]

label_df = spark.createDataFrame(data, schema)

オンデマンド特徴量を使用してトレーニングセットを作成する

features = [
    FeatureFunction(
        udf_name="takaakiyayoi_catalog.on_demand_demo.avg_hover_time",
        output_name="on_demand_output",
        input_bindings={"blob": "json_blob"},
    )
]

training_set = fe.create_training_set(
    df=label_df, feature_lookups=features, label="label", exclude_columns=["id"]
)
display(training_set.load_df())

生データ、オンデマンド特徴量、予測ラベルから構成されるデータセットが作成されました。

Screenshot 2024-11-29 at 15.48.11.png

トレーニングセットを使用してシンプルなモデルを記録する

簡単にするため、このノートブックではハードコーディングされたモデルを使用します。実際には、生成されたトレーニングセットでトレーニングされたモデルをログします。

class HighViewTime(mlflow.pyfunc.PythonModel):
    def predict(self, ctx, inp):
        return inp['on_demand_output'] > 5
model_name = "fs_packaged_model"
mlflow.set_registry_uri("databricks-uc")

fe.log_model(
    model=HighViewTime(),
    artifact_path=model_name,
    flavor=mlflow.pyfunc,
    training_set=training_set,
    registered_model_name=registered_model_name
)

モデルが記録、登録されます。

Screenshot 2024-11-29 at 15.49.32.png

関数とモデルのリネージを確認できます。

Screenshot 2024-11-29 at 15.51.07.png

score_batch を使用してモデルをスコアリングする

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("json_blob", StringType(), True),
])
data = [
  (4, '{"hover_time": [2.1, 3.1, 4.1, 5.1, 6.1]}'), 
  (5, '{"hover_time": [4.4, 5.5, 6.6, 7.7, 8.8]}'), 
]

scoring_df = spark.createDataFrame(data, schema)
result = fe.score_batch( 
  model_uri = f"models:/{registered_model_name}/1",
  df = scoring_df,
  result_type = 'bool'
)

display(result)

推論が行われました。

Screenshot 2024-11-29 at 15.50.23.png

特徴量ストアパッケージモデルのサービング

オンラインからオンデマンド特徴量を利用できるようにするために、このモデルをモデルサービングエンドポイントにデプロイします。

start_endpoint_json_body = {
    "name": endpoint_name,
    "config": {
        "served_entities": [
            {
                "entity_name": registered_model_name,
                "entity_version": "1",
                "workload_size": "Small",
                "scale_to_zero_enabled": True,
            }
        ]
    },
}

host_creds = mlflow.utils.databricks_utils.get_databricks_host_creds()
headers = {
  "Authorization": f"Bearer {host_creds.token}",
  "Content-Type": "application/json"
}

response = requests.request(
    url=f"{host_creds.host}/api/2.0/serving-endpoints",
    method="POST",
    json=start_endpoint_json_body,
    headers=headers
)

assert (
    response.status_code == 200
), f"Failed to launch model serving cluster: {response.text}"

print("Starting model serving endpoint. See Serving page for status.")

数分待つと、エンドポイントが利用できるようになります。

Screenshot 2024-11-29 at 15.56.23.png

エンドポイントへのクエリー

req = pd.DataFrame(
    [
        {"json_blob": '{"hover_time": [5.5, 2.3, 10.3]}'}
    ]
)
json_req = json.dumps({"dataframe_split": json.loads(req.to_json(orient="split"))})

response = requests.request(
  method="POST", 
  headers=headers, 
  url=f"{host_creds.host}/serving-endpoints/{endpoint_name}/invocations", 
  data=json_req
)

response.json()
{'predictions': [{'on_demand_output': True}]}

あるいは、サービングエンドポイントのUIを使用してリクエストを送信します:

{
  "dataframe_records": [
    {"json_blob": "{\"hover_time\": [5.5, 2.3, 10.3]}"}
  ]
}

Screenshot 2024-11-29 at 15.56.10.png

クリーンアップ

モデルサービングエンドポイントを削除します。

if should_cleanup:
  MlflowClient().delete_registered_model(name=registered_model_name)
  requests.request(
    method="DELETE", 
    headers=headers, 
    url=f"{host_creds.host}/api/2.0/preview/serving-endpoints/{endpoint_name}" 
  )

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?