Library
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import zipfile
import io
import re
from PIL import Image
import time
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
import tensorflow as tf
from keras import backend as K
from keras.models import Sequential, Model
from keras.layers import Input, Dense, Dropout, Flatten, BatchNormalization
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, GlobalAveragePooling2D
from keras.layers import concatenate
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from keras.models import load_model
from keras.applications.mobilenet import MobileNet
#from keras.applications.resnet50 import ResNet50
# One shared seed value for both the NumPy and the TensorFlow generators.
SEED = 10
np.random.seed(SEED)
tf.set_random_seed(SEED)
Data
%%time
# Location of the training archive and of the data directory.
zip_path_train = '/content/gdrive/My Drive/xxx/data/train.zip'
#zip_path_test = '/content/gdrive/My Drive/xxx/data/test.zip'
path = '/content/gdrive/My Drive/xxx/data/'

# Collect the member names of the training archive; nothing is extracted here.
with zipfile.ZipFile(zip_path_train, mode='r') as zf_train:
    flist_train = zf_train.namelist()
#with zipfile.ZipFile(zip_path_test, 'r') as zf_test:
#    flist_test = zf_test.namelist()

print('size train: ', len(flist_train))
#print ('size test: ', len(flist_test))
%%time
# Sort the archive member names into four lists according to the category
# sub-path they contain (before/after x positive/negative).
train_b_p = []
train_a_p = []
train_b_n = []
train_a_n = []
category_buckets = (
    ('PALSAR/before/positive', train_b_p),
    ('PALSAR/after/positive', train_a_p),
    ('PALSAR/before/negative', train_b_n),
    ('PALSAR/after/negative', train_a_n),
)
for temp in tqdm(flist_train):
    for marker, bucket in category_buckets:
        if marker in temp:
            # A name goes into the first category it matches and no other.
            bucket.append(temp)
            break

# Discard the first entry of every list.
# NOTE(review): presumably that entry is the directory record of the
# category folder rather than an image -- confirm against the archive.
train_b_p = train_b_p[1:]
train_a_p = train_a_p[1:]
train_b_n = train_b_n[1:]
train_a_n = train_a_n[1:]

# Sub-sample the negatives: take the first (5 x number of positive pairs)
# indices of a fixed-seed permutation of all negative indices.
negative_order = np.random.RandomState(seed=10).permutation(len(train_b_n))
selected_idx = negative_order[:len(train_b_p) * 5]
%%time
# Read the selected image pairs out of the training archive into lists of
# NumPy arrays (one list per category / augmentation).
zippath = '/content/gdrive/My Drive/xxx/data/train.zip'


def _open_zip_image(archive, member):
    """Return the archive member `member` opened as a PIL image."""
    return Image.open(io.BytesIO(archive.read(member)))


img_b_p = []
img_a_p = []
img_b_p_90 = []
img_a_p_90 = []
img_b_p_270 = []
img_a_p_270 = []
img_b_n = []
img_a_n = []
with zipfile.ZipFile(zippath) as z:
    # Positive pairs: keep the original image plus its 90- and 270-degree
    # rotations.  Each member is read and decoded once; both rotations are
    # derived from that single image instead of re-reading the archive.
    for i, imgname_b_p in enumerate(tqdm(train_b_p)):
        imgname_a_p = train_a_p[i]
        pil_b_p = _open_zip_image(z, imgname_b_p)
        pil_a_p = _open_zip_image(z, imgname_a_p)
        img_b_p.append(np.array(pil_b_p))
        img_a_p.append(np.array(pil_a_p))
        img_b_p_90.append(np.array(pil_b_p.rotate(90)))
        img_a_p_90.append(np.array(pil_a_p.rotate(90)))
        img_b_p_270.append(np.array(pil_b_p.rotate(270)))
        img_a_p_270.append(np.array(pil_a_p.rotate(270)))
    # Negative pairs: only the sub-sampled indices, without augmentation.
    for i in tqdm(selected_idx):
        imgname_b_n = train_b_n[i]
        imgname_a_n = train_a_n[i]
        img_b_n.append(np.array(_open_zip_image(z, imgname_b_n)))
        img_a_n.append(np.array(_open_zip_image(z, imgname_a_n)))
# Stack the positive images into (N, H, W) arrays and derive the two flip
# augmentations by reversing axis 1 (rows) and axis 2 (columns).
temp_b_p = np.array(img_b_p)
temp_a_p = np.array(img_a_p)
temp_b_p_flip_v = temp_b_p[:, ::-1, :]
temp_a_p_flip_v = temp_a_p[:, ::-1, :]
temp_b_p_flip_h = temp_b_p[:, :, ::-1]
temp_a_p_flip_h = temp_a_p[:, :, ::-1]

# "Before" positives: original, two rotations and two flips, each with a
# trailing channel axis, concatenated along the sample axis.
x_train_b_p_0 = np.expand_dims(temp_b_p, axis=3)
x_train_b_p_90 = np.expand_dims(np.array(img_b_p_90), axis=3)
x_train_b_p_270 = np.expand_dims(np.array(img_b_p_270), axis=3)
x_train_b_p_flip_v = np.expand_dims(temp_b_p_flip_v, axis=3)
x_train_b_p_flip_h = np.expand_dims(temp_b_p_flip_h, axis=3)
x_train_b_p = np.concatenate((x_train_b_p_0, x_train_b_p_90, x_train_b_p_270,
                              x_train_b_p_flip_v, x_train_b_p_flip_h), axis=0)

# "After" positives, built the same way.
x_train_a_p_0 = np.expand_dims(temp_a_p, axis=3)
x_train_a_p_90 = np.expand_dims(np.array(img_a_p_90), axis=3)
x_train_a_p_270 = np.expand_dims(np.array(img_a_p_270), axis=3)
x_train_a_p_flip_v = np.expand_dims(temp_a_p_flip_v, axis=3)
x_train_a_p_flip_h = np.expand_dims(temp_a_p_flip_h, axis=3)
x_train_a_p = np.concatenate((x_train_a_p_0, x_train_a_p_90, x_train_a_p_270,
                              x_train_a_p_flip_v, x_train_a_p_flip_h), axis=0)

# Negatives are used as loaded (no augmentation).
x_train_b_n = np.expand_dims(np.array(img_b_n), axis=3)
x_train_a_n = np.expand_dims(np.array(img_a_n), axis=3)

# Positives first, then negatives; the labels below follow the same order.
x_train_b = np.concatenate((x_train_b_p, x_train_b_n), axis=0)
x_train_a = np.concatenate((x_train_a_p, x_train_a_n), axis=0)
n_x_train_p = len(x_train_b_p)
n_x_train_n = len(x_train_b_n)

# Scale pixel values by 65535.
# NOTE(review): presumably the images are 16-bit, making this a [0, 1]
# normalisation -- confirm against the data.
x_train_b_norm = x_train_b / 65535.
x_train_a_norm = x_train_a / 65535.

# Labels: 1 for positive pairs, 0 for negative pairs.  The builtin `int` is
# used because the `np.int` alias no longer exists in recent NumPy releases.
y_train_p = np.ones((n_x_train_p, 1), dtype=int)
y_train_n = np.zeros((n_x_train_n, 1), dtype=int)
y_train = np.concatenate((y_train_p, y_train_n), axis=0)
def _show_pair(before_img, after_img, label):
    """Show a before/after image pair side by side in grayscale.

    `label` is placed in parentheses in both subplot titles,
    e.g. 'Before (positive)' and 'After (positive)'.
    """
    fig = plt.figure(figsize=(5, 3))
    staged_images = (('Before', before_img), ('After', after_img))
    for position, (stage, image) in enumerate(staged_images, start=1):
        ax = fig.add_subplot(1, 2, position)
        ax.imshow(image, cmap='gray')
        ax.set_title('{} ({})'.format(stage, label))
        ax.set_axis_off()
    plt.show()


# Pick one random positive and one random negative pair to display.  The
# indices are drawn from the lists that are actually indexed: img_b_n only
# holds the sub-sampled negatives, so an index drawn from len(train_b_n)
# could fall outside it.
index_p = np.random.randint(len(img_b_p))
index_n = np.random.randint(len(img_b_n))
ex_b_p = img_b_p[index_p]
ex_a_p = img_a_p[index_p]
ex_b_n = img_b_n[index_n]
ex_a_n = img_a_n[index_n]
_show_pair(ex_b_p, ex_a_p, 'positive')
_show_pair(ex_b_n, ex_a_n, 'negative')
# Stratified 80/20 train/validation split applied jointly to the normalised
# "before" images, the normalised "after" images and the labels.
# NOTE(review): the inputs already contain rotated/flipped copies of each
# positive image, so copies of one source image may land on both sides of
# the split -- confirm whether that is acceptable for validation.
(x_train_b, x_val_b,
 x_train_a, x_val_a,
 y_train, y_val) = train_test_split(
    x_train_b_norm,
    x_train_a_norm,
    y_train,
    test_size=0.2,
    stratify=y_train,
    random_state=10,
)
Model 1
# Single-image network: two (conv, conv, max-pool) stages with 32 and then 64
# filters, followed by a 256-unit dense layer, dropout and a sigmoid output.
digit_input = Input(shape=(40, 40, 1))
x = digit_input
for n_filters in (32, 64):
    x = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(x)
    x = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.25)(x)
out = Dense(1, activation='sigmoid')(x)
vision_model = Model(digit_input, out)

# Two-input classifier built around the network above.
digit_b = Input(shape=(40, 40, 1), name='before')
digit_a = Input(shape=(40, 40, 1), name='after')
# The same vision_model instance is applied to both inputs, so the two
# branches share all of their weights.
out_b = vision_model(digit_b)
out_a = vision_model(digit_a)
# The two per-image outputs are joined and mapped to one sigmoid output.
concatenated = concatenate([out_b, out_a])
out = Dense(1, activation='sigmoid')(concatenated)
classification_model = Model([digit_b, digit_a], out)

vision_model.summary()
#classification_model.summary()
classification_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
# Training configuration for Model 1.
epochs = 10
batch_size = 64
# NOTE(review): both callbacks use patience=3, so the learning-rate reduction
# likely fires on the same epoch on which early stopping ends training --
# confirm whether a smaller ReduceLROnPlateau patience was intended.
callbacks = [
    EarlyStopping(patience=3, verbose=1),
    ReduceLROnPlateau(patience=3, verbose=1),
]

# Inputs are passed by the names given to the two Input layers.
train_inputs = {'before': x_train_b, 'after': x_train_a}
val_inputs = {'before': x_val_b, 'after': x_val_a}
history = classification_model.fit(
    train_inputs,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_data=(val_inputs, y_val),
    verbose=1,
)
def _plot_history_metric(history_dict, keys, title, ylabel):
    """Plot the training and validation curves of one recorded metric.

    Args:
        history_dict: mapping from metric name to per-epoch values
            (the `history` attribute of the object returned by `fit`).
        keys: candidate names of the training metric, tried in order; the
            validation curve is read from the same name prefixed with 'val_'.
        title: figure title.
        ylabel: label of the y axis.

    Raises:
        KeyError: if none of `keys` is present in `history_dict`.
    """
    key = next((k for k in keys if k in history_dict), None)
    if key is None:
        raise KeyError('none of {} found in history'.format(list(keys)))
    plt.figure(figsize=(5, 3))
    plt.plot(history_dict[key], marker='.', label='train')
    plt.plot(history_dict['val_' + key], marker='.', label='validation')
    plt.title(title)
    plt.grid(True)
    plt.xlabel('epoch')
    plt.ylabel(ylabel)
    plt.legend(loc='best')
    plt.show()


_plot_history_metric(history.history, ('loss',), 'Loss', 'loss')
# The accuracy metric is recorded as 'acc' by older Keras releases and as
# 'accuracy' by newer ones, so both spellings are accepted.
_plot_history_metric(history.history, ('acc', 'accuracy'), 'Accuracy', 'accuracy')
Model 2
# Model 2: the "before" and "after" images are joined by `concatenate` first
# and a single CNN processes the combined tensor.
digit_b = Input(shape=(40, 40, 1), name='before')
digit_a = Input(shape=(40, 40, 1), name='after')
concatenated = concatenate([digit_b, digit_a])

# Two (conv, conv, max-pool) stages with 32 and then 64 filters.
x = concatenated
for n_filters in (32, 64):
    x = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(x)
    x = Conv2D(n_filters, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2))(x)

# Dense head ending in one sigmoid unit.
x = Flatten()(x)
x = Dense(256, activation='relu')(x)
x = Dropout(0.25)(x)
out = Dense(1, activation='sigmoid')(x)

model2 = Model([digit_b, digit_a], out)
model2.summary()
model2.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)
# Training configuration for Model 2 (same settings as used for Model 1).
epochs = 10
batch_size = 64
# NOTE(review): both callbacks use patience=3, so the learning-rate reduction
# likely fires on the same epoch on which early stopping ends training --
# confirm whether a smaller ReduceLROnPlateau patience was intended.
callbacks = [
    EarlyStopping(patience=3, verbose=1),
    ReduceLROnPlateau(patience=3, verbose=1),
]

# Inputs are passed by the names given to the two Input layers.
train_inputs = {'before': x_train_b, 'after': x_train_a}
val_inputs = {'before': x_val_b, 'after': x_val_a}
history = model2.fit(
    train_inputs,
    y_train,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_data=(val_inputs, y_val),
    verbose=1,
)
Data 2 (imbalanced, positive:negative = 1:4)
# Negative sub-sample for the imbalanced data set: the first
# (20 x number of positive pairs) indices of a fixed-seed permutation of all
# negative indices.
n_selected_negatives = len(train_b_p) * 20
negative_order = np.random.RandomState(seed=10).permutation(len(train_b_n))
selected_idx = negative_order[:n_selected_negatives]
Loss functions
def precision(y_true, y_pred):
    """Return true positives divided by predicted positives for the batch.

    Both counts are obtained by clipping to [0, 1], rounding and summing;
    K.epsilon() is added to the denominator to avoid division by zero.
    """
    hit_mask = K.round(K.clip(y_true * y_pred, 0, 1))
    predicted_mask = K.round(K.clip(y_pred, 0, 1))
    n_hits = K.sum(hit_mask)
    n_predicted = K.sum(predicted_mask)
    return n_hits / (n_predicted + K.epsilon())
def recall(y_true, y_pred):
    """Return true positives divided by actual positives for the batch.

    Both counts are obtained by clipping to [0, 1], rounding and summing;
    K.epsilon() is added to the denominator to avoid division by zero.
    """
    hit_mask = K.round(K.clip(y_true * y_pred, 0, 1))
    actual_mask = K.round(K.clip(y_true, 0, 1))
    n_hits = K.sum(hit_mask)
    n_actual = K.sum(actual_mask)
    return n_hits / (n_actual + K.epsilon())
def f1(y_true, y_pred):
    """Return the F1 score built from `precision` and `recall`.

    Computed as 2 * (p * r) / (p + r + K.epsilon()).
    """
    p_score = precision(y_true, y_pred)
    r_score = recall(y_true, y_pred)
    ratio = (p_score * r_score) / (p_score + r_score + K.epsilon())
    return 2 * ratio
def iou(y_true, y_pred):
    """Return TP / (TP + FP + FN) for the batch, using rounded counts.

    Each count clips its term to [0, 1], rounds it and sums over all
    elements; K.epsilon() is added to the denominator.
    """
    def _rounded_count(term):
        return K.sum(K.round(K.clip(term, 0, 1)))

    n_tp = _rounded_count(y_true * y_pred)
    n_fp = _rounded_count((1 - y_true) * y_pred)
    n_fn = _rounded_count(y_true * (1 - y_pred))
    return n_tp / (n_tp + n_fp + n_fn + K.epsilon())
def iou_loss(y_true, y_pred):
    """Return 1 - soft IoU, reduced over the last axis only.

    Unlike `iou`, the terms are not rounded: each is clipped to [0, 1] and
    averaged over the last axis, so the result keeps the leading axes.

    NOTE(review): when the last axis has a single element (one output unit),
    a sample with y_true == 0 has a zero numerator and its loss is 1 whatever
    the prediction -- confirm this is the intended behaviour for negatives.
    """
    def _soft_mean(term):
        return K.mean(K.clip(term, 0, 1), axis=-1)

    soft_tp = _soft_mean(y_true * y_pred)
    soft_fp = _soft_mean((1 - y_true) * y_pred)
    soft_fn = _soft_mean(y_true * (1 - y_pred))
    soft_iou = soft_tp / (soft_tp + soft_fp + soft_fn + K.epsilon())
    return 1 - soft_iou
def focal_loss(gamma=2., alpha=.25):
    """Build a binary focal-loss function.

    Args:
        gamma: exponent of the modulating factor applied to each term.
        alpha: weight of the positive-class term; the negative-class term is
            weighted by (1 - alpha).

    Returns:
        A function `focal_loss_fixed(y_true, y_pred)` returning the focal
        loss summed over all elements.
    """
    def focal_loss_fixed(y_true, y_pred):
        # Keep predictions strictly inside (0, 1): a saturated prediction of
        # exactly 0 or 1 would otherwise reach K.log(0) and make the loss
        # infinite or NaN.
        eps = K.epsilon()
        y_pred = K.clip(y_pred, eps, 1. - eps)
        # Prediction where the label is 1, and 1 elsewhere (log(1) == 0, so
        # those positions contribute nothing to the positive term).
        pt_1 = tf.where(tf.equal(y_true, 1), y_pred, tf.ones_like(y_pred))
        # Prediction where the label is 0, and 0 elsewhere (log(1 - 0) == 0,
        # so those positions contribute nothing to the negative term).
        pt_0 = tf.where(tf.equal(y_true, 0), y_pred, tf.zeros_like(y_pred))
        positive_term = K.sum(alpha * K.pow(1. - pt_1, gamma) * K.log(pt_1))
        negative_term = K.sum((1 - alpha) * K.pow(pt_0, gamma) *
                              K.log(1. - pt_0))
        return -positive_term - negative_term
    return focal_loss_fixed
def bce_loss(y_true, y_pred):
    """Return the binary cross-entropy averaged over the last axis."""
    elementwise_bce = K.binary_crossentropy(y_true, y_pred)
    return K.mean(elementwise_bce, axis=-1)
def bce_iou_loss(y_true, y_pred):
    """Return the sum of `bce_loss` and `iou_loss` for the same inputs."""
    bce_term = bce_loss(y_true, y_pred)
    iou_term = iou_loss(y_true, y_pred)
    return bce_term + iou_term
def dice_loss(y_true, y_pred):
    """Return 1 - Dice coefficient computed over all flattened elements.

    A smoothing constant of 1 is added to both the numerator and the
    denominator of the coefficient.
    """
    smooth = 1.
    flat_true = K.flatten(y_true)
    flat_pred = K.flatten(y_pred)
    overlap = K.sum(flat_true * flat_pred)
    numerator = 2. * overlap + smooth
    denominator = K.sum(flat_true) + K.sum(flat_pred) + smooth
    return 1. - numerator / denominator
Model 2 (BCE Loss)
# Compile Model 2 with the BCE loss; accuracy and IoU are tracked as metrics.
# NOTE(review): this compiles the existing model2 object -- if a fresh run is
# intended, confirm the model is rebuilt before this cell.
model2.compile(
    optimizer='adam',
    loss=bce_loss,
    metrics=['accuracy', iou],
)
Model 2 (BCE + IOU Loss)
# Compile Model 2 with the combined BCE + IoU loss; accuracy and IoU are
# tracked as metrics.
# NOTE(review): this compiles the existing model2 object -- if a fresh run is
# intended, confirm the model is rebuilt before this cell.
model2.compile(
    optimizer='adam',
    loss=bce_iou_loss,
    metrics=['accuracy', iou],
)