LoginSignup
12
25

More than 5 years have passed since last update.

同じ画像を検索するプログラム

Last updated at Posted at 2017-05-16

同じ画像を検索するプログラム

あるフォルダ配下にちらばった重複画像を見つけるプログラムです
画像を縮小、モノクロ化し画像の特徴を最低限残しつつベクトル化してからベクトル同士の距離を計算します
距離が0又は0に近い物を同一画像と判定します。そんなプログラム。

画像をできるだけ小さいデータにする

ついでに計算しやすいように numpy.array に変換します。

def img2vec(filename):
    img = Image.open(filename)
    img = img.resize((200, 200), resample=Image.BILINEAR) #縮小
    img = img.convert('1') #2値化
    #img.save(get_mono_filename(filename)) # 画像確認したい場合
    return np.array(img)

ここで設定している 200 x 200 のサイズは大きすぎるかもしれません、22000枚に対して本プログラムを実行した所9時間ほどかかりましたので、 50 x 50 程度でも良いように思います。

画像同士を比較する

numpyでベクトル同士の距離を求めます、numpy使うととても楽。

def normdiff(vec1, vec2):
    norm = np.linalg.norm(vec1 - vec2)
    return norm

試験コード

    norm = normdiff(img2vec("picture1.bmp"), img2vec("picture2.bmp"))
    print(norm)

上記関数を使用してこんな感じのテストコードを実行すれば画像同士の近似度?が表示されます。

ソース全体

import csv
import datetime
import glob
import multiprocessing as mulproc
import numpy as np
import os
from PIL import Image
import sys

def log(*args):
    timestr = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
    body = " ".join([timestr] + [str(v) for v in args])
    print(body)
    with open("log.txt", mode="a", encoding="utf-8") as wf:
        wf.write(body + "\n")

def get_mono_filename(filename):
    dirname, filename = os.path.split(filename)
    return os.path.join("mono", filename)

def img2vec(filename):
    # ANTIALIAS
    # BILINEAR
    # BICUBIC
    # NEAREST
    img = Image.open(filename)
    img = img.resize((200, 200), resample=Image.BILINEAR)
    img = img.convert('1')
    #img.save(get_mono_filename(filename)) # 画像確認したい場合
    return np.array(img)

def normdiff(vec1, vec2):
    norm = np.linalg.norm(vec1 - vec2)
    return norm

def walk_img_files(walk_dir):
    for root, dirs, files in os.walk(walk_dir):
        yield root
        for file in files:
            yield os.path.join(root, file)

def is_picture_filename(filename):
    extensions = ["png", "jpg"]
    for ext in extensions:
        if filename.endswith("." + ext): return True
    return False

def save_vector(filename, vec):
    with open(filename, mode="w", encoding="utf-8") as wf:
        writer = csv.writer(wf, lineterminator='\n')
        writer.writerows(vec)

def save_labels(filenames):
    with open("./labels.txt", mode="w", encoding="utf-8") as wf:
        wf.write("\n".join(filenames))

def create_vector_dump(search_dir):
    files = list(walk_img_files(search_dir))
    img_files = []
    for f in files:
        if is_picture_filename(f): img_files.append(f)

    for img_file in img_files:
        vec = img2vec(img_file)
        dir, filename = os.path.split(img_file)
        save_vector(os.path.join("vector", filename) + ".vector", list(vec.astype(int)))
    save_labels(img_files)
    return

def load_labels():
    with open("./labels.txt", mode="r", encoding="utf-8") as rf:
        body = rf.read()
        labels = body.split("\n")
        labels = [l for l in labels if len(l) > 0]
        return labels

def load_vecs(labels):
    log("start load vectoes")
    vecs = []
    for i, l in enumerate(labels):
        dirname, filename = os.path.split(l)
        filename = os.path.join("vector", filename + ".vector")
        vecs.append(np.loadtxt(filename, delimiter=","))
        log("load vectoes {}/{} complete".format(i, len(labels)))
    log("end load vectoes")
    return np.array(vecs)

def save_results(rows):
    with open("results.csv", mode="w", encoding="utf-8") as wf:
        writer = csv.writer(wf, lineterminator='\n')
        writer.writerows(rows)

def create_join_imgs(filename, vecs):
    vecs = np.concatenate(vecs, axis=1)
    vecs *= 255
    img = Image.fromarray(vecs).convert("1")
    img.save(filename)

def create_dup_imgs(approximates, vecs, labels):
    for i, approximate in enumerate(approximates):
        orig_label = labels[i]
        if len(approximate) < 1: continue
        img_vecs = [vecs[i]] + [vecs[ai] for ai in approximate]
        dirname, filename = os.path.split(orig_label)
        filename = os.path.join("dupulicate", filename)
        img = create_join_imgs(filename, img_vecs)

class EnumApproximate:
    def __init__(self):
        labels = load_labels()
        #labels = labels[0:1000]
        self.labels = labels
        vecs = load_vecs(labels)
        self.vecs = vecs
        self.threthold = float(10.0)

    def enum_approximate(self, index):
        indexes = []
        vec = self.vecs[index]
        for i, v in enumerate(self.vecs):
            if i == index: continue
            dif = normdiff(vec, v)
            if dif <= self.threthold: indexes.append(i)
        return indexes

    def exec(self):
        log("start")
        approximates = []
        for i in range(len(self.labels)):
            log("enum_approximate vectoes {}/{} complete".format(i, len(self.labels)))
            approximates.append(self.enum_approximate(i))
        rows = []
        for i in range(len(self.labels)):
            idxs = approximates[i]
            cols = [self.labels[i]] + [self.labels[ii] for ii in idxs]
            rows.append(cols)
        save_results(rows)
        create_dup_imgs(approximates, self.vecs, self.labels)
        log("end")

def main():
    x = EnumApproximate()
    x.exec()

if __name__ == '__main__':
    create_vector_dump(r"O:\picture\艦これ")
    main()

実行について

ソースと同じ階層に vector, dupulicate というフォルダを作成し

create_vector_dump(r"O:\picture\艦これ")

この1行を実行すると vector フォルダに画像をベクトル化したCSVが作成されます。
ベクトル化が終了したらmainを実行します。
両方同時に実行しても構いません。

実行が終わると result.csv というCSVのような形式のファイルと、dupulicate フォルダに重複画像を繋げた画像が作成されます。

43164116_big_p1.png

見つけるだけで重複削除等をするわけではないので、そこから先なにかをしたい場合は result.csv を元になにかスクリプトを書けば良いと思います。

12
25
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
25