0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Databricksにおけるモデルサービング

Last updated at Posted at 2021-10-20

MLflow Model Serving on Databricks | Databricks on AWS [2021/10/14時点]の翻訳です。

プレビュー
本機能はパブリックプレビューです。

MLflowのモデルサービングを用いることで、モデルレジストリからモデルを、モデルのバージョン、ステージの基づいて自動でアップデートされるRESTエンドポイントをホストすることができます。

特定の登録モデルに対するモデルサービングを有効化する際、Databricksはモデル固有のクラスターを自動で作成し、非アーカイブの状態のモデル全てをそのクラスター上にデプロイします。エラーが発生した際にはDatabricksをクラスターを再起動し、モデルサービングを無効にした際にはクラスターを停止します。モデルサービングは自動でモデルレジストリと同期し、新たに登録されたモデルバージョンをデプロイします。デプロイされたモデルバージョンは、標準的なREST APIリクエストで問い合わせすることができます。Databricksは標準的な認証方法を用いてリクエストを認証します。

このサービスはプレビューですが、Databricksは低スループット、クリティカルではないアプリケーションでの利用をお勧めします。保証はしませんが、ターゲットのスループットは200qps(秒あたりクエリー数)であり、ターゲットの可用性は99.5%です。さらに、リクエストあたり16MBのペイロードサイズの制限があります。

それぞれのモデルバージョンはMLflowのモデルデプロイメントを用いてデプロイされ、依存関係によって指定されるConda環境で実行されます。

注意

  • サービングが有効化されている間は、アクティブなモデルバージョンがなくてもクラスターは維持されます。サービングクラスターを停止するには、登録モデルに対するモデルサービングを無効にしてください。
  • クラスターはall-purposeクラスターとみなされ、all-purposeワークロードの課金が発生します。
  • Global init scriptsはサービングクラスターでは実行されません。

要件

  • MLflowのモデルサービングはPythonのMLflowモデルで利用できます。全てのモデルの依存関係をconda環境で宣言する必要があります。
  • モデルサービングを有効化するには、クラスター作成権限を持っている必要があります。

モデルレジストリからのモデルサービング

Databricksのモデルサービングはモデルレジストリから利用できます。

モデルサービングの有効化、無効化

登録モデルページからモデルサービングを有効化できます。

  1. Servingタブをクリックします。モデルのサービングが有効化されていない場合には、Enable Servingボタンが表示されます。
  2. Enable Servingをクリックします。Servingタブが表示され、StatusがPendingとなります。数分後にStatusがReadyになります。

モデルサービングを無効化するにはStopをクリックします。

モデルサービングの検証

Servingタブから、サーブされるモデルにリクエストを送信したり、レスポンスを参照できます。

モデルバージョンのURI

それぞれのモデルバージョンに1つ以上のユニークなURIが割り当てられます。少なくとも、それぞれのモデルバージョンには以下の構成のURLが割り当てられます。

<databricks-instance>/model/<registered-model-name>/<model-version>/invocations

例えば、iris-classifierとして登録されたモデルのバージョン1を呼び出すには、以下のURIを用います。

https://<databricks-instance>/model/iris-classifier/1/invocations

ステージを指定してモデルバージョンを呼び出すこともできます。例えば、バージョン1がProductionステージにあるのであれば、以下のURIで呼び出すこともできます。

https://<databricks-instance>/model/iris-classifier/Production/invocations

利用できるモデルURIは、サービングページのModel Versionsタブの上に表示されます。

サーブされるバージョンの管理

全てのアクティブな(Archiveではない)モデルバージョンはデプロイされ、URIを用いて問い合わせを行うことができます。新規モデルバージョンが登録された時に、Databricksは自動でデプロイを行い、アーカイブされた際には古いバージョンを自動で削除します。

注意
登録モデルのデプロイされた全てのバージョンは同じクラスターを共有します。

モデルアクセス権の管理

モデルのアクセス権はモデルレジストリから継承されます。サービング機能の有効化、無効化には登録されたモデルに対するManage権限が必要となります。読み取り権限を持つユーザーは誰でもデプロイされたバージョンのスコアリングを実行できます。

デプロイされたモデルバージョンのスコアリング

デプロイされたモデルのスコアリングを実行するには、UIを使うか、モデルURIに対してREST APIのリクエストを送信します。

UIによるスコアリング

これはモデルをテストするための最も簡単でクイックな方法です。モデルの入力データをJSONフォーマットで挿入し、Send Requestをクリックします。入力サンプルとともにモデルが記録されている場合(上の図で示されています)には、Load Exampleをクリックして、入力サンプルを呼び出すことができます。

REST APIリクエストによるスコアリング

標準的なDatabricksの認証方法を用いて、REST API経由でスコアリングのリクエストを送信することができます。以下では、パーソナルアクセストークンを用いた例を示します。

https://<databricks-instance>/model/iris-classifier/Production/invocations(ここで<databricks-instance>お使いのDatabricksインスタンス名となります)のようなMODEL_VERSION_URIDATABRICKS_API_TOKENと呼ばれるDatabricks REST APIトークンがある場合、サーブされるモデルに対してクエリーをどのように行うのかを示すサンプルスニペットを以下に示します。

Bash

データフレームの入力を受け付けるモデルに対するクエリーのスニペットです。

Bash
curl -X POST -u token:$DATABRICKS_API_TOKEN $MODEL_VERSION_URI \
  -H 'Content-Type: application/json' \
  -d '[
    {
      "sepal_length": 5.1,
      "sepal_width": 3.5,
      "petal_length": 1.4,
      "petal_width": 0.2
    }
  ]'

テンソルの入力を受け付けるモデルに対するクエリーのスニペットです。テンソルの入力はTensorFlow Serving’s API docsで説明されているようにフォーマットされる必要があります。

Bash
curl -X POST -u token:$DATABRICKS_API_TOKEN $MODEL_VERSION_URI \
   -H 'Content-Type: application/json' \
   -d '{"inputs": [[5.1, 3.5, 1.4, 0.2]]}'

Python

Python
import numpy as np
import pandas as pd
import requests

def create_tf_serving_json(data):
  return {'inputs': {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}

def score_model(model_uri, databricks_token, data):
  headers = {
    "Authorization": f"Bearer {databricks_token}",
    "Content-Type": "application/json",
  }
  data_json = data.to_dict(orient='records') if isinstance(data, pd.DataFrame) else create_tf_serving_json(data)
  response = requests.request(method='POST', headers=headers, url=model_uri, json=data_json)
  if response.status_code != 200:
      raise Exception(f"Request failed with status {response.status_code}, {response.text}")
  return response.json()


# Scoring a model that accepts pandas DataFrames
data =  pd.DataFrame([{
  "sepal_length": 5.1,
  "sepal_width": 3.5,
  "petal_length": 1.4,
  "petal_width": 0.2
}])
score_model(MODEL_VERSION_URI, DATABRICKS_API_TOKEN, data)

# Scoring a model that accepts tensors
data = np.asarray([[5.1, 3.5, 1.4, 0.2]])
score_model(MODEL_VERSION_URI, DATABRICKS_API_TOKEN, data)

PowerBI

以下のステップを踏むことで、Power BIデスクトップでデータセットのスコアリングを行うことができます。

  1. スコアリングしたいデータセットを開く
  2. Transform Dataに移動
  3. 左のパネルを右クリックし、Create New Queryを選択
  4. View > Advanced Editorに移動
  5. 適切なDATABRICKS_API_TOKENMODEL_VERSION_URI で置換した後で、以下のコードスニペットでクエリーのボディを置換
(dataset as table ) as table =>
let
  call_predict = (dataset as table ) as list =>
  let
    apiToken = DATABRICKS_API_TOKEN,
    modelUri = MODEL_VERSION_URI,
    responseList = Json.Document(Web.Contents(modelUri,
      [
        Headers = [
          #"Content-Type" = "application/json",
          #"Authorization" = Text.Format("Bearer #{0}", {apiToken})
        ],
        Content = Json.FromValue(dataset)
      ]
    ))
  in
    responseList,
  predictionList = List.Combine(List.Transform(Table.Split(dataset, 256), (x) => call_predict(x))),
  predictionsTable = Table.FromList(predictionList, (x) => {x}, {"Prediction"}),
  datasetWithPrediction = Table.Join(
    Table.AddIndexColumn(predictionsTable, "index"), "index",
    Table.AddIndexColumn(dataset, "index"), "index")
in
  datasetWithPrediction
  1. 好きなモデル名でクエリーの名前をつける
  2. データセットに対してアドバンスドクエリーエディタを開き、モデル関数を適用

サーバーによって受け付けられる入力データのフォーマットに関しては、MLflow documentationをご覧ください。

サーブされたモデルのモニタリング

サービングページでは、サービングクラスターのステータスとそれぞれのモデルバージョンが表示されます。

  • サービングクラスターの状態を調査するには、当該モデルの全てのサービングイベントを表示するModel Eventsタブを使用します。
  • 個々のモデルバージョンの状態を調査するには、Model Versionsタブをクリックし、LogsあるいはVersion Eventsタブをスクロールします。

サービングクラスターのカスタマイズ

サービングクラスターをカスタマイズするには、ServingタブのCluster Settingsタブを使用します。

  • サービングクラスターのメモリーサイズとコア数を変更するには、希望するクラスター設定を選択するためにInstance Typeドロップダウンメニューを選択します。Saveをクリックすると、既存のクラスターは停止され、指定された設定による新規クラスターが作成されます。
  • タグを追加するには、Add Tagフィールドに名前と値を入力し、Addをクリックします。
  • 既存のタグを編集、削除するには、TagsテーブルのActionsカラムのアイコンの一つをクリックします。

既知のエラー

ResolvePackageNotFound: pyspark=3.1.0

このエラは、モデルがpysparkに依存しており、Databricksランタイム 8.xで記録された際に発生します。このエラーに遭遇した場合には、モデルを記録する際にconda_envパラメーターを用いて、明示的にpysparkのバージョンを指定してください。

Databricks 無料トライアル

Databricks 無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?