同じ画像を検索するプログラム
あるフォルダ配下にちらばった重複画像を見つけるプログラムです
画像を縮小、モノクロ化し画像の特徴を最低限残しつつベクトル化してからベクトル同士の距離を計算します
距離が0又は0に近い物を同一画像と判定します。そんなプログラム。
画像をできるだけ小さいデータにする
ついでに計算しやすいように numpy.array に変換します。
def img2vec(filename):
img = Image.open(filename)
img = img.resize((200, 200), resample=Image.BILINEAR) #縮小
img = img.convert('1') #2値化
#img.save(get_mono_filename(filename)) # 画像確認したい場合
return np.array(img)
ここで設定している 200 x 200 のサイズは大きすぎるかもしれません、22000枚に対して本プログラムを実行した所9時間ほどかかりましたので、 50 x 50 程度でも良いように思います。
画像同士を比較する
numpyでベクトル同士の距離を求めます、numpy使うととても楽。
def normdiff(vec1, vec2):
norm = np.linalg.norm(vec1 - vec2)
return norm
試験コード
norm = normdiff(img2vec("picture1.bmp"), img2vec("picture2.bmp"))
print(norm)
上記関数を使用してこんな感じのテストコードを実行すれば画像同士の近似度?が表示されます。
ソース全体
import csv
import datetime
import glob
import multiprocessing as mulproc
import numpy as np
import os
from PIL import Image
import sys
def log(*args):
timestr = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
body = " ".join([timestr] + [str(v) for v in args])
print(body)
with open("log.txt", mode="a", encoding="utf-8") as wf:
wf.write(body + "\n")
def get_mono_filename(filename):
dirname, filename = os.path.split(filename)
return os.path.join("mono", filename)
def img2vec(filename):
# ANTIALIAS
# BILINEAR
# BICUBIC
# NEAREST
img = Image.open(filename)
img = img.resize((200, 200), resample=Image.BILINEAR)
img = img.convert('1')
#img.save(get_mono_filename(filename)) # 画像確認したい場合
return np.array(img)
def normdiff(vec1, vec2):
norm = np.linalg.norm(vec1 - vec2)
return norm
def walk_img_files(walk_dir):
for root, dirs, files in os.walk(walk_dir):
yield root
for file in files:
yield os.path.join(root, file)
def is_picture_filename(filename):
extensions = ["png", "jpg"]
for ext in extensions:
if filename.endswith("." + ext): return True
return False
def save_vector(filename, vec):
with open(filename, mode="w", encoding="utf-8") as wf:
writer = csv.writer(wf, lineterminator='\n')
writer.writerows(vec)
def save_labels(filenames):
with open("./labels.txt", mode="w", encoding="utf-8") as wf:
wf.write("\n".join(filenames))
def create_vector_dump(search_dir):
files = list(walk_img_files(search_dir))
img_files = []
for f in files:
if is_picture_filename(f): img_files.append(f)
for img_file in img_files:
vec = img2vec(img_file)
dir, filename = os.path.split(img_file)
save_vector(os.path.join("vector", filename) + ".vector", list(vec.astype(int)))
save_labels(img_files)
return
def load_labels():
with open("./labels.txt", mode="r", encoding="utf-8") as rf:
body = rf.read()
labels = body.split("\n")
labels = [l for l in labels if len(l) > 0]
return labels
def load_vecs(labels):
log("start load vectoes")
vecs = []
for i, l in enumerate(labels):
dirname, filename = os.path.split(l)
filename = os.path.join("vector", filename + ".vector")
vecs.append(np.loadtxt(filename, delimiter=","))
log("load vectoes {}/{} complete".format(i, len(labels)))
log("end load vectoes")
return np.array(vecs)
def save_results(rows):
with open("results.csv", mode="w", encoding="utf-8") as wf:
writer = csv.writer(wf, lineterminator='\n')
writer.writerows(rows)
def create_join_imgs(filename, vecs):
vecs = np.concatenate(vecs, axis=1)
vecs *= 255
img = Image.fromarray(vecs).convert("1")
img.save(filename)
def create_dup_imgs(approximates, vecs, labels):
for i, approximate in enumerate(approximates):
orig_label = labels[i]
if len(approximate) < 1: continue
img_vecs = [vecs[i]] + [vecs[ai] for ai in approximate]
dirname, filename = os.path.split(orig_label)
filename = os.path.join("dupulicate", filename)
img = create_join_imgs(filename, img_vecs)
class EnumApproximate:
def __init__(self):
labels = load_labels()
#labels = labels[0:1000]
self.labels = labels
vecs = load_vecs(labels)
self.vecs = vecs
self.threthold = float(10.0)
def enum_approximate(self, index):
indexes = []
vec = self.vecs[index]
for i, v in enumerate(self.vecs):
if i == index: continue
dif = normdiff(vec, v)
if dif <= self.threthold: indexes.append(i)
return indexes
def exec(self):
log("start")
approximates = []
for i in range(len(self.labels)):
log("enum_approximate vectoes {}/{} complete".format(i, len(self.labels)))
approximates.append(self.enum_approximate(i))
rows = []
for i in range(len(self.labels)):
idxs = approximates[i]
cols = [self.labels[i]] + [self.labels[ii] for ii in idxs]
rows.append(cols)
save_results(rows)
create_dup_imgs(approximates, self.vecs, self.labels)
log("end")
def main():
x = EnumApproximate()
x.exec()
if __name__ == '__main__':
create_vector_dump(r"O:\picture\艦これ")
main()
実行について
ソースと同じ階層に vector, dupulicate というフォルダを作成し
create_vector_dump(r"O:\picture\艦これ")
この1行を実行すると vector フォルダに画像をベクトル化したCSVが作成されます。
ベクトル化が終了したらmainを実行します。
両方同時に実行しても構いません。
実行が終わると result.csv というCSVのような形式のファイルと、dupulicate フォルダに重複画像を繋げた画像が作成されます。
見つけるだけで重複削除等をするわけではないので、そこから先なにかをしたい場合は result.csv を元になにかスクリプトを書けば良いと思います。