GPUクラスターでPytorchモデルをトレーニングし、MLflowのモデルサービングでモデルをサーブしようとすると、以下のエラーに遭遇します。これはGPUでトレーニングされたモデルをCPUに再マッピングしていないためです。
RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.
上のメッセージでは、torch.load with map_location=torch.device('cpu')
を使うように指示が出ていますが、MLflowのモデルサービングのサーバ上でモデルがロードされる際にこの処理を埋め込むには工夫が必要となります。
このエラーを回避するために、モデルサービングを行う際にCPUへのマッピングを行う処理を組み込んだPyFuncカスタムモデルを構築して、カスタムモデルをMLflowでロギング・サービングします。
本書では、PyTorchによるMNIST文字認識モデルの構築のシナリオで上記対策を行います。
ライブラリの読み込み
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from collections import namedtuple
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import time
import numpy as np
from matplotlib.pyplot import imshow
from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus
from mlflow.tracking.client import MlflowClient
準備
パラメーター
このノートブックはGPUクラスター、非GPUクラスターの両方で動作しますが、GPUクラスターを使うことをお勧めします。ウィジェットUSE_GPU
をyes
に設定すると、PyTorchはCUDA-enabled GPUを活用して、より高速に動作します。
-
batch_size
: トレーニングミニバッチのサンプル数 -
test_batch_size
: テスト/推論ミニバッチのサンプル数。テスト/推論では後ろ向きのパスを行う必要がないため、通常これはbatch_size
よりも大きな数となります。
トレーニングアルゴリズムは、epochs
に対する一連のトレーニングセットを活用します。
学習率lr
とモーメンタムファクターmomentum
を用いたStochastic Gradient Descentを使用します。
詳細はPyTorch SGD implementationを参照ください。
MNIST_DIR = '/tmp/data/mnist'
use_cuda = USE_GPU and torch.cuda.is_available()
Params = namedtuple('Params', ['batch_size', 'test_batch_size', 'epochs', 'lr', 'momentum', 'seed', 'cuda', 'log_interval'])
args = Params(batch_size=64, test_batch_size=1000, epochs=10, lr=0.01, momentum=0.5, seed=1, cuda=use_cuda, log_interval=200)
MNISTデータセットの準備
データセットをダウンロードし、行をシャッフルし、バッチを作成して特徴量の標準化を行います。
torch.manual_seed(args.seed)
data_transform_fn = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))])
train_loader = torch.utils.data.DataLoader(
datasets.MNIST(MNIST_DIR, train=True, download=True,
transform=data_transform_fn),
batch_size=args.batch_size, shuffle=True, num_workers=1)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(MNIST_DIR, train=False,
transform=data_transform_fn),
batch_size=args.test_batch_size, shuffle=True, num_workers=1)
モデルの定義
CNNモデルの構築
ここでは、2つのconvolutionalレイヤー (conv)、2つのfully connected (fc)レイヤーを持つCNNモデルを作成します。また、convレイヤーとfcレイヤーの間にdropoutレイヤーを追加します。
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
#print("x.type:", type(x))
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
model = Net()
model.share_memory() # 勾配は遅延で配置されるので、ここではこれを共有しません。
Pyfuncカスタムモデルの構築
モデルサービングのサーバーにデプロイする際にCPUにマッピングを行う処理を追加します。
import mlflow.pytorch
from mlflow.utils.environment import _mlflow_conda_env
class PytorchWrapper(mlflow.pyfunc.PythonModel):
"""
Pytorchモデルをトレーニング、使用するクラス
"""
def load_context(self, context):
"""このメソッドはpyfunc.load_model()を用いてMLflowモデルがロードされ、Pythonモデルが構築されると呼び出されます
Args:
context: モデルのアーティファクトが保存されるMLflowコンテキスト
"""
import torch
import torchvision
# アーティファクトとして保存されているモデルのロード
# モデルサービングのサーバーはCPUマシンなので、CPUデバイスへの再マッピングをおこないます
device = torch.device('cpu')
model = Net()
model.load_state_dict(torch.load(context.artifacts["torch_model_path"], map_location=device))
model.eval()
self.model = model
def predict(self, context, model_input):
"""これは抽象化関数です。PyTorchモデルを取り出すためにメソッドをカスタマイズします。
Args:
context ([type]): モデルのアーティファクトが格納されるMLflowコンテキスト
model_input ([type]): 検知するデータ
Returns:
[type]: ロードされたモデルアーティファクト
"""
import pandas as pd
import torch
print("model_input.type", type(model_input))
print("model_input.shape", model_input.shape)
# 入力がTensorではない場合にはTensorに変換します。これはノートブックからの呼び出しと、モデルサービングでは入力値の型が異なるためです
if not torch.is_tensor(model_input):
model_input = torch.from_numpy(model_input.astype(np.float32))
##with torch.no_grad():
print("converted model_input.type", type(model_input))
print("converted model_input.shape", model_input.shape)
prediction = self.model(model_input)
print("prediction.type", type(prediction))
print("prediction.shape", prediction.shape)
# 出力がTensorの場合にはnumpy.arrayに変換します
if isinstance(prediction, torch.Tensor):
prediction = prediction.cpu().detach().numpy().copy()
return prediction
モデルのトレーニング
モデルをトレーニングするために、Negative Log Likelihoodロスを定義し、momentumを用いてStochastic Gradient Descentオプティマイザを作成します。optimizer.step()
に続いてloss.backward()
を呼び出すことで、モデルのパラメーターを更新します。
def train_epoch(epoch, args, model, data_loader, optimizer):
model.train()
for batch_idx, (data, target) in enumerate(data_loader):
if args.cuda:
data, target = data.cuda(), target.cuda()
data, target = Variable(data), Variable(target)
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(data_loader.dataset),
100. * batch_idx / len(data_loader), loss.data.item()))
def test_epoch(model, data_loader):
model.eval()
test_loss = 0
correct = 0
for data, target in data_loader:
if args.cuda:
data, target = data.cuda(), target.cuda()
data, target = Variable(data, volatile=True), Variable(target)
output = model(data)
test_loss += F.nll_loss(output, target, size_average=False).data.item() # バッチの損失を加算します
pred = output.data.max(1)[1] # 最大のlog-probabilityのインデックスを取得します
correct += pred.eq(target.data).cpu().sum()
test_loss /= len(data_loader.dataset)
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
test_loss, correct, len(data_loader.dataset),
100. * correct / len(data_loader.dataset)))
MLflowでカスタムモデルをロギングします。
with mlflow.start_run() as run:
# エポックに対してトレーニングループを起動します (ループごとに評価)
if args.cuda:
model = model.cuda()
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
train_epoch(epoch, args, model, train_loader, optimizer)
test_epoch(model, test_loader)
# モデルの保存パス
PATH = "model.pt"
# モデルの保存
torch.save(model.state_dict(), PATH)
# アーティファクトとしてモデルを参照できるようにする
artifacts = {"torch_model_path": PATH}
# MLflowにはモデルをサービングする際に用いられるconda環境を作成するユーティリティが含まれています。
# 必要な依存関係がconda.yamlに保存され、モデルとともに記録されます。
conda_env = _mlflow_conda_env(
additional_conda_deps=None,
additional_pip_deps=["torch==1.10.0",
"torchvision==0.11.1",
],
additional_conda_channels=None,
)
# モデルのロギング
mlflow.pyfunc.log_model("model",
python_model=PytorchWrapper(),
artifacts=artifacts,
conda_env=conda_env)
モデルによる推論
上でトレーニングしたモデルの動作確認を行います。
# シグネチャを定義するためにモデルの入出力を取得します
single_loaded_img = train_loader.dataset.data[0]
imshow(single_loaded_img)
single_loaded_img_conv = single_loaded_img[None, None]
single_loaded_img_conv = single_loaded_img_conv.type('torch.FloatTensor').to("cuda") # DoubleTensorの代替
out_predict = model(single_loaded_img_conv)
print(out_predict)
モデルサービング
モデルレジストリへの登録
シグネチャの定義
トレーニングしたモデルをモデルレジストリに登録します。登録する際に入出力のスキーマとなるシグネチャ、入力サンプルを定義します。
logged_run = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.pyfunc.load_model(logged_run)
# シグネチャを定義するためにモデルの入出力を取得します
single_loaded_img = train_loader.dataset.data[0]
imshow(single_loaded_img)
single_loaded_img_conv = single_loaded_img[None, None]
single_loaded_img_conv = single_loaded_img_conv.type('torch.FloatTensor') # DoubleTensorの代替
out_predict = loaded_model.predict(single_loaded_img_conv)
print(out_predict)
# MLflowモデルレジストリに格納するためにtensor入力を用いてモデルのシグネチャを作成します
input_img_np = single_loaded_img_conv.to('cpu').detach().numpy().copy()
signature = infer_signature(input_img_np, out_predict)
# どのように見えるかを確認します
print("sigunature:", signature)
入力サンプルの準備
# np.expand_dims() は、第2引数の axis で指定した場所の直前に dim=1 を挿入します
input_example = np.expand_dims(input_img_np[0], axis=0)
input_example.shape
上記のシグネチャ、入力サンプルを指定してモデルレジストリに登録します。
# np.expand_dims() は、第2引数の axis で指定した場所の直前に dim=1 を挿入します
input_example = np.expand_dims(input_img_np[0], axis=0)
input_example.shape
モデルレジストリに登録されたモデルの動作確認
モデルレジストリから最新バージョン番号を取得して、モデルをロードします。
# モデルレジストリの最新バージョンを取得
client = MlflowClient()
results = client.get_latest_versions(model_name, stages=["None"])
latest_version = ''
for res in results:
print("name={}; run_id={}; version={}".format(res.name, res.run_id, res.version))
latest_version = res.version
print(f"latest version in {model_name}:", latest_version)
# モデルがreadyになるまで待ちます
def wait_until_ready(model_name, model_version):
client = MlflowClient()
for _ in range(10):
model_version_details = client.get_model_version(
name=model_name,
version=model_version,
)
status = ModelVersionStatus.from_string(model_version_details.status)
print("Model %s version %s status: %s" % (model_name, model_version, ModelVersionStatus.to_string(status)))
if status == ModelVersionStatus.READY:
break
time.sleep(1)
wait_until_ready(model_name, latest_version)
モデルレジストリからロードしたモデルで動作確認をします。
# トレーニングしたモデル
logged_model_uri = f'models:/{model_name}/{latest_version}'
# モデルのロード
loaded_model = mlflow.pyfunc.load_model(logged_model_uri)
single_loaded_img = train_loader.dataset.data[0]
imshow(single_loaded_img)
single_loaded_img_conv = single_loaded_img[None, None]
single_loaded_img_conv = single_loaded_img_conv.type('torch.FloatTensor') # DoubleTensorの代替
input_img_np = single_loaded_img_conv.to('cpu').detach().numpy().copy()
#print(input_img_np)
input_data = np.expand_dims(input_img_np[0], axis=0)
#with torch.no_grad():
out_predict = loaded_model.predict(input_img_np)
print(out_predict)
REST API経由でのモデルの呼び出し
import os
# トークンは機密性の高い情報なので、ノートブックに直接記載しません。事前にCLIでシークレットとして登録しておいたトークンを呼び出します
token = dbutils.secrets.get("demo-token-takaaki.yayoi", "token")
os.environ["DATABRICKS_TOKEN"] = token
REST API呼び出しを行う関数を準備します。
import os
import requests
import numpy as np
import pandas as pd
# tensorをエンドポイントに引き渡す際のフォーマットに変換
def create_tf_serving_json(data):
return {"inputs": {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}
def score_model(dataset):
# モデルのREST APIエンドポイント(モデルサービングの画面で確認できます)
url = f'https://<Databricksホスト名>/model/{model_name}/{latest_version}/invocations'
#print(url)
headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}'}
# datasetがデータフレームの場合はJSONに変換、そうでない場合はtensorを渡す際のJSONにフォーマットに変換
data_json = dataset.to_dict(orient='split') if isinstance(dataset, pd.DataFrame) else create_tf_serving_json(dataset)
#print(data_json)
# API呼び出し
response = requests.request(method='POST', headers=headers, url=url, json=data_json)
if response.status_code != 200:
raise Exception(f'Request failed with status {response.status_code}, {response.text}')
return response.json()
最後にモデルサービングされているモデルをREST API経由で呼び出します。
data = train_loader.dataset.data[10]
print("data.type:", type(data))
print("data.shape:", data.shape)
imshow(data)
data_conv = data[None, None]
data_conv = data_conv.type('torch.FloatTensor') # DoubleTensorの代替
#print(data_conv.shape)
input_img_np = data_conv.to('cpu').detach().numpy().copy()
#print(input_img_np)
input_data = np.expand_dims(input_img_np[0], axis=0)
# モデルサービングは、比較的小さいデータバッチにおいて低レーテンシーで予測するように設計されています。
served_predictions = score_model(input_data)
print("分類結果:", served_predictions)
また、モデルサービングの画面で動作確認することも可能です。