Image change (binary) classification

Posted at 2018-12-10

Libraries

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

import zipfile
import io

import re

from PIL import Image

import time
from tqdm import tqdm

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split 
from sklearn.metrics import f1_score

import tensorflow as tf
from keras import backend as K
from keras.models import Sequential, Model
from keras.layers import Input, Dense, Dropout, Flatten, BatchNormalization
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, GlobalAveragePooling2D 
from keras.layers import concatenate
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.models import load_model

from keras.applications.mobilenet import MobileNet
#from keras.applications.resnet50 import ResNet50

# Fix random seeds for reproducibility (tf.set_random_seed is the TF 1.x API;
# TF 2.x uses tf.random.set_seed)
np.random.seed(10)
tf.set_random_seed(10)

Data

%%time

zip_path_train = '/content/gdrive/My Drive/xxx/data/train.zip'
#zip_path_test = '/content/gdrive/My Drive/xxx/data/test.zip'

path = '/content/gdrive/My Drive/xxx/data/'

with zipfile.ZipFile(zip_path_train, 'r') as zf_train:  
  flist_train = zf_train.namelist()

#with zipfile.ZipFile(zip_path_test, 'r') as zf_test:  
#  flist_test = zf_test.namelist()

print ('size train: ', len(flist_train))
#print ('size test: ', len(flist_test)) 

%%time

train_b_p = []
train_a_p = []
train_b_n = []
train_a_n = []

# Sort the archive entries into before/after x positive/negative buckets
for i in tqdm(range(len(flist_train))):
  temp = flist_train[i]

  if ('PALSAR/before/positive' in temp):
    train_b_p.append(temp)
  elif ('PALSAR/after/positive' in temp):
    train_a_p.append(temp)
  elif ('PALSAR/before/negative' in temp):
    train_b_n.append(temp)
  elif ('PALSAR/after/negative' in temp):
    train_a_n.append(temp)
  else:
    pass

# Drop the leading directory entry from each list, keeping only the image files
train_b_p = train_b_p[1:]
train_a_p = train_a_p[1:]
train_b_n = train_b_n[1:]
train_a_n = train_a_n[1:]

# Sample negatives at 5x the raw positive count; since positives are augmented
# 5-fold below, the resulting training set is roughly balanced (1:1)
selected_idx = np.random.RandomState(seed=10).permutation(len(train_b_n))[:len(train_b_p)*5]

%%time

zippath = '/content/gdrive/My Drive/xxx/data/train.zip'

img_b_p = []
img_a_p = []
img_b_p_90 = []
img_a_p_90 = []
img_b_p_270 = []
img_a_p_270 = []
img_b_n = []
img_a_n = []

with zipfile.ZipFile(zippath) as z:

  # Load each positive pair, plus 90- and 270-degree rotated copies for augmentation
  for i in tqdm(range(len(train_b_p))):
    imgname_b_p = train_b_p[i]
    imgname_a_p = train_a_p[i]
    img_b_p.append(np.array(Image.open(io.BytesIO(z.read(imgname_b_p)))))
    img_a_p.append(np.array(Image.open(io.BytesIO(z.read(imgname_a_p)))))
    img_b_p_90.append(np.array(Image.open(io.BytesIO(z.read(imgname_b_p))).rotate(90)))
    img_a_p_90.append(np.array(Image.open(io.BytesIO(z.read(imgname_a_p))).rotate(90)))
    img_b_p_270.append(np.array(Image.open(io.BytesIO(z.read(imgname_b_p))).rotate(270)))
    img_a_p_270.append(np.array(Image.open(io.BytesIO(z.read(imgname_a_p))).rotate(270)))

  # Load only the randomly sampled subset of negative pairs
  for i in tqdm(selected_idx):
    imgname_b_n = train_b_n[i]
    imgname_a_n = train_a_n[i]
    img_b_n.append(np.array(Image.open(io.BytesIO(z.read(imgname_b_n)))))
    img_a_n.append(np.array(Image.open(io.BytesIO(z.read(imgname_a_n)))))    

temp_b_p = np.array(img_b_p)
temp_a_p = np.array(img_a_p)

# Arrays are (N, H, W): reversing the row axis flips vertically,
# reversing the column axis flips horizontally
temp_b_p_flip_v = temp_b_p[:, ::-1, :]
temp_a_p_flip_v = temp_a_p[:, ::-1, :]
temp_b_p_flip_h = temp_b_p[:, :, ::-1]
temp_a_p_flip_h = temp_a_p[:, :, ::-1]

x_train_b_p_0 = np.expand_dims(np.array(img_b_p), axis=3)
x_train_b_p_90 = np.expand_dims(np.array(img_b_p_90), axis=3)
x_train_b_p_270 = np.expand_dims(np.array(img_b_p_270), axis=3)
x_train_b_p_flip_v = np.expand_dims(temp_b_p_flip_v, axis=3)
x_train_b_p_flip_h = np.expand_dims(temp_b_p_flip_h, axis=3)
x_train_b_p = np.concatenate((x_train_b_p_0, x_train_b_p_90, x_train_b_p_270, 
                            x_train_b_p_flip_v, x_train_b_p_flip_h), axis=0)

x_train_a_p_0 = np.expand_dims(np.array(img_a_p), axis=3)
x_train_a_p_90 = np.expand_dims(np.array(img_a_p_90), axis=3)
x_train_a_p_270 = np.expand_dims(np.array(img_a_p_270), axis=3)
x_train_a_p_flip_v = np.expand_dims(temp_a_p_flip_v, axis=3)
x_train_a_p_flip_h = np.expand_dims(temp_a_p_flip_h, axis=3)
x_train_a_p = np.concatenate((x_train_a_p_0, x_train_a_p_90, x_train_a_p_270, 
                            x_train_a_p_flip_v, x_train_a_p_flip_h), axis=0)

x_train_b_n = np.expand_dims(np.array(img_b_n), axis=3)
x_train_a_n = np.expand_dims(np.array(img_a_n), axis=3)

x_train_b = np.concatenate((x_train_b_p, x_train_b_n), axis=0)
x_train_a = np.concatenate((x_train_a_p, x_train_a_n), axis=0)

n_x_train_p = len(x_train_b_p)
n_x_train_n = len(x_train_b_n)

# Scale 16-bit pixel values to [0, 1]
x_train_b_norm = x_train_b / 65535.
x_train_a_norm = x_train_a / 65535.

y_train_p = np.ones((n_x_train_p, 1), dtype=np.int)   # label 1 for positive pairs
y_train_n = np.zeros((n_x_train_n, 1), dtype=np.int)  # label 0 for negative pairs
y_train = np.concatenate((y_train_p, y_train_n), axis=0)
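As a quick sanity check, the array shapes can be printed (a minimal sketch; the 40x40 tile size is taken from the model input shape defined below):

print(x_train_b_norm.shape)  # expected: (5*n_pos + n_neg, 40, 40, 1)
print(y_train.shape)         # expected: (5*n_pos + n_neg, 1)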
# Pick random examples to display; img_b_n holds only the sampled negatives,
# so index into it directly (indexing with len(train_b_n) could go out of range)
index_p = np.random.randint(len(img_b_p))
index_n = np.random.randint(len(img_b_n))

ex_b_p = img_b_p[index_p]
ex_a_p = img_a_p[index_p]
ex_b_n = img_b_n[index_n]
ex_a_n = img_a_n[index_n]

fig = plt.figure(figsize = (5, 3))
ax1 = fig.add_subplot(1, 2, 1)
ax1.imshow(ex_b_p, cmap='gray')
ax1.set_title('Before (positive)')
ax1.set_axis_off()

ax2 = fig.add_subplot(1, 2, 2)
ax2.imshow(ex_a_p, cmap='gray')
ax2.set_title('After (positive)')
ax2.set_axis_off()

plt.show()

fig = plt.figure(figsize = (5, 3))
ax1 = fig.add_subplot(1, 2, 1)
ax1.imshow(ex_b_n, cmap='gray')
ax1.set_title('Before (negative)')
ax1.set_axis_off()

ax2 = fig.add_subplot(1, 2, 2)
ax2.imshow(ex_a_n, cmap='gray')
ax2.set_title('After (negative)')
ax2.set_axis_off()

plt.show()

(Figure: example before/after image pairs for a positive and a negative sample)

x_train_b, x_val_b, x_train_a, x_val_a, y_train, y_val = train_test_split(x_train_b_norm,
                                                                          x_train_a_norm,
                                                                          y_train, test_size=0.2,
                                                                          stratify=y_train,
                                                                          random_state=10)
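Because stratify=y_train is passed, the positive:negative ratio is preserved in both splits; a quick check (sketch):

print(y_train.mean(), y_val.mean())  # both should be ~0.5 for the balanced 1:1 set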

Model 1

digit_input = Input(shape=(40, 40, 1))
x = Conv2D(32, (3, 3), activation='relu', padding='same')(digit_input)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2))(x)

x = Flatten()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.25)(x)
out = Dense(1, activation='sigmoid')(x)

vision_model = Model(digit_input, out)

digit_b = Input(shape=(40, 40, 1), name='before')
digit_a = Input(shape=(40, 40, 1), name='after')

# The vision model will be shared, weights and all: both inputs pass through
# the same tower (a Siamese-style setup)
out_b = vision_model(digit_b)
out_a = vision_model(digit_a)

# Each tower already outputs a sigmoid score; a final Dense layer combines the
# two per-image scores into a single change probability
concatenated = concatenate([out_b, out_a])
out = Dense(1, activation='sigmoid')(concatenated)

classification_model = Model([digit_b, digit_a], out)

vision_model.summary()
#classification_model.summary()
classification_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']) 

epochs = 10
batch_size = 64

callbacks = [
    EarlyStopping(patience=3, verbose=1),
    # Note: with the same patience as EarlyStopping, the LR reduction fires at
    # the epoch training stops, so it has little effect here
    ReduceLROnPlateau(patience=3, verbose=1)
]

history = classification_model.fit({'before': x_train_b, 'after': x_train_a}, y_train, 
                    batch_size=batch_size, epochs=epochs, callbacks=callbacks,
                    validation_data=({'before': x_val_b, 'after': x_val_a}, y_val),
                    verbose=1)


plt.figure(figsize =(5,3))
plt.plot(history.history['loss'], marker='.', label='train')
plt.plot(history.history['val_loss'], marker='.', label='validation')
plt.title('Loss')
plt.grid(True)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(loc='best')
plt.show()

plt.figure(figsize =(5,3))
plt.plot(history.history['acc'], marker='.', label='train')
plt.plot(history.history['val_acc'], marker='.', label='validation')
plt.title('Accuracy')
plt.grid(True)
plt.xlabel('epoch')
plt.ylabel('accuracy')
plt.legend(loc='best')
plt.show()
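Since f1_score is imported above but not used in the listing, validation F1 can also be checked with thresholded predictions; a minimal sketch, assuming a 0.5 cutoff:

y_val_prob = classification_model.predict({'before': x_val_b, 'after': x_val_a})
y_val_pred = (y_val_prob > 0.5).astype(int)
print('val F1:', f1_score(y_val.ravel(), y_val_pred.ravel()))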

(Figure: training and validation loss/accuracy curves for Model 1)

Model 2

digit_b = Input(shape=(40, 40, 1), name='before')
digit_a = Input(shape=(40, 40, 1), name='after')

# Stack the before/after images channel-wise into a (40, 40, 2) input
concatenated = concatenate([digit_b, digit_a])
x = Conv2D(32, (3, 3), activation='relu', padding='same')(concatenated)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2))(x)

x = Flatten()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.25)(x)
out = Dense(1, activation='sigmoid')(x)

model2 = Model([digit_b, digit_a], out)

model2.summary()

(Figure: Model 2 summary output)

model2.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']) 

epochs = 10
batch_size = 64

callbacks = [
    EarlyStopping(patience=3, verbose=1),
    ReduceLROnPlateau(patience=3, verbose=1)
]

history = model2.fit({'before': x_train_b, 'after': x_train_a}, y_train, 
                    batch_size=batch_size, epochs=epochs, callbacks=callbacks,
                    validation_data=({'before': x_val_b, 'after': x_val_a}, y_val),
                    verbose=1)


Data 2 (imbalanced, positive:negative = 1:4)

# Sample negatives at 20x the raw positive count; with the 5-fold positive
# augmentation the effective positive:negative ratio is 5:20 = 1:4
selected_idx = np.random.RandomState(seed=10).permutation(len(train_b_n))[:len(train_b_p)*20]
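After re-sampling, the image-loading and array-building cells above are presumably re-run to rebuild x_train_*/y_train with the new class ratio.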

Loss functions and metrics

def precision(y_true, y_pred):
  true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
  predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
  pr = true_positives / (predicted_positives + K.epsilon())
  return pr

def recall(y_true, y_pred):
  true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
  possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
  re = true_positives / (possible_positives + K.epsilon())
  return re

def f1(y_true, y_pred):
  pr = precision(y_true, y_pred)
  re = recall(y_true, y_pred)
  return 2*((pr * re)/(pr + re + K.epsilon()))

def iou(y_true, y_pred):
  true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
  false_positives = K.sum(K.round(K.clip((1-y_true) * y_pred, 0, 1)))
  false_negatives = K.sum(K.round(K.clip(y_true * (1-y_pred), 0, 1)))
  # local renamed from "io" to avoid shadowing the imported io module
  iou_score = true_positives / (true_positives + false_positives +
                                false_negatives + K.epsilon())
  return iou_score
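For reference, these implement the standard definitions, with K.epsilon() guarding against division by zero:

precision = TP / (TP + FP)
recall = TP / (TP + FN)
F1 = 2 * precision * recall / (precision + recall)
IoU = TP / (TP + FP + FN)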

def iou_loss(y_true, y_pred):
  # Soft IoU: unlike the iou metric above, predictions are not rounded,
  # so the loss remains differentiable
  true_positives = K.mean(K.clip(y_true * y_pred, 0, 1), axis=-1)
  false_positives = K.mean(K.clip((1-y_true) * y_pred, 0, 1), axis=-1)
  false_negatives = K.mean(K.clip(y_true * (1-y_pred), 0, 1), axis=-1)
  iou_score = true_positives / (true_positives + false_positives +
                                false_negatives + K.epsilon())
  return 1 - iou_score

def focal_loss(gamma=2., alpha=.25):
  def focal_loss_fixed(y_true, y_pred):
    pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
    pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))
    ret = -K.sum(alpha * K.pow(1. - pt_1, gamma) * 
                 K.log(pt_1)) -K.sum((1-alpha) * K.pow( pt_0, gamma) * 
                                     K.log(1. - pt_0))
    return ret
  return focal_loss_fixed
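This follows the binary focal loss of Lin et al. (2017), computed here in summed form; gamma down-weights easy examples and alpha balances the classes:

FL(p_t) = -alpha_t * (1 - p_t)^gamma * log(p_t)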

def bce_loss(y_true, y_pred):
  return K.mean(K.binary_crossentropy(y_true, y_pred), axis=-1)

def bce_iou_loss(y_true, y_pred):
  return bce_loss(y_true, y_pred) + iou_loss(y_true, y_pred)

def dice_loss(y_true, y_pred):
  smooth = 1.
  y_true_f = K.flatten(y_true)
  y_pred_f = K.flatten(y_pred)
  intersection = y_true_f * y_pred_f
  score = (2. * K.sum(intersection) + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
  return 1. - score
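dice_loss returns 1 - Dice, where the smoothing term smooth = 1 avoids division by zero on empty masks:

Dice = (2 * sum(y_true * y_pred) + smooth) / (sum(y_true) + sum(y_pred) + smooth)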

Model 2 (BCE Loss)

model2.compile(optimizer='adam', loss=bce_loss, metrics=['accuracy', iou])
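A minimal sketch of the corresponding training call, assuming the Data 2 arrays have been rebuilt with the new selected_idx and the same batch size, epochs, and callbacks as above (the BCE + IoU variant below trains the same way):

history = model2.fit({'before': x_train_b, 'after': x_train_a}, y_train,
                     batch_size=batch_size, epochs=epochs, callbacks=callbacks,
                     validation_data=({'before': x_val_b, 'after': x_val_a}, y_val),
                     verbose=1)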

(Figures: training results for Model 2 with BCE loss)

Model 2 (BCE + IoU Loss)

model2.compile(optimizer='adam', loss=bce_iou_loss, metrics=['accuracy', iou])

(Figures: training results for Model 2 with BCE + IoU loss)
