はじめに
こんにちは、株式会社COSM AI事業部でエンジニアをやってる村田です。
今回外観検査AIモデルであるPatchCoreを触っておきたく、
アルミフレームの表面の傷を検知してみました。
PatchCoreとは??
外観検査における異常検知において、良品のサンプルは簡単にたくさん手に入るけど不良品のサンプルは手に入りにくい傾向にあります。
(そもそも不良品の数が少ない、傷の形状・大きさが様々etc)
そのため良品のみの画像を学習させて、その特徴差分から異常検知する手法が注目されています。
その中でも代表的な手法が今回使用したPatchCoreになります。
より詳しくアルゴリズム等を知りたい方はこちらの記事をみてください!
具体的かつ分かりやすく説明してくださってます!
今回検査対象に用いたもの
今回検査対象として画像のアルミフレームを用いました。
このアルミフレームの表面を撮影し、検査したい箇所をトリミングして学習・推論に用いました。
撮影環境は以下の感じ。
暗室環境でリング照明とエリアスキャンカメラ用いて撮影しました。
撮れた良部位の画像はこんな感じ。
撮れた不良部位の画像はこんな感じ。
傷が確認できます。
実装コード
こちらのコードを参考に改変・実装しました。
環境は以下。
- python : 3.9.13
- 実行環境 : CPU環境
パッケージに関しては以下pipfileに。
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
streamlit = "*"
wget = "==3.2"
matplotlib = "==3.3.4"
timm = "==0.4.12"
click = "==7.1.2"
torch = "==1.9.0"
tqdm = "==4.61.2"
numpy = "==1.19.5"
torchvision = "==0.10.0"
pillow = "==8.3.1"
pyyaml = "==5.4.1"
scikit-learn = "==0.24.2"
opencv-python = "*"
altair = "==4.1.0"
plotly = "==5.3.1"
protobuf = "==3.20.1"
[dev-packages]
[requires]
python_version = "3.9"
ディレクトリ構造
src:.
| params.py
| train.py
| inference.py
| dataset.py
| anomaly_detector.py
| patchcore.py
| Pipfile
| Pipfile.lock
|
+---datasets
| +---alumi
| | +---test
| | | +---bad
| | | | | bad01.png
| | | | | bad02.png
| | | | | bad03.png
| | | | | bad04.png
| | | | | bad05.png
| | | |
| | | \---good
| | | | good01.png
| | | | good02.png
| | | | good03.png
| | | | good04.png
| | | | good05.png
| | |
| | \---train
| | \---good
| | train01.png
| | train02.png
| | train03.png
| | train04.png
| | train05.png
| | train06.png
| | train07.png
| | train08.png
| | train09.png
| | train10.png
| |
|
+---npy_data
| patch_lib.npy(学習実行後に生成される)
|
\---weights
alumi.pth(学習実行後に生成される)
params.py
学習推論ファイルに対して使用するパラメーターをまとめています。
MODELNAME = "./weights/alumi.pth"
DATASET = "alumi"
BACKBONE = "wide_resnet50_2"
THRESHOLD = 20
train.py
以下が学習用を行うコード。
こいつを実行すると、npy_data・weightsのディレクトリに学習データが保存されます。
from patchcore import PatchCore
from dataset import Dataset
import torch
from params import MODELNAME,DATASET,BACKBONE
def main():
model = PatchCore(backbone_name=BACKBONE, pretrained=True)
train_ds, _ = Dataset(DATASET).get_dataloaders()
model.fit(train_ds)
torch.save(model.state_dict(), MODELNAME)
print("モデル保存先: ", MODELNAME)
if __name__ == "__main__":
main()
inference.py
以下が推論を行うコード。
指定したデータセットディレクトリ内の推論用画像にすべてに対して、
推論を実行し結果を表示します。
from anomaly_detector import AnomalyDetector
from patchcore import PatchCore
from data import Dataset
from params import MODELNAME,DATASET,BACKBONE,THRESHOLD
import torch
def main():
model = PatchCore(backbone_name=BACKBONE)
model.load_state_dict(torch.load(MODELNAME))
model.standby()
anomaly_detector = AnomalyDetector(model, THRESHOLD)
dataset = Dataset(DATASET)
_, test_loader = dataset.get_dataloaders()
anomaly_detector.run(test_loader)
if __name__ == "__main__":
main()
dataset.py
データセットまわりの処理を実装してます。
データセットが格納させれてるディレクトリ名を指定してあげることで、
学習・推論用画像をそれぞれ224*224にリサイズして取得できます。
バックボーンを変更する際はそれに合わせてSIZEを変更する必要があります。
from pathlib import Path
from torch import tensor
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torch.utils.data import DataLoader
DATASETS_PATH = Path("./datasets")
IMAGENET_MEAN = tensor([.485, .456, .406])
IMAGENET_STD = tensor([.229, .224, .225])
SIZE = 224
transform = transforms.Compose([
transforms.Resize(SIZE, interpolation=transforms.InterpolationMode.BICUBIC),
transforms.ToTensor(),
transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])
class CustomImageDataset(ImageFolder):
def __init__(self, root, transform):
super().__init__(root=root, transform=transform)
def __getitem__(self, index):
path, target = self.samples[index]
image = self.loader(path)
if self.transform is not None:
image = self.transform(image)
return image, path, target
class Dataset:
def __init__(self, cls):
self.cls = cls
self.train_ds = CustomImageDataset(DATASETS_PATH / cls / "train", transform)
self.test_ds = CustomImageDataset(DATASETS_PATH / cls / "test", transform)
def get_dataloaders(self):
return DataLoader(self.train_ds), DataLoader(self.test_ds)
anomaly_detector.py
任意の閾値で画像を異常判定し結果を出力する処理です。
import os
class AnomalyDetector:
def __init__(self, model, threshold):
self.threshold = threshold
self.model = model
def run(self, dataloader):
for images, paths, _ in dataloader:
for i in range(images.size(0)):
path = paths[i]
image_tensor = images[i].unsqueeze(0)
filename = os.path.basename(path)
img_lvl_anom_score, _ = self.model.predict(image_tensor)
score = img_lvl_anom_score.item()
is_anomaly = score > self.threshold
print(f"ファイル名: {filename}, 異常値スコア: {score}, 異常フラグ: {is_anomaly}")
patchcore.py
PatchCoreの実装コードになります。
メモリバンク の取得割合や取得する近傍数等のパラメーターは適宜いじってみてください。
今回はデフォルト値で検証しました。
from tqdm import tqdm
import torch
import timm
from dataset import SIZE
import numpy as np
import sys
from sklearn import random_projection
class PatchCore(torch.nn.Module):
def __init__(
self,
f_coreset = 0.01,
backbone_name = "resnet18",
coreset_eps = 0.90,
pool_last = False,
pretrained = True
):
super().__init__()
self.feature_extractor = self._initialize_feature_extractor(backbone_name, pretrained)
self.pool = torch.nn.AdaptiveAvgPool2d(1) if pool_last else None
self.backbone_name = backbone_name
self.out_indices = (2, 3)
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.feature_extractor = self.feature_extractor.to(self.device)
self.f_coreset = f_coreset
self.coreset_eps = coreset_eps
self.average = torch.nn.AvgPool2d(3, stride=1)
self.n_reweight = 3
self.patch_lib = []
self.resize = None
def _initialize_feature_extractor(self, backbone_name, pretrained):
feature_extractor = timm.create_model(
backbone_name,
out_indices=(2, 3),
features_only=True,
pretrained=pretrained,
)
for param in feature_extractor.parameters():
param.requires_grad = False
feature_extractor.eval()
return feature_extractor
def __call__(self, x: torch.Tensor):
with torch.no_grad():
feature_maps = self.feature_extractor(x.to(self.device))
feature_maps = [fmap.to("cpu") for fmap in feature_maps]
if self.pool:
return feature_maps[:-1], self.pool(feature_maps[-1])
else:
return feature_maps
def fit(self, train_dl):
self.patch_lib = self._create_patch_library(train_dl)
self._reduce_patch_library()
def _create_patch_library(self, train_dl):
patch_lib = []
for sample, _, _ in tqdm(train_dl, **self.get_tqdm_params()):
feature_maps = self(sample)
resized_maps = self._resize_feature_maps(feature_maps)
patch = self._reshape_and_concatenate(resized_maps)
patch_lib.append(patch)
return torch.cat(patch_lib, 0)
def _resize_feature_maps(self, feature_maps):
if self.resize is None:
largest_fmap_size = feature_maps[0].shape[-2:]
self.resize = torch.nn.AdaptiveAvgPool2d(largest_fmap_size)
return [self.resize(self.average(fmap)) for fmap in feature_maps]
def _reshape_and_concatenate(self, resized_maps):
patch = torch.cat(resized_maps, 1)
return patch.reshape(patch.shape[1], -1).T
def _reduce_patch_library(self):
if self.f_coreset < 1:
self.coreset_idx = self._get_coreset_idx_randomp(
self.patch_lib,
n=int(self.f_coreset * self.patch_lib.shape[0]),
eps=self.coreset_eps,
)
self.patch_lib = self.patch_lib[self.coreset_idx]
x = self.patch_lib.to('cpu').detach().numpy().copy()
np.save("./npy_data/patch_lib.npy", x)
def _get_coreset_idx_randomp(self, z_lib, n = 1000, eps = 0.90, float16 = True, force_cpu = False):
print(f" ランダムプロジェクション。開始次元 = {z_lib.shape}.")
try:
transformer = random_projection.SparseRandomProjection(eps=eps)
z_lib = torch.tensor(transformer.fit_transform(z_lib))
print(f" 完了。変換後の次元 = {z_lib.shape}.")
except ValueError:
print( " ベクトルをプロジェクションできませんでした。`eps`を増やしてください。")
select_idx = 0
last_item = z_lib[select_idx:select_idx+1]
coreset_idx = [torch.tensor(select_idx)]
min_distances = torch.linalg.norm(z_lib-last_item, dim=1, keepdims=True)
if float16:
last_item = last_item.half()
z_lib = z_lib.half()
min_distances = min_distances.half()
if torch.cuda.is_available() and not force_cpu:
last_item = last_item.to("cuda")
z_lib = z_lib.to("cuda")
min_distances = min_distances.to("cuda")
for _ in tqdm(range(n-1), **self.get_tqdm_params()):
distances = torch.linalg.norm(z_lib-last_item, dim=1, keepdims=True)
min_distances = torch.minimum(distances, min_distances)
select_idx = torch.argmax(min_distances)
last_item = z_lib[select_idx:select_idx+1]
min_distances[select_idx] = 0
coreset_idx.append(select_idx.to("cpu"))
return torch.stack(coreset_idx)
def predict(self, sample):
feature_maps = self(sample)
resized_maps = self._resize_feature_maps(feature_maps)
patch = self._reshape_and_concatenate(resized_maps)
s, s_map = self._compute_anomaly_scores(patch, feature_maps)
return s, s_map
def _compute_anomaly_scores(self, patch, feature_maps):
dist = torch.cdist(patch, self.patch_lib)
min_val, min_idx = torch.min(dist, dim=1)
s_star, s_idx = torch.max(min_val), torch.argmax(min_val)
w = self._reweight(patch, min_idx, s_star, s_idx)
s = w * s_star
s_map = self._create_segmentation_map(min_val, feature_maps)
return s, s_map
def _reweight(self, patch, min_idx, s_star, s_idx):
m_test = patch[s_idx].unsqueeze(0)
m_star = self.patch_lib[min_idx[s_idx]].unsqueeze(0)
w_dist = torch.cdist(m_star, self.patch_lib)
_, nn_idx = torch.topk(w_dist, k=self.n_reweight, largest=False)
m_star_knn = torch.linalg.norm(m_test - self.patch_lib[nn_idx[0, 1:]], dim=1)
D = torch.sqrt(torch.tensor(patch.shape[1]))
return 1 - (torch.exp(s_star / D) / torch.sum(torch.exp(m_star_knn / D)))
def _create_segmentation_map(self, min_val, feature_maps):
s_map = min_val.view(1, 1, *feature_maps[0].shape[-2:])
s_map = torch.nn.functional.interpolate(
s_map, size=(SIZE, SIZE), mode='bilinear'
)
return s_map
def standby(self):
largest_fmap_size = torch.LongTensor([SIZE // 8, SIZE // 8])
self.resize = torch.nn.AdaptiveAvgPool2d(largest_fmap_size)
self.patch_lib = np.load("./npy_data/patch_lib.npy")
self.patch_lib = torch.from_numpy(self.patch_lib.astype(np.float32)).clone()
def get_tqdm_params(self):
return {
"file" : sys.stdout,
"bar_format" : " {l_bar}{bar:10}{r_bar}{bar:-10b}",
}
検知結果
まず撮影した画像をデータセットディレクトリに配置しtrain.pyを実行。
以下のような出力が出る。
今回は学習データに良部位の画像を10枚食わせました。
PS C:\patchcore> python train.py
0%| | 0/10 [00:00<?, ?it/s] C:\Users\paulm_k1pu3xr\.virtualenvs\ind_knn_ad-5yVICGag\lib\site-packages\torch\nn\functional.py:718: UserWarning: Named tensors and all their associated APIs are an experimental feature and subject to change. Please do not use them for anything important until they are released as stable. (Triggered internally at ..\c10/core/TensorImpl.h:1156.)
return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
100%|██████████| 10/10 [00:01<00:00, 7.02it/s]
ランダムプロジェクション。開始次元 = torch.Size([7840, 1536]).
完了。変換後の次元 = torch.Size([7840, 221]).
100%|██████████| 77/77 [00:00<00:00, 89.48it/s]
モデル保存先: ./weights/alumi.pth
次にinference.pyを実行する。
すると以下のような結果が出力がされる。
今回異常値の判定閾値を20に設定して実行。
推論画像として良・不良部位の画像をそれぞれ5枚ずつ使用。
良部位は正常、不良部位は異常として検知できた。
PS C:\patchcore> python .\inference.py
ファイル名: bad01.png, 異常値スコア: 23.76926040649414, 異常フラグ: True
ファイル名: bad02.png, 異常値スコア: 23.66185760498047, 異常フラグ: True
ファイル名: bad03.png, 異常値スコア: 28.26588821411133, 異常フラグ: True
ファイル名: bad04.png, 異常値スコア: 27.73970413208008, 異常フラグ: True
ファイル名: bad05.png, 異常値スコア: 29.32174301147461, 異常フラグ: True
ファイル名: good1.png, 異常値スコア: 15.619819641113281, 異常フラグ: False
ファイル名: good2.png, 異常値スコア: 15.680216789245605, 異常フラグ: False
ファイル名: good3.png, 異常値スコア: 15.7577543258667, 異常フラグ: False
ファイル名: good4.png, 異常値スコア: 15.54543399810791, 異常フラグ: False
ファイル名: good5.png, 異常値スコア: 16.94964027404785, 異常フラグ: False
おわりに
外観検査AIモデルであるPatchCore用いて、アルミフレームの表面の傷を検知してみました。
良品学習のみで不良部位の画像が高い異常値として検出されることを確認できました。
小さい傷でも撮影機器・撮影方法・画像処理次第で異常検知が可能なので、
要件や機構、閾値設定次第で実際の現場でも使える場面がありそうです。
参考文献