LoginSignup
0
1

More than 1 year has passed since last update.

TectonとDatabricksを使って15分でプロダクションMLアプリケーションを実現する

Posted at

How to Realize Real-time ML in 15 Minutes with Tecton and Databricks - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

機械学習(ML)システムをプロダクションに持っていくことはいまだに困難なことです。多くのチームにとって、リアルタイムデータを操作するリアルタイムMLシステムの構築は未だ夢物語となっています。多くの場合、これらのシステムはバッチのプロダクションシステムと結合されています。スケーラブルなリアルタイムのデータパイプライン、スケーラブルなモデル推論エンドポイントの構築、これら全てをプロダクションのアプリケーションにインテグレーションするリアルタイムMLの道のりには多くの課題が存在します。

この記事では、適切なツールを用いて、どのようにしてこれらの課題を劇的にシンプルにできるのかをご説明します。TectonとDatabricksを用いることで、リアルタイムデータ処理とオンライン推論を含むリアルタイムMLシステムのMVPを数分で構築することができます。

一つのノートブックでリアルタイムMLシステムを構築する

このサンプルでは、トランザクションを許可あるいは拒否するかを決定するリアルタイム不正検知システムの構築にフォーカスします。リアルタイム不正検知システムの構築における最も困難な課題の2つは以下のようなものとなります。

  • リアルタイムの推論: 通常、モデル推論は支払処理を遅延させないため、100ms未満で行われるように、非常に高速な予測が必要となります。
  • リアルタイムの特徴量: 多くの場合、不正トランザクションを検知するために最も必要な重要なデータは、過去数秒に発生した事象を表現するものです。素晴らしい不正検知システムを構築するためには、トランザクションが発生してから数秒以内に特徴量を更新する必要があります。

TectonとDatabricksを用いることで、これらの課題をとてもシンプルなものにすることができます。

  • DatabricksとネイティブなMLflowインテグレーションを用いることで、リアルタイム予測を行うためのサービングエンドポイントを作成しテストすることができます。
  • Tectonはリアルタイムで特徴量を計算する高性能ストリーム集計処理の構築に役立ちます。

リアルタイムプロダクションMLシステムを構築する以下の4つのステップをウォークスルーしていきましょう。

  1. Tectonで高性能ストリーム処理パイプラインを構築
  2. DatabricksとMLflowを用いてTectonからの特徴量でモデルをトレーニングする
  3. MLflowを用いてDatabricksでモデルサービングエンドポイントを作成する
  4. モデルサービングエンドポイントとTectonからのリアルタイム特徴量を用いてリアルタイム予測を行う

Tectonでストリーミング処理パイプラインを構築する

Tectonの特徴量プラットフォームは、特徴量の定義をシンプルにし、皆様のMLモデルでこれらの特徴量を利用できるように構築されています。不正検知モデルでは以下のようにいくつか異なるタイプの特徴量が必要となります。

  • 昨年の国内の平均トランザクションのサイズ(日次で計算)
  • 最新1分間におけるユーザーごとのトランザクション数(トランザクションのストリームから連続的に計算)
  • ユーザーの自宅からトランザクション地点への距離(トランザクション時にオンデマンドで計算)

それぞれのタイプの特徴量は、異なるタイプのデータパイプラインを必要とし、Tectonはこれらのタイプの特徴量の全ての構築に役立ちます。多くの場合、最も難しいタイプの特徴量であるリアルタイムストリーミングの特徴量にフォーカスしましょう。

Tectonで特徴量「最新1分間と最新5分間におけるユーザーごとのトランザクション数」は異顔のように実装できます。

Python
@stream_window_aggregate_feature_view(
    inputs={'transactions': Input(transactions_stream)},
    entities=[user],
    mode='spark_sql',
    aggregation_slide_period='continuous',
    aggregations=[
        FeatureAggregation(column='counter', function='count', time_windows=['1m', '5m'])
    ],
    online=True,
    offline=True,
    feature_start_time=datetime(2022, 4, 1),
    family='fraud',
    tags={'release': 'production'},
    owner='david@tecton.ai',
    description='Number of transactions a user has made recently'
)
def user_continuous_transaction_count(transactions):
    return f'''
        SELECT
            user_id,
            1 as counter,
            timestamp
        FROM
            {transactions}
        '''

この例では、Tectonビルトインの低レーテンシーストリーミング集計処理を活用しており、ユーザーが行なったトランザクション数のカウントをリアルタイムで保持することができます。

ユーザーのトランザクション数を計算するためのTectonで構築された特徴量パイプライン

この特徴量が適用されると、この特徴量をリアルタイム(モデル推論で使用)とオフライン(モデルトレーニングで使用)で利用できるように、TectonはDatabricks内のデータパイプラインのオーケストレーションをスタートします。過去の特徴量はDelta Lakeに保管されるので、構築した全ての特徴量はお使いのデータレイクハウスでネイティブに利用できることを意味します。

DatabricksとMLflowを用いてTectonからの特徴量でモデルをトレーニングする

Tectonで特徴量が計算されると、不正検知モデルをトレーニングできるようになります。以下のことを行なっているノートブックをチェックしてみてください。

  • Tectonのタイムトラベル機能を用いたトレーニングデータの生成
  • トランザクションが不正かそうでないかを予測するSKlearnモデルのトレーニング
  • MLflowを用いたエクスペリメントの追跡
Python
# 1. Fetching a Spark DataFrame of historical labeled transactions
# 2. Renaming columns to match the expected join keys for the Feature Service
# 3. Selecting the join keys, request data, event timestamp, and label
training_events = ws.get_data_source("transactions_batch").get_dataframe().to_spark() \
                        .filter("partition_0 == 2022").filter("partition_2 == 05") \
                        .select("user_id", "merchant", "timestamp", "amt", "is_fraud") \
                        .cache()


training_data = fraud_detection_feature_service.get_historical_features(spine=training_events, timestamp_key="timestamp").to_spark()
training_data_pd = training_data.drop("user_id", "merchant", "timestamp", "amt").toPandas()
y = training_data_pd['is_fraud']
x = training_data_pd.drop('is_fraud', axis=1)
X_train, X_test, y_train, y_test = train_test_split(x, y)
Python
with mlflow.start_run() as run:
  n_estimators = 100
  max_depth = 6
  max_features = 3
  # Create and train model
  rf = RandomForestRegressor(n_estimators = n_estimators, max_depth = max_depth, max_features = max_features)
  rf.fit(X_train, y_train)
  # Make predictions
  predictions = rf.predict(X_test)
  
  # Log parameters
  mlflow.log_param("num_trees", n_estimators)
  mlflow.log_param("maxdepth", max_depth)
  mlflow.log_param("max_feat", max_features)
  mlflow.log_param("tecton_feature_service", feature_service_name)
  
  # Log model
  mlflow.sklearn.log_model(rf, "random-forest-model")
  
  # Create metrics
  mse = mean_squared_error(y_test, predictions)
    
  # Log metrics
  mlflow.log_metric("mse", mse)

MLflowを用いてDatabricksでモデルサービングエンドポイントを作成する

これでトレーニング済みのモデルを手に入れたので、モデルエンドポイントを作成するためにDatabricksのMLflowを活用します。最初にMLflowのモデルレジストリにモデルを登録します。

MLflowモデルレジストリにトレーニングしたモデルを登録


「tecton-databricks-fraud-model」という新規モデルを作成

次に、サービングエンドポイントを作成するためにMLflowを利用します。

MLflowモデルレジストリUIからサービングを有効化

モデルがデプロイされたら、エンドポイントのURLをメモしておきます。

これで、リアルタイムでトランザクションのスコアリングを実行することができる予測エンドポイントを手に入れました。残りは、予測時に必要となる特徴量の収集です。

モデルサービングエンドポイントとTectonからのリアルタイム特徴量を用いてリアルタイム予測を行う

先ほど作成したモデルエンドポイントは、入力として特徴量を受け取り、トランザクションが不正である確率の予測値を出力します。これらの特徴量の収集は、いくつか困難な問題を突きつけます。

  • レーテンシーの制約: 全体的なレーテンシーの制限に収まるように、非常に高速に(< 50ms)特徴量を検索(計算)する必要があります。
  • 特徴量の鮮度: トランザクションが発生しているのと同じように、(1分間のトランザクション数のように)我々が定義した特徴量もリアルタイムで更新されることを期待します。

Tectonはこれらの困難な問題を解決するために、特徴量サービングインフラストラクチャを提供します。Tectonは、高スケールかつ低レーテンシーで特徴量ベクトルをサービングするように設計されています。特徴量を構築した際、すでに我々は自身のモデルに最新の特徴量を生成するために活用するリアルタイムストリーミングパイプラインをセットアップしています。

Tectonのおかげで、Tectonの特徴量サービングエンドポイントに対するシンプルなRESTコールを用いることで、リアルタイムで特徴量を収集することができます。

Bash
curl -X POST https://app.tecton.ai/api/v1/feature-service/get-features\
     -H "Authorization: Tecton-key $TECTON_API_KEY" -d\
'{
  "params": {
    "feature_service_name": "fraud_detection_feature_service",
    "join_key_map": {
      "user_id": "USER_ID_VALUE"
    },
    "request_context_map": {
      "amt": 12345678.9
    },
    "workspace_name": "tecton-databricks-demo"
  }

Tectonから特徴量を収集し、リアルタイムの不正予測を取得するためにモデルエンドポイントに送信する流れを全てまとめたノートブックで残りをチェックしてください

Python
def score_model(dataset):
  headers = {'Authorization': f'Bearer {my_token}'}
  data_json = dataset.to_dict(orient='split')
  response = requests.request(method='POST', headers=headers, url=model_url, json=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()

amount=12345.0
df = fraud_detection_feature_service.get_online_features(
  join_keys={'user_id': 'user_131340471060', 'merchant': 'fraud_Schmitt Inc'},
  request_data={"amt": amount}
).to_pandas().fillna(0)

prediction = score_model(df)

print(prediction[0])

まとめ

リアルタイムMLシステムの構築は大変なタスクです!ストリーミングデータパイプラインの構築のような個々のコンポーネントは、手動で行ったら数ヶ月もかかるエンジニアリングプロジェクトになるでしょう。幸運なことに、TectonとDatabricksを用いてリアルタイムMLシステムを構築することで、これらの複雑性の多くをシンプルにすることができます。一つのノートブックで、リアルタイム不正検知モデルをトレーニング、デプロイ、サービングすることができ、これにかかる時間は約15分です。

TectonがどれだけリアルタイムMLシステムを支援するのかを知りたいのであれば、Data & AIサミットでのTectonのセッションScaling ML at CashApp with TectonBuilding Production-Ready Recommender Systems with Feature Storesをチェックしてください。

Databricks 無料トライアル

Databricks 無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1