こちらで説明されているノートブックとは別に、より簡易的で理解しやすいものが公開されていました。
こちらの最初に表示されているGet startedのリンクからアクセスできるノートブックです。
日本語にローカライズしたノートブックはこちらです。
クラスターの準備
GPUクラスターを使用します。テーブルはUnity Catalogに保存していますので、シングルユーザーアクセスモードを指定しています。
00_IngestionPCB
このソリューションアクセラレータでは、製品の品質調査のためのエンドツーエンドのパイプラインのデプロイにおいて、Databricksがどのように役立つのかを説明します。モデルはサーバレスリアルタイム推論を用いてデプロイされます。
Visual Anomaly (VisA)検知データセットを用い、PCB画像の以上を検知するためのパイプラインを構築します。
なぜ画像による品質調査を?
画像による品質調査は製造業の文脈においては一般的な課題となっています。スマートマニュファクチャリングを提供する際の鍵となります。
プロダクションレベルのパイプラインの実装
事前学習済みディープラーニングモデル、転送学習、高レベルのフレームワークによって、近年では画像分類問題は簡単になってきています。データサイエンスチームはそのようなモデルをクイックにデプロイすることはできますが、プロダクションレベルのエンドツーエンドのパイプラインの実装、画像の利用、MLOps/ガバナンスの必要性、最終的な結果の提供においては、依然として本当の課題が存在し続けています。
Databricksレイクハウスは、このような全体的なプロセスをシンプルにするように設計されており、データサイエンティストはコアのユースケースにフォーカスすることができます。
品質調査モデルを構築するために、Torchvisionを使用します。しかし、他のライブラリで同じアーキテクチャを活用することも可能です。TorchvisionライブラリはPyTorchプロジェクトの一部であり、ディープラーニングで人気のフレームワークとなっています。Torchvisionは、モデルアーキテクチャ、よく使われるデータセット、画像のトランスフォーマーと共に提供されています。
パイプライン構築の最初のステップは、データの取り込みです。Databricksでは、画像(非構造化データ)を含む任意のソースのデータをロードすることができます。これは、効率的かつ分散された方法で画像のコンテンツとともにテーブルに格納され、ラベルと関連づけられます。
品質調査画像パイプライン
これが我々が構築するパイプラインです。2つのデータセットを取り込みます。すなわち:
- PCBを含む生のサテライトイメージ(jpg)
- CSVとして保存されている不良のタイプを示すラベル
最初にこのデータをインクリメンタルにロードするデータパイプラインにフォーカスし、最終的なゴールドテーブルを作成します。
このテーブルは、我々の画像からリアルタイムで異常を検知するために、ML分離モデルをトレーニングするために活用されます!
https://registry.opendata.aws/visa/ からデータセットをダウンロード
https://registry.opendata.aws/visa/からデータセットをダウンロードするためにbash
コマンドを使います。
データはAWS S3に格納されているので、AWS CLIライブラリ(awscli
)をインストールする必要があります。
%pip install awscli
%sh
mkdir -p /tmp/data
aws s3 cp --no-progress --no-sign-request s3://amazon-visual-anomaly/VisA_20220922.tar /tmp
download: s3://amazon-visual-anomaly/VisA_20220922.tar to ../../tmp/VisA_20220922.tar
%sh
mkdir -p /tmp/data
tar xf /tmp/VisA_20220922.tar --no-same-owner -C /tmp/data/
いくつかのPCB画像を見てみましょう
ネイティブなPythonの方法でmatplotlib
を使って画像を表示することができます。
正常な画像がどのようなものであるのか、異常があるものがどのようなものであるかを見てみましょう。
from PIL import Image
import matplotlib.pyplot as plt
def display_image(path, dpi=300):
img = Image.open(path)
width, height = img.size
plt.figure(figsize=(width / dpi, height / dpi))
plt.imshow(img, interpolation="nearest", aspect="auto")
display_image("/tmp/data/pcb1/Data/Images/Normal/0000.JPG")
display_image("/tmp/data/pcb1/Data/Images/Anomaly/000.JPG")
データをDBFSに移動しましょう
クイックな覚書: Databricksファイルシステム(DBFS)は、Databricksワークスペースにマウントされ、Databricksクラスターから利用できる分散ファイルシステムです。DBFSは、ネイティブなクラウドストレージのAPIコールをUnixライクなファイルシステムコールにマッピングする、スケーラブルなオブジェクトストレージ上の抽象化レイヤーとなります。
%sh
rm -rf /dbfs/pcb1
mkdir -p /dbfs/pcb1/labels
cp -r /tmp/data/pcb1/Data/Images/ /dbfs/pcb1/
cp /tmp/data/pcb1/image_anno.csv /dbfs/pcb1/labels/
%sql
USE takaakiyayoi_catalog.pcb;
DROP TABLE IF EXISTS circuit_board;
DROP TABLE IF EXISTS circuit_board_gold;
DROP TABLE IF EXISTS circuit_board_label;
cloud_storage_path="/pcb1"
Auto LoaderによるCSVラベルファイルをロード
DatabricksのAuto Loaderを用いることで、CSVファイルを簡単にロードすることができます。
from pyspark.sql.functions import substring_index, col
(
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", True)
.option("cloudFiles.schemaLocation", f"{cloud_storage_path}/circuit_board_label_schema")
.load(f"{cloud_storage_path}/labels/")
.withColumn("filename", substring_index(col("image"), "/", -1))
.select("filename", "label")
.withColumnRenamed("label", "labelDetail")
.writeStream.trigger(availableNow=True)
.option("checkpointLocation", f"{cloud_storage_path}/circuit_board_label_checkpoint")
.toTable("circuit_board_label")
.awaitTermination()
)
display(spark.table("circuit_board_label"))
Auto Loaderによるバイナリーファイルのロード
これで、画像をロードするためにAuto Loaderを用い、ラベルのカラムを作成するためにspark関数を活用することができます。
また、テーブルとして画像のコンテンツとラベルを簡単に表示することができます。
from pyspark.sql.functions import substring_index, col, when
(
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobFilter", "*.JPG")
.option("recursiveFileLookup", "true")
.option("cloudFiles.schemaLocation", f"{cloud_storage_path}/circuit_board_schema")
.load(f"{cloud_storage_path}/Images/")
.withColumn("filename", substring_index(col("path"), "/", -1))
.withColumn(
"labelName",
when(col("path").contains("Anomaly"), "anomaly").otherwise("normal"),
)
.withColumn("label", when(col("labelName").eqNullSafe("anomaly"), 1).otherwise(0))
.select("filename", "content", "label", "labelName")
.writeStream.trigger(availableNow=True)
.option("checkpointLocation", f"{cloud_storage_path}/circuit_board_checkpoint")
.toTable("circuit_board")
.awaitTermination()
)
display(spark.table("circuit_board"))
ラベルと画像テーブルをマージしましょう
%sql
CREATE TABLE circuit_board_gold as (
select
cb.*,
labelDetail
from
circuit_board cb
inner join circuit_board_label cbl on cb.filename = cbl.filename
);
画像テーブルに対する自動最適化を有効化することができます
自動最適化は2つの補完的な機能となります: 最適化書き込みとオートコンパクションです。
%sql
ALTER TABLE circuit_board_gold SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true);
ALTER TABLE circuit_board SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)
このテーブルに対して任意のSQLコマンドを実行することができます。
%sql
select
*
from
circuit_board_gold
limit 10
データサインスチームが活用するデータセットの準備ができました
これですべてです!プロダクションレベルのデータ取り込みパイプラインをデプロイしました。
我々の画像はインクリメンタルに取り込まれ、ラベルデータセットと結合されます。
異常検知に必要なモデルを構築するために、データサイエンティストによってこのデータがどのように活用されるのかを見てみましょう。
01_ImageClassificationPytorch
pytorchモデルの実装とデプロイ
データサイエンティストとしての次のステップは、画像分類を実行するためにMLモデルを実装することです。
トレーニングデータセットとして以前のデータパイプラインで構築したゴールドテーブルを再利用します。
torchvisionを用いることで、このようなモデルの構築が非常にシンプルになります。
MLOpsのステップ
画像分類モデルの構築は簡単に終わるかもしれませんが、プロダクション環境にモデルをデプロイすることはさらに難しいものとなります。
Databricksにおいては、以下を提供するMLflowの助けを借りて、このプロセスをシンプルにし、価値創出に至るジャーニーを加速します。
- 進捗を追跡し続ける自動エクスペリメント追跡
- ベストなモデルを得るためにhyperoptを用いたシンプルかつ分散されたハイパーパラメータチューニング
- MLフレームワークを抽象化し、MLflowにモデルをパッケージング
- ガバナンスのためのモデルレジストリ
- バッチやリアルタイムのサービング(1クリックでのデプロイメント)
model_name = "cv_pcb_classification_taka" # 適宜変更
GPUの有効化
ディープラーニングでは、トレーニングでGPUを用いることが合理的です。
import torch
# GPUを利用できるかどうかをチェック
if not torch.cuda.is_available(): # gpuが利用できるかどうか
raise Exception(
"Please use a GPU-cluster for model training, CPU instances will be too slow"
)
# Spark/Pythonバージョンの確認
import sys
print(
"You are running a Databricks {0} cluster leveraging Python {1}".format(
spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion"),
sys.version.split(" ")[0],
)
)
You are running a Databricks 13.3.x-gpu-ml-scala2.12 cluster leveraging Python 3.10.12
from petastorm.spark import SparkDatasetConverter, make_spark_converter
from petastorm import TransformSpec
from PIL import Image
import torchvision
import torch
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
import horovod.torch as hvd
from sparkdl import HorovodRunner
import mlflow
import pyspark.sql.functions as f
import numpy as np
from functools import partial
import io
import uuid
username = (
dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
)
mlflow.set_experiment("/Users/{}/pcbqi".format(username))
petastorm_path = (
f"file:///dbfs/tmp/petastorm/{str(uuid.uuid4())}/cache" # petastormのキャッシュファイルの格納場所
)
データをトレーニング/テストデータセットとして分割
他のMLモデルと同様に、画像をトレーニング/テストデータセットに分割するところからスタートします。
%sql
USE takaakiyayoi_catalog.pcb;
# 対象画像の取得
images = spark.table("circuit_board_gold").select(
"content", "label", "filename"
) # 次のステップではユニークなIDとしてパスを使用します
# 階層化された画像のサンプルの取得
images_train = images.sampleBy(
"label", fractions={0: 0.8, 1: 0.8}
) # トレーニングにはそれぞれのクラスの80%をサンプリング
images_test = images.join(
images_train, on="filename", how="leftanti"
) # 残りはテストに使用
# 不要なフィールドを削除
images_train = images_train.drop("filename").repartition(
sc.defaultParallelism
) # パスのIDの削除
images_test = images_test.drop("filename").repartition(sc.defaultParallelism)
# サンプリングの検証
display(
images_train.withColumn("eval_set", f.lit("train"))
.union(images_test.withColumn("eval_set", f.lit("test")))
.groupBy("eval_set", "label")
.agg(f.count("*").alias("instances"))
.orderBy("eval_set", "label")
)
DLとpetastormのためのDeltaテーブル
我々のデータは現在Deltaテーブルに格納されており、Sparkデータフレームとして利用することができます。しかし、pytorchでは特定のデータタイプが期待されます。
これを解決するために、テーブルからデータをモデルに自動で送信するように、PetastormとSparkコンバーターを活用します。このコンバーターは高速処理のために、ローカルキャッシュを用いてデータをインクリメンタルにロードします。詳細は関連ドキュメントをご覧ください。
try:
dbutils.fs.rm(petastorm_path, True)
except:
pass
# petastoreキャッシュの宛先を設定
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, petastorm_path)
# データセットのおおよそのバイト数を特定
bytes_in_train = (
images_train.withColumn("bytes", f.lit(4) + f.length("content"))
.groupBy()
.agg(f.sum("bytes").alias("bytes"))
.collect()[0]["bytes"]
)
bytes_in_test = (
images_test.withColumn("bytes", f.lit(4) + f.length("content"))
.groupBy()
.agg(f.sum("bytes").alias("bytes"))
.collect()[0]["bytes"]
)
# 画像データのキャッシュ
converter_train = make_spark_converter(
images_train,
parquet_row_group_size_bytes=int(bytes_in_train / sc.defaultParallelism),
)
converter_test = make_spark_converter(
images_test, parquet_row_group_size_bytes=int(bytes_in_test / sc.defaultParallelism)
)
NUM_CLASSES = 2 # ラベルは2クラス (0 あるいは 1)
Torchvision
Torchvisionは再利用できる事前学習済みモデルを提供します。
from torchvision.models import (
ViT_B_16_Weights,
vit_b_16,
)
def get_model():
# 事前学習済みモデルにアクセス
weights = ViT_B_16_Weights.DEFAULT
model = vit_b_16(weights=weights)
# 転送学習のために新たな分類レイヤーを追加
num_ftrs = model.heads.head.in_features
# 新たに構成されたモジュールのパラメーターでは、デフォルトで requires_grad=True が設定されています
model.heads.head = torch.nn.Linear(num_ftrs, NUM_CLASSES)
return model, weights
model, weights = get_model()
transforms = weights.transforms()
print(model.heads)
print(transforms)
Sequential(
(head): Linear(in_features=768, out_features=2, bias=True)
)
ImageClassification(
crop_size=[224]
resize_size=[256]
mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]
interpolation=InterpolationMode.BILINEAR
)
# 画像変換のロジックを定義
def transform_row(is_train, batch_pd):
# 画像にパイプラインを適用
batch_pd["features"] = batch_pd["content"].map(
lambda x: np.ascontiguousarray(
transforms(Image.open(io.BytesIO(x)).convert("RGB")).numpy()
)
)
# ラベルの変換 (我々の評価メトリックは値が float32 であることを期待します)
# -----------------------------------------------------------
batch_pd["label"] = batch_pd["label"].astype("float32")
# -----------------------------------------------------------
return batch_pd[["features", "label"]]
# 変換の仕様を取得する関数の定義
def get_transform_spec(is_train=True):
spec = TransformSpec(
partial(transform_row, is_train), # 行を取得/変換するために呼び出す関数
edit_fields=[ # 関数によって返却される行のスキーマ
("features", np.float32, (3, 224, 224), False),
("label", np.float32, (), False),
],
selected_fields=["features", "label"], # モデルに送信するスキーマのフィールド
)
return spec
# petastormのキャッシュにアクセスし、仕様を用いてデータを変換
with converter_train.make_torch_dataloader(
transform_spec=get_transform_spec(is_train=True), batch_size=1
) as train_dataloader:
# キャッシュからレコードを取得
for i in iter(train_dataloader):
print(i)
break
BATCH_SIZE = 32 # 一度に32画像を処理
NUM_EPOCHS = 15 # すべての画像を5回繰り返し
from sklearn.metrics import f1_score
def train_one_epoch(
model,
criterion,
optimizer,
scheduler,
train_dataloader_iter,
steps_per_epoch,
epoch,
device,
):
model.train() # モデルをトレーニングモードに設定
# 統計情報
running_loss = 0.0
running_corrects = 0
running_size = 0
# 1つのエポックのデータに対するイテレーション
for step in range(steps_per_epoch):
# petastormから次のバッチを取得
pd_batch = next(train_dataloader_iter)
# 入力の特徴量とラベルを分離
inputs, labels = pd_batch["features"].to(device), pd_batch["label"].to(device)
# トレーニングの履歴を追跡
with torch.set_grad_enabled(True):
# パラメーターの勾配をゼロに
optimizer.zero_grad()
# フォワード
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
probs = torch.nn.functional.softmax(outputs, dim=0)[:, 1]
loss = criterion(probs, labels)
# バックワード + 最適化
loss.backward()
optimizer.step()
# 統計情報
running_loss += loss.item()
running_corrects += torch.sum(preds == labels.data)
running_size += inputs.size(0)
scheduler.step()
epoch_loss = running_loss / steps_per_epoch
epoch_acc = running_corrects.double() / running_size
print("Train Loss: {:.4f} Acc: {:.4f}".format(epoch_loss, epoch_acc))
return epoch_loss, epoch_acc
def evaluate(
model, criterion, test_dataloader_iter, test_steps, device, metric_agg_fn=None
):
model.eval() # モデルを評価モードに設定
# 統計情報
running_loss = 0.0
running_corrects = 0
running_size = 0
f1_scores = 0
# すべての検証データに対してイテレーション
for step in range(test_steps):
pd_batch = next(test_dataloader_iter)
inputs, labels = pd_batch["features"].to(device), pd_batch["label"].to(device)
# メモリーを節約するために評価の際には履歴を追跡しない
with torch.set_grad_enabled(False):
# フォワード
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
probs = torch.nn.functional.softmax(outputs, dim=1)[:, 1]
loss = criterion(probs, labels)
# 統計情報
running_loss += loss.item()
running_corrects += torch.sum(preds == labels.data)
running_size += inputs.size(0)
f1_scores += f1_score(labels.cpu().data, preds.cpu())
# それぞれのミニバッチにおける結果におけるロスを平均
epoch_loss = running_loss / test_steps
epoc_f1 = f1_scores / test_steps
epoch_acc = running_corrects.double() / running_size
# すべえのワーカーのメトリクスを集計するために分散トレーニングで metric_agg_fn を使用
if metric_agg_fn is not None:
epoch_loss = metric_agg_fn(epoch_loss, "avg_loss")
epoch_acc = metric_agg_fn(epoch_acc, "avg_acc")
epoc_f1 = metric_agg_fn(epoc_f1, "avg_f1")
print(
"Testing Loss: {:.4f} Acc: {:.4f} F1: {:.4f}".format(
epoch_loss, epoch_acc, epoc_f1
)
)
return epoch_loss, epoch_acc, epoc_f1
import pandas as pd
pd.options.mode.chained_assignment = None # default='warn'
def train_and_evaluate(lr=0.001):
# 計算処理でGPUを使えるかどうかをチェック
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# モデルの取得
model, _ = get_model()
# 特定されたプロセッサーデバイスのプロセスにモデルを割り当て
model = model.to(device)
# バイナリークロスエントロピーに最適化
criterion = torch.nn.BCELoss()
# 最終レイヤーのパラメーターのみを最適化
filtered_params = filter(lambda p: p.requires_grad, model.parameters())
optimizer = torch.optim.AdamW(filtered_params, lr=lr)
# 7エポックごとに0.1の因数でLRを減衰
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(
optimizer, step_size=7, gamma=0.1
)
# petastormキャッシュのデータにアクセス
with converter_train.make_torch_dataloader(
transform_spec=get_transform_spec(is_train=True), batch_size=BATCH_SIZE
) as train_dataloader, converter_test.make_torch_dataloader(
transform_spec=get_transform_spec(is_train=False), batch_size=BATCH_SIZE
) as val_dataloader:
# データアクセスのためのイテレータと必要なサイクル数を定義
train_dataloader_iter = iter(train_dataloader)
steps_per_epoch = len(converter_train) // BATCH_SIZE
val_dataloader_iter = iter(val_dataloader)
validation_steps = max(1, len(converter_test) // BATCH_SIZE)
# それぞれのエポックに対して
for epoch in range(NUM_EPOCHS):
print("Epoch {}/{}".format(epoch + 1, NUM_EPOCHS))
print("-" * 10)
# トレーニング
train_loss, train_acc = train_one_epoch(
model,
criterion,
optimizer,
exp_lr_scheduler,
train_dataloader_iter,
steps_per_epoch,
epoch,
device,
)
# 評価
val_loss, val_acc, val_f1 = evaluate(
model, criterion, val_dataloader_iter, validation_steps, device
)
# accで問題のあるタイプを訂正
if type(val_acc) == torch.Tensor:
val_acc = val_acc.item()
return model, val_loss, val_acc, val_f1 # テンソルから値を抽出
# model, loss, acc, f1 = train_and_evaluate(**{'lr':0.00001})
Hyperoptによるハイパーパラメータチューニング
モデルの準備ができました。このようなモデルのチューニングは複雑なものです。アーキテクチャ、エンコーダー、学習率のようなハイパーパラメーターに対する選択肢が存在します。
我々のためにベストなハイパーパラメーターを探してもらえるように、Hyperoptを活用しましょう。Hyperoptは分散処理でも動作し、トレーニングプロセスをスピードアップするために、複数インスタンスで並列にトレーニングを実行できることにも注意してください。
# ハイパーパラメータの探索空間の定義
search_space = {
"lr": hp.loguniform("lr", np.log(1e-5), np.log(1.2e-5)),
}
# hyperoptが期待する形式の結果を返却するトレーニング関数の定義
def train_fn(params):
# 指定されたハイパーパラメーターの設定を用いてモデルをトレーニング
model, loss, acc, f1 = train_and_evaluate(**params)
# 透明性を確保するためにこのイテレーションをmlflowに記録
mlflow.log_metric("accuracy", acc)
mlflow.log_metric("f1", f1)
mlflow.pytorch.log_model(model, "model")
# このイテレーションの結果を返却
return {"loss": loss, "status": STATUS_OK}
# 適用する並列度を決定
if torch.cuda.is_available(): # GPUの場合
nbrWorkers = sc.getConf().get("spark.databricks.clusterUsageTags.clusterWorkers")
if nbrWorkers is None: # gcp
nbrWorkers = sc.getConf().get(
"spark.databricks.clusterUsageTags.clusterTargetWorkers"
)
parallelism = int(nbrWorkers)
if parallelism == 0: # シングルノードのクラスター
parallelism = 1
else: # CPUの場合
parallelism = sc.defaultParallelism
# 分散ハイパーパラメーターチューニングの実行
with mlflow.start_run(run_name=model_name) as run:
argmin = fmin(
fn=train_fn,
space=search_space,
algo=tpe.suggest,
max_evals=1, # ハイパーパラメーターランの合計数 (通常この値はもっと大きなものにします)
trials=SparkTrials(parallelism=parallelism),
) # 並列に実行されるハイパーパラメーターランの数
argmin
{'lr': 1.1829944020028542e-05}
Horovodによる分散ディープラーニング
より多くのエポックでモデルをトレーニングできるようになりました。ランを加速させるには、Sparkクラスターの複数ノードでトレーニングを分散させることができます。
詳細はHorovodのドキュメントをご覧ください。
# モデル評価関数の定義
def metric_average_hvd(val, name):
tensor = torch.tensor(val)
avg_tensor = hvd.allreduce(tensor, name=name)
return avg_tensor.item()
# 分散トレーニング & 評価の関数の定義
def train_and_evaluate_hvd(lr=0.001):
# Step 1: Horovodの初期化
hvd.init()
# Step 2: 特定のCPUコア、あるいはGPUにHorovodプロセスを割り当て
# トレーニングに使用するデバイスの特定
if torch.cuda.is_available(): # gpu
torch.cuda.set_device(hvd.local_rank())
device = torch.cuda.current_device()
else:
device = torch.device("cpu") # cpu
# モデルの取得及びデバイスへの割り当て
model, _ = get_model()
model = model.to(device)
criterion = torch.nn.BCELoss()
# Step 3: Horovodプロセスの数に基づいて学習率をスケール
filtered_params = filter(lambda p: p.requires_grad, model.parameters())
optimizer = torch.optim.AdamW(filtered_params, lr=lr * hvd.size())
# Step 4: 分散処理のためにオプティマイザをラッピング
optimizer_hvd = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters()
)
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(
optimizer_hvd, step_size=7, gamma=0.1
)
# Step 5: Horovodプロセスに関連づけられる状態変数の初期化
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# petastormキャッシュへのアクセスを解放
with converter_train.make_torch_dataloader(
transform_spec=get_transform_spec(is_train=True),
cur_shard=hvd.rank(),
shard_count=hvd.size(),
batch_size=BATCH_SIZE,
) as train_dataloader, converter_test.make_torch_dataloader(
transform_spec=get_transform_spec(is_train=False),
cur_shard=hvd.rank(),
shard_count=hvd.size(),
batch_size=BATCH_SIZE,
) as test_dataloader:
# それぞれのコア/GPUがバッチを処理します
train_dataloader_iter = iter(train_dataloader)
train_steps = len(converter_train) // (BATCH_SIZE * hvd.size())
test_dataloader_iter = iter(test_dataloader)
test_steps = max(1, len(converter_test) // (BATCH_SIZE * hvd.size()))
# データセットに対するイテレーション
for epoch in range(NUM_EPOCHS):
# エポック情報の表示
print("Epoch {}/{}".format(epoch + 1, NUM_EPOCHS))
print("-" * 10)
# モデルのトレーニング
train_loss, train_acc = train_one_epoch(
model,
criterion,
optimizer_hvd,
exp_lr_scheduler,
train_dataloader_iter,
train_steps,
epoch,
device,
)
# モデルの評価
test_loss, test_acc, f1_acc = evaluate(
model,
criterion,
test_dataloader_iter,
test_steps,
device,
metric_agg_fn=metric_average_hvd,
)
return test_loss, test_acc, f1_acc, model
# horovodで利用できる並列度の特定
if torch.cuda.is_available(): # gpuの場合
nbrWorkers = sc.getConf().get("spark.databricks.clusterUsageTags.clusterWorkers")
if nbrWorkers is None: # gcp
nbrWorkers = sc.getConf().get(
"spark.databricks.clusterUsageTags.clusterTargetWorkers"
)
parallelism = int(nbrWorkers)
if parallelism == 0: # シングルノードのクラスター
parallelism = 1
else:
parallelism = 2 # 小規模なデータでは並列度を2と低く設定。それ以外の場合には、sc.defaultParallelismに設定することも可能
# horovodの実行環境の初期化
hr = HorovodRunner(np=parallelism)
# 分散トレーニングの実行
with mlflow.start_run(run_name=model_name) as run:
# モデルのトレーニングと評価
loss, acc, f1, model = hr.run(
train_and_evaluate_hvd, **argmin
) # argminにはチューニングされたハイパーパラメーターが含まれます
# mlflowにモデルを記録
mlflow.log_params(argmin)
mlflow.log_metrics({"loss": loss, "accuracy": acc, "f1": f1})
mlflow.pytorch.log_model(model, "model")
プロダクション環境にモデルをデプロイ
モデルのトレーニングが完了しました。やらなくてはいけないことは、(f1
メトリックに基づき)ベストなモデルを入手し、MLflowのレジストリにデプロイするということです。
UI、あるいはいくつかのAPI呼び出しでこれを行うことができます:
# レジストリからベストモデルを取得
best_model = mlflow.search_runs(
filter_string=f'attributes.status = "FINISHED"',
order_by=["metrics.f1 DESC"],
max_results=1,
).iloc[0]
model_registered = mlflow.register_model(
"runs:/" + best_model.run_id + "/model", model_name
)
client = mlflow.tracking.MlflowClient()
print("registering model version " + model_registered.version + " as production model")
client.transition_model_version_stage(
name=model_name,
version=model_registered.version,
stage="Production",
archive_existing_versions=True,
)
try:
dbutils.fs.rm(petastorm_path, True)
except:
pass
我々のモデルはデプロイされ、プロダクションでの利用が可能とフラグが立てられました!
モデルレジストリにモデルをデプロイしました。これによって、モデルのガバナンスが提供され、後段でのすべてのパイプライン開発をシンプルにし、加速させます。
このモデルは、すべてのデータパイプライン(DLT、バッチ、あるいはDatabricksモデルサービングによるリアルタイム)で活用できるようになりました。
それでは、大規模な推論の実行でこのモデルを活用しましょう。
02_PredictionPCB
プロダクション環境での推論にモデルを活用
これまでのノートブックで、ディープラーニングモデルをトレーニングし、モデルレジストリを用いてデプロイを行いました。ここでは、推論でモデルをどのように活用するのかを見ていきます。
最初のステップでは、MLflowのリポジトリからモデルをダウンロードする必要があります。
import os
import torch
from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository
model_name = "cv_pcb_classification_taka" # 適宜変更
local_path = ModelsArtifactRepository(
f"models:/{model_name}/Production"
).download_artifacts(
""
)
PCB画像の分類
PCB画像を分類するために使用するUDF関数(ユーザー定義関数)を作成します。
from pyspark.sql.functions import pandas_udf
import pandas as pd
from typing import Iterator
from io import BytesIO
from PIL import Image
from torchvision.models import ViT_B_16_Weights
import numpy as np
device = "cuda" if torch.cuda.is_available() else "cpu"
loaded_model = torch.load(
local_path + "data/model.pth", map_location=torch.device(device)
)
weights = ViT_B_16_Weights.DEFAULT
feature_extractor = weights.transforms()
feature_extractor_b = sc.broadcast(feature_extractor)
model_b = sc.broadcast(loaded_model)
@pandas_udf("struct<score: float, label: int, labelName: string>")
def apply_vit(images_iter: Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
model = model_b.value
feature_extractor = feature_extractor_b.value
model = model.to(torch.device("cuda"))
model.eval()
id2label = {0: "normal", 1: "anomaly"}
with torch.set_grad_enabled(False):
for images in images_iter:
pil_images = torch.stack(
[
feature_extractor(Image.open(BytesIO(b)).convert("RGB"))
for b in images
]
)
pil_images = pil_images.to(torch.device(device))
outputs = model(pil_images)
preds = torch.max(outputs, 1)[1].tolist()
probs = torch.nn.functional.softmax(outputs, dim=-1)[:, 1].tolist()
yield pd.DataFrame(
[
{"score": prob, "label": pred, "labelName": id2label[pred]}
for pred, prob in zip(preds, probs)
]
)
バッチサイズの設定
SparkのデータパーティションがArrowのレコードバッチに変換される際に、maxRecordsPerBatch
パラメータを用いてバッチサイズを64に設定しましょう。
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 64)
予測テーブル
これで、すべての画像に対する予測結果を持つ新規テーブルを計算することができます。
%sql
USE takaakiyayoi_catalog.pcb;
spark.sql("drop table IF EXISTS circuit_board_prediction")
spark.table("circuit_board_gold").withColumn(
"prediction", apply_vit("content")
).write.saveAsTable("circuit_board_prediction")
誤ってラベル付けされた画像を表示
シンプルなSQLで誤ったラベルを持つ画像を表示します。
%sql
select
*
from
circuit_board_prediction
where
labelName != prediction.labelName
なお、ここまでで作成されたテーブルはUnity Catalogで管理されているので、リネージも追跡されています。
RESTサーバレスリアルタイム推論エンドポイントにモデルをデプロイ
以降はサーバレスモデルサービングエンドポイントへのデプロイになりますが、今動かしている環境ではサーバレスモデルサービングエンドポイントが有効化されていないので、ここでは割愛します。ノートブックやドキュメントを参照ください。
結論
これですべてです!データセットをインクリメンタルに取り込み、クレンジングし、ディープラーニングモデルをトレーニングするエンドツーエンドのパイプラインを構築しました。プロダクションレベルのパイプラインとモデルはデプロイされ、活用できる状態です。
Databricksレイクハウスは、皆様のチームのスピードを改善し、プロダクションへの移行をシンプルにします:
- Auto Loaderによるユニークなデータ取り込み、データ準備機能は誰でもデータエンジニアリングにアクセスできるようにします
- 構造化データ、非構造化データを取り込み、処理できることで、すべてのユースケースをサポートします
- MLトレーニングのための高度なML機能
- データサイエンティストがオペレーションのタスクではなく、(皆様のビジネスを改善するために)本当に問題になっていることにフォーカスできるようにするMLOpsのカバレッジ
- 外部ツールなしに、あなたのすべてのユースケースをカバーするように、すべてのタイプのプロダクションデプロイメントをサポート
- データセキュリティからモデルガバナンスに至る全てをカバーするセキュリティとコンプライアンス
これによって、Databricksを活用しているチームは、データの取り込みからモデルのデプロイメントに至る高度なMLプロジェクトを数週間でプロダクションに到達させることが可能となり、ビジネスを劇的に加速させます。