0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DatabricksのGPUクラスターでトレーニングしたPyTorchモデルをMLflowモデルサービングで提供する

Last updated at Posted at 2022-02-08

GPUクラスターでPytorchモデルをトレーニングし、MLflowのモデルサービングでモデルをサーブしようとすると、以下のエラーに遭遇します。これはGPUでトレーニングされたモデルをCPUに再マッピングしていないためです。

RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

上のメッセージでは、torch.load with map_location=torch.device('cpu')を使うように指示が出ていますが、MLflowのモデルサービングのサーバ上でモデルがロードされる際にこの処理を埋め込むには工夫が必要となります。

このエラーを回避するために、モデルサービングを行う際にCPUへのマッピングを行う処理を組み込んだPyFuncカスタムモデルを構築して、カスタムモデルをMLflowでロギング・サービングします。

本書では、PyTorchによるMNIST文字認識モデルの構築のシナリオで上記対策を行います。

ライブラリの読み込み

Python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from collections import namedtuple

import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

import time
import numpy as np

from matplotlib.pyplot import imshow

from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
from mlflow.tracking.client import MlflowClient

準備

パラメーター

このノートブックはGPUクラスター、非GPUクラスターの両方で動作しますが、GPUクラスターを使うことをお勧めします。ウィジェットUSE_GPUyesに設定すると、PyTorchはCUDA-enabled GPUを活用して、より高速に動作します。

  • batch_size: トレーニングミニバッチのサンプル数
  • test_batch_size: テスト/推論ミニバッチのサンプル数。テスト/推論では後ろ向きのパスを行う必要がないため、通常これはbatch_sizeよりも大きな数となります。

トレーニングアルゴリズムは、epochsに対する一連のトレーニングセットを活用します。
学習率lrとモーメンタムファクターmomentumを用いたStochastic Gradient Descentを使用します。
詳細はPyTorch SGD implementationを参照ください。

Python
MNIST_DIR = '/tmp/data/mnist'
use_cuda = USE_GPU and torch.cuda.is_available()

Params = namedtuple('Params', ['batch_size', 'test_batch_size', 'epochs', 'lr', 'momentum', 'seed', 'cuda', 'log_interval'])
args = Params(batch_size=64, test_batch_size=1000, epochs=10, lr=0.01, momentum=0.5, seed=1, cuda=use_cuda, log_interval=200)

MNISTデータセットの準備

データセットをダウンロードし、行をシャッフルし、バッチを作成して特徴量の標準化を行います。

Python
torch.manual_seed(args.seed)

data_transform_fn = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))])

train_loader = torch.utils.data.DataLoader(
        datasets.MNIST(MNIST_DIR, train=True, download=True,
                       transform=data_transform_fn),
        batch_size=args.batch_size, shuffle=True, num_workers=1)

test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(MNIST_DIR, train=False, 
                       transform=data_transform_fn),
        batch_size=args.test_batch_size, shuffle=True, num_workers=1)

モデルの定義

CNNモデルの構築

ここでは、2つのconvolutionalレイヤー (conv)、2つのfully connected (fc)レイヤーを持つCNNモデルを作成します。また、convレイヤーとfcレイヤーの間にdropoutレイヤーを追加します。

Python
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        #print("x.type:", type(x))
      
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)
      
model = Net()
model.share_memory() # 勾配は遅延で配置されるので、ここではこれを共有しません。

Pyfuncカスタムモデルの構築

モデルサービングのサーバーにデプロイする際にCPUにマッピングを行う処理を追加します。

Python
import mlflow.pytorch
from mlflow.utils.environment import _mlflow_conda_env

class PytorchWrapper(mlflow.pyfunc.PythonModel):
    """
    Pytorchモデルをトレーニング、使用するクラス
    """

    def load_context(self, context):
        """このメソッドはpyfunc.load_model()を用いてMLflowモデルがロードされ、Pythonモデルが構築されると呼び出されます
        Args:
            context: モデルのアーティファクトが保存されるMLflowコンテキスト
        """
        import torch
        import torchvision

        # アーティファクトとして保存されているモデルのロード
        # モデルサービングのサーバーはCPUマシンなので、CPUデバイスへの再マッピングをおこないます
        device = torch.device('cpu') 
        
        model = Net()
        model.load_state_dict(torch.load(context.artifacts["torch_model_path"], map_location=device))
        model.eval()

        self.model = model

    def predict(self, context, model_input):
        """これは抽象化関数です。PyTorchモデルを取り出すためにメソッドをカスタマイズします。
        Args:
            context ([type]): モデルのアーティファクトが格納されるMLflowコンテキスト
            model_input ([type]): 検知するデータ
        Returns:
            [type]: ロードされたモデルアーティファクト
        """
        import pandas as pd
        import torch
        
        print("model_input.type", type(model_input))
        print("model_input.shape", model_input.shape)
        
        # 入力がTensorではない場合にはTensorに変換します。これはノートブックからの呼び出しと、モデルサービングでは入力値の型が異なるためです
        if not torch.is_tensor(model_input):
          model_input = torch.from_numpy(model_input.astype(np.float32))
        ##with torch.no_grad():

        print("converted model_input.type", type(model_input))
        print("converted model_input.shape", model_input.shape)
        
        prediction = self.model(model_input)

        print("prediction.type", type(prediction))
        print("prediction.shape", prediction.shape)
        
        # 出力がTensorの場合にはnumpy.arrayに変換します
        if isinstance(prediction, torch.Tensor):
          prediction = prediction.cpu().detach().numpy().copy()
        
        return prediction

モデルのトレーニング

モデルをトレーニングするために、Negative Log Likelihoodロスを定義し、momentumを用いてStochastic Gradient Descentオプティマイザを作成します。optimizer.step()に続いてloss.backward()を呼び出すことで、モデルのパラメーターを更新します。

Python
def train_epoch(epoch, args, model, data_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(data_loader):
        if args.cuda:
            data, target = data.cuda(), target.cuda()      
        data, target = Variable(data), Variable(target)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(data_loader.dataset),
                100. * batch_idx / len(data_loader), loss.data.item()))


def test_epoch(model, data_loader):
    model.eval()
    test_loss = 0
    correct = 0
    for data, target in data_loader:
        if args.cuda:
            data, target = data.cuda(), target.cuda()      
        data, target = Variable(data, volatile=True), Variable(target)
        output = model(data)
        test_loss += F.nll_loss(output, target, size_average=False).data.item() # バッチの損失を加算します
        pred = output.data.max(1)[1] # 最大のlog-probabilityのインデックスを取得します
        correct += pred.eq(target.data).cpu().sum()

    test_loss /= len(data_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(data_loader.dataset),
        100. * correct / len(data_loader.dataset)))

MLflowでカスタムモデルをロギングします。

Python
with mlflow.start_run() as run:  

  # エポックに対してトレーニングループを起動します (ループごとに評価)
  if args.cuda:
      model = model.cuda()

  optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

  for epoch in range(1, args.epochs + 1):
      train_epoch(epoch, args, model, train_loader, optimizer)
      test_epoch(model, test_loader)
      
      # モデルの保存パス
      PATH = "model.pt"
      # モデルの保存      
      torch.save(model.state_dict(), PATH)
      
      # アーティファクトとしてモデルを参照できるようにする
      artifacts = {"torch_model_path": PATH}
  
      # MLflowにはモデルをサービングする際に用いられるconda環境を作成するユーティリティが含まれています。
      # 必要な依存関係がconda.yamlに保存され、モデルとともに記録されます。    
      conda_env =  _mlflow_conda_env(
        additional_conda_deps=None,
        additional_pip_deps=["torch==1.10.0", 
                             "torchvision==0.11.1",
                            ],
        additional_conda_channels=None,
      )    
  
  # モデルのロギング
  mlflow.pyfunc.log_model("model",
                          python_model=PytorchWrapper(),
                          artifacts=artifacts,
                          conda_env=conda_env)

モデルによる推論

上でトレーニングしたモデルの動作確認を行います。

Python
# シグネチャを定義するためにモデルの入出力を取得します
single_loaded_img = train_loader.dataset.data[0]
imshow(single_loaded_img)

single_loaded_img_conv = single_loaded_img[None, None]
single_loaded_img_conv = single_loaded_img_conv.type('torch.FloatTensor').to("cuda") # DoubleTensorの代替

out_predict = model(single_loaded_img_conv)
print(out_predict)

Screen Shot 2022-02-08 at 16.11.35.png

モデルサービング

モデルレジストリへの登録

シグネチャの定義

トレーニングしたモデルをモデルレジストリに登録します。登録する際に入出力のスキーマとなるシグネチャ、入力サンプルを定義します。

Python
logged_run = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.pyfunc.load_model(logged_run)

# シグネチャを定義するためにモデルの入出力を取得します
single_loaded_img = train_loader.dataset.data[0]
imshow(single_loaded_img)

single_loaded_img_conv = single_loaded_img[None, None]
single_loaded_img_conv = single_loaded_img_conv.type('torch.FloatTensor') # DoubleTensorの代替

out_predict = loaded_model.predict(single_loaded_img_conv)
print(out_predict)

# MLflowモデルレジストリに格納するためにtensor入力を用いてモデルのシグネチャを作成します
input_img_np = single_loaded_img_conv.to('cpu').detach().numpy().copy()

signature = infer_signature(input_img_np, out_predict)

# どのように見えるかを確認します
print("sigunature:", signature)

Screen Shot 2022-02-08 at 16.19.49.png

入力サンプルの準備

Python
# np.expand_dims() は、第2引数の axis で指定した場所の直前に dim=1 を挿入します
input_example = np.expand_dims(input_img_np[0], axis=0)
input_example.shape

上記のシグネチャ、入力サンプルを指定してモデルレジストリに登録します。

Python
# np.expand_dims() は、第2引数の axis で指定した場所の直前に dim=1 を挿入します
input_example = np.expand_dims(input_img_np[0], axis=0)
input_example.shape

Screen Shot 2022-02-08 at 16.21.45.png

モデルレジストリに登録されたモデルの動作確認

モデルレジストリから最新バージョン番号を取得して、モデルをロードします。

Python
# モデルレジストリの最新バージョンを取得
client = MlflowClient()
results = client.get_latest_versions(model_name, stages=["None"])

latest_version = ''

for res in results:
  print("name={}; run_id={}; version={}".format(res.name, res.run_id, res.version))
  latest_version = res.version

print(f"latest version in {model_name}:", latest_version)  
Python
# モデルがreadyになるまで待ちます
def wait_until_ready(model_name, model_version):
  client = MlflowClient()
  for _ in range(10):
    model_version_details = client.get_model_version(
      name=model_name,
      version=model_version,
    )
    status = ModelVersionStatus.from_string(model_version_details.status)
    print("Model %s version %s status: %s" % (model_name, model_version, ModelVersionStatus.to_string(status)))
    if status == ModelVersionStatus.READY:
      break
    time.sleep(1)

wait_until_ready(model_name, latest_version)

Screen Shot 2022-02-08 at 16.31.57.png

モデルレジストリからロードしたモデルで動作確認をします。

Python
# トレーニングしたモデル
logged_model_uri = f'models:/{model_name}/{latest_version}'
# モデルのロード
loaded_model = mlflow.pyfunc.load_model(logged_model_uri)

single_loaded_img = train_loader.dataset.data[0]
imshow(single_loaded_img)

single_loaded_img_conv = single_loaded_img[None, None]
single_loaded_img_conv = single_loaded_img_conv.type('torch.FloatTensor') # DoubleTensorの代替

input_img_np = single_loaded_img_conv.to('cpu').detach().numpy().copy()
#print(input_img_np)
input_data = np.expand_dims(input_img_np[0], axis=0)

#with torch.no_grad():
out_predict = loaded_model.predict(input_img_np)
print(out_predict)

REST API経由でのモデルの呼び出し

Python
import os

# トークンは機密性の高い情報なので、ノートブックに直接記載しません。事前にCLIでシークレットとして登録しておいたトークンを呼び出します
token = dbutils.secrets.get("demo-token-takaaki.yayoi", "token")
os.environ["DATABRICKS_TOKEN"] = token

REST API呼び出しを行う関数を準備します。

Python
import os
import requests
import numpy as np
import pandas as pd

# tensorをエンドポイントに引き渡す際のフォーマットに変換
def create_tf_serving_json(data):
  return {"inputs": {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}

def score_model(dataset):
  # モデルのREST APIエンドポイント(モデルサービングの画面で確認できます)
  url = f'https://<Databricksホスト名>/model/{model_name}/{latest_version}/invocations'
  #print(url)
  headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}'}
  
  # datasetがデータフレームの場合はJSONに変換、そうでない場合はtensorを渡す際のJSONにフォーマットに変換
  data_json = dataset.to_dict(orient='split') if isinstance(dataset, pd.DataFrame) else create_tf_serving_json(dataset)
  #print(data_json)
  
  # API呼び出し
  response = requests.request(method='POST', headers=headers, url=url, json=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()

最後にモデルサービングされているモデルをREST API経由で呼び出します。

Python
data = train_loader.dataset.data[10]
print("data.type:", type(data))
print("data.shape:", data.shape)
imshow(data)

data_conv = data[None, None]
data_conv = data_conv.type('torch.FloatTensor') # DoubleTensorの代替
#print(data_conv.shape)

input_img_np = data_conv.to('cpu').detach().numpy().copy()
#print(input_img_np)
input_data = np.expand_dims(input_img_np[0], axis=0)

# モデルサービングは、比較的小さいデータバッチにおいて低レーテンシーで予測するように設計されています。
served_predictions = score_model(input_data)
print("分類結果:", served_predictions)

Screen Shot 2022-02-08 at 16.38.40.png

また、モデルサービングの画面で動作確認することも可能です。

Screen Shot 2022-02-08 at 16.43.04.png

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?