Edited at

KerasでCNNしたった


最近こっちに統合してます Qiita >> https://qiita.com/yoyoyo_/items/cd5b859341106c3b52f9 Github>> https://github.com/yoyoyo-yo/DeepLearningMugenKnock

kerasでCNNを動かすメモ

DataGeneratorを使った学習方法や自分で画像を読み込んで学習させる方法、テストの方法などをまとめてみた

いろいろ調べたのをまとめた(コピペしていけばできます。解説とかいらないんで早く使いたいって人はプログラム全体をコピペしていって下さい。)


Github

ちょっと使いやすくしたコード↓

https://github.com/nagayosi/cnn_keras


インストール

インストールはただpipをするだけです。ここではtensorflowバックエンドでやります。tensorflowのインストールは https://qiita.com/nagayosi/items/7ecd9ef446d255c7d866

すでにtensorflowが入っている状態で、下記コマンドをします。

pip install keras


作業ディレクトリを作る

とりあえず作業ディレクトリをつくりましょう。今回はkeras-workというディレクトリ内で全部作業します。

$HOME --- keras-work

学習画像とテスト画像を集めたら、こんな感じでおきます。同じクラスに属する画像をディレクトリ毎に分けます。こうするのは後々出てくるImageDataGeneratorの仕様のためです。

$HOME --- keras-work --- Images --- Train --- class1 --- *.jpg

| |- class2 --- *.jpg
|
|- Test --- class1 --- *.jpg
|- class2 --- *.jpg


GPU使用

GPU版を使うときは、そのまま使うとGPUのメモリを全部確保してしまうので、

以下の記述をすることで、GPUのメモリ確保を必要な分だけにする。

こちらを参考にしました。https://qiita.com/namakemono/items/12ad8a9f6d0561929056

import tensorflow as tf

from keras import backend as K
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list="0"
sess = tf.Session(config=config)
K.set_session(sess)


ネットワーク定義

ネットワークの定義はこんな感じ。

まず必要なものをインポートします。

from keras.models import Sequential, Model

from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D

まずはSequential()でインスタンスを作り、add()メソッドでlayerを追加するだけ! caffeとかchainerより簡単すぐる、、、

一番最初のConv2Dにはinput_shapeという引数を入れます。これによって入力サイズを定義します。

今回は2クラス分類とするので、最終層の出力はnum_classes=2となります。

入力サイズは64x64にします。

num_classes = 2

img_height, img_width = 64, 64

def Mynet():
model = Sequential()
model.add(Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=(img_height, img_width, 3)))
model.add(Conv2D(32, (3, 3), padding='same', activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), padding='same', activation='relu'))
model.add(Conv2D(64, (3, 3), padding='same', activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Flatten())
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes))
model.add(Activation('softmax'))

return model


学習の準備

ここから学習するための準備をしていきます。

まずkerasをインポートします。

import keras

次にさっき定義したネットワークモデルのインスタンスを作ります。

model = MyNet()

つぎに各層のパラメータを学習させるように設定します。

for layer in model.layers:

layer.trainable = True

次にモデルをコンパイルします。ここでのコンパイルはoptimizerのセットのことを言います。今回はSGDを設定します。lossは要はタスクの種類です。今回は多クラス分類なのでcategorical_crossentropyです。lrは学習率、decayは重み減衰、momentumはモーメンタムを表します。

model.compile(loss='categorical_crossentropy',

optimizer=keras.optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
metrics=['accuracy'])

つぎに学習データ、評価データを用意しますが、今回はImageDataGeneratorという予めkerasで用意された関数を使います。

まずはimportします。

from keras.preprocessing.image import ImageDataGenerator

あとは関数を呼びます。学習用と評価用でそれぞれ呼び出します。ImageDataGeneratorを使うと色んな前処理(切り抜き、左右反転、zca白色化)を自動的にやってくれるので便利です。

rescaleはデータを何倍するか、つまり今回は0~255を0~1 に正規化したいので1/255となります。

train_datagen = ImageDataGenerator(

rescale=1. / 255,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True,
featurewise_center=True,
featurewise_std_normalization=True,
zca_whitening=True
)

test_datagen = ImageDataGenerator(
rescale=1. / 255,
featurewise_center=True,
featurewise_std_normalization=True,
zca_whitening=True
)

次にImageDataGeneratorに学習データを実際に通します。これはflow_from_directoryを使います。最初の引数はデータがあるディレクトリです(パスに注意)。target_sizeでリサイズも行います。バッチサイズは後から出てくる学習の1epochでのバッチサイズです。

train_generator = train_datagen.flow_from_directory(

'Images/Train/',
target_size=(img_height, img_width),
batch_size=30,
class_mode='categorical')

validation_generator = test_datagen.flow_from_directory(
'Images/Test/',
target_size=(img_height, img_width),
batch_size=1,
class_mode='categorical')


学習させるお

実際に学習させます。fit_generatorをするだけです。最初の引数は学習データのImageDataGeneratorです。

- steps_per_epochは1エポック内で何回ミニバッチを回すかということです。ここらへんのエポックとかの概念がcaffeやchainerとは違います。

- epochsは上の1エポックを何エポックするかということ。

- validation_dataには評価用のImageDataGeneratorをいれます。

- validation_stepsは評価を何エポックでするかです。

history = model.fit_generator(

train_generator,
steps_per_epoch=50,
epochs=100,
validation_data=validation_generator,
validation_steps=50
)

これで学習が終わるので、あとはモデルを保存しましょう。拡張子は.h5です。

model.save('MyNet.h5')

以上で終わりです。おつです。


テストするお

学習したモデルで、実際にテストします。

まずはモデルのインスタンスを呼びます。

model = Mynet()

つぎに学習済みパラメータのファイルを読み込みます。

model.load_weights('MyNet.h5')

とりあえずテスト画像を1枚読み込みます。

from PIL import Image

file_path = 'Images/Test/class1/img.jpg'
img = Image.open(filepath).convert('RGB') ## Gray->L, RGB->RGB
img = img.resize((img_width, img_height))
x = np.array(img, dtype=np.float32)
x = x / 255.

でネットワーク入力の形に合わせます。

x = x[None, ...]

ネットワークに入力して、出力を得るには、predictを使います。あとはcaffeやchainerと同様、最大クラスの予測確率とインデックスを得ます。

pred = model.predict(x, batch_size=1, verbose=0)

score = np.max(pred)
pred_label = np.argmax(pred)

以上です。


layerのパラメータの可視化

layerのweight, biasを可視化するために使う

def visualize_parameter(model, layer_num=1):

W = model.layers[layer_num].get_weights()[0].transpose(3,2,0,1)

B = model.layers[layer_num].get_weights()[1].transpose(3,2,0,1)
nb_filter, nb_channnel, nb_row, nb_col = W.shape

line_num = math.ceil(math.sqrt(nb_filter))

plt.figure()
for i in range(nb_filter):
im = W[i, 0]
scaler = MinMaxScaler(feature_range=(0, 255))
im = scaler.fit_transform(im)

plt.subplot(line_num, line_num, i+1)
plt.axis('off')
plt.imshow(im, cmap='gray')
plt.show()


中間層の可視化

画像を入力した時に中間層の出力を可視化する関数

まずこの関数でmodelの中間層までをコピーするらしい

def get_inter_layer(model, layer_num=16):

layers = model.layers[:layer_num]

output = model.layers[15]

print('\nvisualize intermediate layer <-- {}'.format(output.name))

model2 = Sequential()
for layer in layers:
model2.add(layer)

return model2

次に画像を入力してforwardする

output = model2.predict(x, batch_size=1, verbose=0)

この出力を次の関数で可視化する

def visualize_inter_layer(layer):

layer = layer[0,:,:,:]

filter_num = layer.shape[2]

line_num = math.ceil(math.sqrt(filter_num))

plt.figure()
plt.figure(figsize=(15,15))
plt.subplots_adjust(left=0.001, right=0.999, top=0.999, bottom=0.001, hspace=0.01, wspace=0.01)

for i in range(filter_num):
im = layer[:,:,i]
scaler = MinMaxScaler(feature_range=(0, 255))
im = scaler.fit_transform(im)

plt.subplot(line_num, line_num, i+1)
plt.axis('off')
plt.imshow(im, cmap='jet')
plt.show()


プログラム全体


import

とりあえず今回使うimport関係はこちら

import keras

from keras import optimizers
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, Add, Input, Multiply, Concatenate

import os, glob, random
from PIL import Image
import pickle
import numpy as np


ハイパーパラメータ関係

ミニバッチサイズとか諸々のハイパーパラメータ関係はこちら

batch_size = 10

num_classes = 2
epochs = 50
data_augmentation = True
num_predictions = 20
#save_dir = os.path.join(os.getcwd(), 'saved_models')
model_path = 'model0.h5'

img_width = 128
img_height = 128

train_imgs = '../Images/Train/'
test_imgs = '../Images/Test/'

class_label = ['class1', 'class2']


ネットワーク定義

ネットワークの定義はこんな感じ

まずはSequential()でインスタンスを作り、、add()メソッドでlayerを追加するだけ! caffeとかchainerより簡単すぐる、、、

from keras.models import Sequential, Model

from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, Add, Input, Multiply, Concatenate

def MyNet():
model = Sequential()
model.add(Conv2D(32, (3, 3), padding='same', input_shape=(img_height, img_width, 3)))
model.add(Activation('relu'))
model.add(Conv2D(32, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3), padding='same'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))

model.add(Flatten())
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes))
model.add(Activation('softmax'))

return model


学習の関数(DataGenerator使う時)

DataGeneratorっていうのを使うと、pre-processようは事前処理を簡単に実行できる。というか引数を変えるだけで全部やってくれる、便利すぐる、、、、

とりあえず学習の流れはこんな感じ(コピペでおk!)

from keras import optimizers

from keras.preprocessing.image import ImageDataGenerator

def train():
print('\ntrain start\n')

## define model architecture
model = Mynet()

## visualize model
model.summary()

for layer in model.layers:
layer.trainable = True

model.compile(loss='categorical_crossentropy',
optimizer=keras.optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
metrics=['accuracy'])

early_stopping = keras.callbacks.EarlyStopping(monitor='val_acc', patience=3, verbose=1, mode='max')
check_point = keras.callbacks.ModelCheckpoint(
filepath = os.path.join('model{epoch:02d}-vacc{val_acc:.2f}.hdf5'),
monitor='val_acc', verbose=0, save_best_only=True, mode='max')

train_datagen = ImageDataGenerator(
rescale=1. / 255,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True,
featurewise_center=True,
featurewise_std_normalization=True,
zca_whitening=True
)

test_datagen = ImageDataGenerator(
rescale=1. / 255,
featurewise_center=True,
featurewise_std_normalization=True,
zca_whitening=True
)

train_generator = train_datagen.flow_from_directory(
train_imgs,
target_size=(img_height, img_width),
batch_size=batch_size,
class_mode='categorical')

validation_generator = test_datagen.flow_from_directory(
test_imgs,
target_size=(img_height, img_width),
batch_size=1,
class_mode='categorical')

history = model.fit_generator(
train_generator,
steps_per_epoch=50,
epochs=epochs,
validation_data=validation_generator,
validation_steps=50,
callbacks=[check_point]
)

# Save model and weights
#if not os.path.isdir(save_dir):
# os.makedirs(save_dir)
#model_path = os.path.join(save_dir, model_name)
model.save(model_path)
print('\nSaved trained model at --> %s ' % model_path)
print('\ntrain end\n')


説明

この中でDataGeneratorはこの部分

train_datagen = ImageDataGenerator(

rescale=1. / 255,
shear_range=0.2,
zoom_range=0.2,
horizontal_flip=True,
featurewise_center=True,
featurewise_std_normalization=True,
zca_whitening=True
)

引数に値をセットすることで値の正規化(rescale)、ZCA白色化(zca_whitening)とかをやってくれる

んで、これのflow_from_directory()を呼ぶことで学習データのディレクトリを指定する!

train_generator = train_datagen.flow_from_directory(

train_imgs,
target_size=(img_height, img_width),
batch_size=batch_size,
class_mode='categorical')


  • ここでのtrain_imgsは学習データがあるディレクトリのパス

  • target_size()は画像をリサイズ

  • batch_sizeはバッチサイズ

  • class_modeはクラスの指定方法
    ちなみに学習データのディレクトリはクラスごとにフォルダを分けなければだめなんで注意!!
    例えばここではtrain_imgs='../Images/Train/'とやってるので次図の感じでディレクトリをつくなければだめ

Images --- Train --- Class1 --- img1_1.jpg

  | |- img1_2.jpg
          |        :
           |- Class2 --- img2_1.jpg


学習の関数(DataGenerator使わない時)

DataGeneratorを使えばpre-processは全部自動でやってくれるけど、自分でやりたい時のための方法

こっちは学習データをロードするための関数


画像をロードする関数

def load_images_all(dataset_path, shuffle=False):

print('\tload images <-- {}'.format(dataset_path))
if not os.path.exists(dataset_path):
raise Exception('{} not exists'.format(dataset_path))

cls_dirs = os.listdir(dataset_path)
cls = 0
imgs = []
labels = []
filepaths = []

for cls_dir in cls_dirs:
if not os.path.isdir(dataset_path + '/' + cls_dir): continue
_imgs, _labels, _filepaths = load_images(dataset_path + '/' + cls_dir, cls)
imgs += _imgs
labels += _labels
filepaths += _filepaths
cls += 1

imgs = np.array(imgs)
labels = np.array(labels)
filepaths = np.array(filepaths)

if shuffle:
s = np.arange(imgs.shape[0])
np.random.shuffle(s)
imgs = imgs[s]
labels = labels[s]
filepaths = filepaths[s]
print('\tloaded images\n')
return imgs, labels, filepaths

## this is used in load_images_all
def load_images(dataset_path, label, shuffle=False):
filepaths_jpg = glob.glob(dataset_path + '/*.jp*g')
filepaths_png = glob.glob(dataset_path + '/*.png')
filepaths = filepaths_jpg + filepaths_png
filepaths.sort()
datasets = []
labels = []

for filepath in filepaths:
img = Image.open(filepath).convert('RGB') ## Gray->L, RGB->RGB
img = img.resize((img_width, img_height))
#label = int(filepath.split('/')[-1].split('_')[0])

x = np.array(img, dtype=np.float32)
x = x / 255.
#x = x.reshape(3, INPUT_HEIGHT, INPUT_WIDTH)
#t = np.array(label, dtype=np.int32)
t = label
datasets.append(x)
labels.append(t)
if shuffle: random.shuffle(datasets)

return datasets, labels, filepaths


こっちは学習を実行する関数


学習する関数

def train():

print('\ntrain start\n')

## load train and test images
x_train, y_train, filepaths_train = load_images_all(train_imgs, shuffle=True)
x_test, y_test, filepaths_test = load_images_all(test_imgs)

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices.
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

## define model architecture
model = Mynet()

## visualize model
model.summary()

for layer in model.layers:
layer.trainable = True

model.compile(loss='categorical_crossentropy',
optimizer=keras.optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
metrics=['accuracy'])

early_stopping = keras.callbacks.EarlyStopping(monitor='val_acc', patience=3, verbose=1, mode='max')
check_point = keras.callbacks.ModelCheckpoint(
filepath = os.path.join('model0','model{epoch:02d}-vacc{val_acc:.2f}.hdf5'),
monitor='val_acc', verbose=0, save_best_only=True, mode='max')

history = model.fit(x_train, y_train,
batch_size=batch_size,
epochs=epochs,
validation_data=(x_test, y_test),
shuffle=True,
callbacks=[early_stopping, check_point]
)

# Save model and weights
#if not os.path.isdir(save_dir):
# os.makedirs(save_dir)
#model_path = os.path.join(save_dir, model_name)
model.save(model_path)
print('\nSaved trained model at --> %s ' % model_path)
print('\ntrain end\n')



説明

まずは

x_train, y_train, filepaths_train = load_images_all(train_imgs, shuffle=True)

x_test, y_test, filepaths_test = load_images_all(test_imgs)

で学習画像とアノテーションラベルを取り出す。学習データのディレクトリ構造はDataGeneratorのとこで書いたのと同じで!

次にkeras.utils.to_categorical()でアノテーションラベルの形式を設定する

y_train = keras.utils.to_categorical(y_train, num_classes)

y_test = keras.utils.to_categorical(y_test, num_classes)

あとはmodelに適用させるだけ!!

history = model.fit(x_train, y_train,

batch_size=batch_size,
epochs=epochs,
validation_data=(x_test, y_test),
shuffle=True,
callbacks=[early_stopping, check_point]
)


テストのための関数

学習したらテストでしょ!!ってことでテスト用の関数を用意!!

ほとんど学習の時と変わらない、、

def test():

print('\ntest start\n')
model = Mynet()
#model.summary()
model.load_weights(model_path)

x_test, y_test, paths = load_images_all(test_imgs)
#y_test = keras.utils.to_categorical(y_test, num_classes)

# Evaluatemodel with test data set and share sample prediction results
#evaluation = model.evaluate_generator((x_test, y_test),
# steps=x_test.shape[0] // batch_size)

count = 0
total = x_test.shape[0]
print('---------------------------------------------------------------------')
print('evaluate start')
print('---------------------------------------------------------------------')
for index, x in enumerate(x_test):
x = x[None, ...]
pred = model.predict(x, batch_size=1, verbose=0)
score = np.max(pred)
pred_label = np.argmax(pred)

gt = y_test[index]

if pred_label == gt: count += 1

print(' {} : gt {} / pred {}'.format(paths[index], gt, pred_label))

print('--------------------------------------------------------------------')
print('accuracy {} ({} / {})'.format(1.0*count/total, count, total))

print('\ntest end\n')


用意されたモデルを使う

https://keras.io/ja/applications/

keras.applicationsには有名なネットワークが用意されている。

VGG16, Inception, ResNetとか。

これを使うにはモデル定義をこうすればいい。

weights=Noneはランダム初期化、weight='imagenet'は事前学習モデルを使う。

include_topはMLP3層を含むかどうか(Falseで含まない)

この場合はconvolutionだけimagenetのpretrainedモデルを使い、その後はランダム初期化したMLPをつなげている。

def model():

from keras.applications.vgg16 import VGG16
model = VGG16(include_top=False,
weights='imagenet',
input_shape=(128, 128, 3))

last = model.output
x = Flatten()(last)
x = Dense(4096, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(4096, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(cf.Class_num, activation='softmax')(x)
model = Model(model.input, x)
return model


ResNet50

def model():

from keras.applications.resnet50 import ResNet50
model = ResNet50(include_top=False,
weights='imagenet',
input_shape=(256, 256, 3))

last = model.output
x = Flatten()(last)
x = Dense(4096, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(4096, activation='relu')(x)
x = Dropout(0.5)(x)
x = Dense(cf.Class_num, activation='softmax')(x)
model = Model(model.input, x)
return model

ResNet50を使うとき、プーリングで中間マップのサイズがすこしずれるので、以下の修正をしたほうがよさそう。

anaconda3/lib/python3.6/site-packages/keras_applications/resnet50.py

このファイルの219行目

x = layers.MaxPooling2D((3, 3), strides=(2, 2))(x)

x = layers.MaxPooling2D((3, 3), strides=(2, 2), padding='same')(x)

に変える。(MaxPoolingの引数にpadding='same'を追加)


バックエンドの確認

ホームに .keras という隠しディレクトリができている。

その中の、 .keras/keras.json

にバックエンドなどの設定が書かれている。


エラー&警告


WARNING: AVX2 FMA

I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA

これをコードに記述

import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

GPU使用の記述と合わせて、

import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import tensorflow as tf
from keras import backend as K
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list="0"
sess = tf.Session(config=config)
K.set_session(sess)


メモリ解放

from keras import backend as K

K.clear_session()
tf.reset_default_graph()