
[NumPy Scratch Implementation] MobileNet V1


I have summarized the structure of MobileNet here.

Overview

  • Building a complex deep learning model is quicker and easier with Keras, PyTorch, and the like
  • But that is no fun
  • A simple enough model should run even as an amateur from-scratch implementation
  • So I implemented the bare minimum needed: train_test_split, OneHotEncoder, and MobileNet V1
  • The only libraries used are Numpy, Matplotlib, and Pyprind (for the training progress bar)
  • The data is CIFAR-10

There are several points where this differs in functionality and structure from models built with polished deep learning libraries.
This is strictly a from-scratch implementation based on the paper, aimed at learning and understanding the model; if you actually want to do something with a model, you are better off using a deep learning library.
I borrow code where appropriate from "Python Machine Learning" and "Deep Learning from Scratch".

Target readers

I think this will be useful to anyone who shares just one sentiment: "the from-scratch implementations I read about in books left me wanting more."
Scratch implementations really are fun.
At this pace I would like to implement V2 and V3 as well (time permitting)!

Non-target readers

I cannot list everything, but roughly it comes down to the following...

  • You want to use it alongside a deep learning library
  • You want to run it on an Edge TPU
  • You want to do transfer learning
  • You want to set include_top=False and easily attach a different set of top layers
  • You want to save the trained model and its weights

Caveats

  • Error-handling conditions have not been examined in detail.
  • ReLU6 is not implemented (a plain oversight, though once quantization and model slimming are taken into account a proper implementation looks hard anyway).
  • Saving weights at least seems feasible down the road, but is not implemented.
  • If the code merely happens to run and the implementation is actually wrong, please point it out.

Evaluation results

Let me show the evaluation results up front.
Since I first wanted to see whether the model runs properly at all, I ignored accuracy for now and used only a small fraction of the full dataset.
The hyperparameter tuning is also rough.
For the data and experimental procedure I referred to here.
I ran everything on Google Colaboratory; the tiny amount of memory is painful, though I can hardly complain since it is free...

SGD evaluation (Learning rate: 0.01)

Number of samples

Training samples: 2400
Validation samples: 600

Model structure

Note: in practice, all layers are printed in the output.
[Screenshots: model summary output from build()]

Training curves

[Figure: cost and accuracy per epoch (SGD)]

Adam evaluation (Learning rate: 0.001)

Number of samples

Training samples: 2400
Validation samples: 600

Model structure

Note: in practice, all layers are printed in the output.
[Screenshots: model summary output from build()]

Training curves

[Figure: cost and accuracy per epoch (Adam)]

The sample count is far too small, so the curves are very rough, but the cost does trend downward and the accuracy upward, so the implementation is probably working correctly.
Inference on the test data took about 5.8 ms/image on average, though with so little data it is hard to say anything definitive...
Going forward I would like to vary and then align the experimental conditions, and compare speed and accuracy against deep learning libraries.

Code

train_test_split

I did not use this much this time, but by comparing against the scikit-learn source code and its actual outputs I confirmed that the behavior is roughly the same. A small sanity check follows the code below.

train_test_split.py
import numpy as np

def train_test_split(X,y,test_size=0.25,
                     random_state=None,shuffle=True,stratify=None):
    """
    Split the data to be learned and tested.

    Parameters
    ----------
    X : ndarray, shape (n_samples, n_features)
      data to be learned and tested
    y : ndarray, shape (n_samples, )
      objective labels
    test_size : float (0<test_size<1)(default: 0.25)
      set the rate of test size
    random_state : int
      set the pseudo-random number to be used in RandomStateGenerator
    shuffle : boolean (default:True)
      shuffle before split or not. If False, set stratify as None.
    stratify : array-like or None
      array for stratified sampling

    Returns
    ----------
    X_train : ndarray, shape (n_samples, n_features)
      data to be learned
    X_test : ndarray, shape (n_samples, n_features)
      data to be tested
    y_train : ndarray, shape (n_samples, )
      labels for X_train
    y_test : ndarray, shape (n_samples, )
      labels for X_test
    """
    # Error if the number of samples in X does not correspond to the number of labels in y.
    if X.shape[0] != y.shape[0]:
        raise ValueError("X samples number ({}) is not the same as y samples number ({}).".format(
                X.shape[0], y.shape[0]))
    
    # make several parameters to be used
    n_samples = X.shape[0]
    n_train = np.floor((1-test_size) * n_samples).astype(int)
    n_test = n_samples - n_train
    classes = np.unique(y)
    n_classes = len(classes)
    class_counts = np.bincount(y)
    class_indices = np.split(np.argsort(y, kind='mergesort'),
                             np.cumsum(class_counts)[:-1])
    
    # Case1: Shuffle=False and stratify=None
    if shuffle is False and stratify is None:
        X_test = X[:n_test]
        X_train = X[n_test:(n_test + n_train)]
        y_test = y[:n_test]
        y_train = y[n_test:(n_test + n_train)]
        
        return X_train, X_test, y_train, y_test
    
    # Case2: Shuffle=False and stratify=y
    elif shuffle is False and stratify is not None:
        raise ValueError("If 'shuffle' parameter is False, "
                         "then 'stratify' parameter should be None.")
    
    # Case3: Shuffle=True and stratify=None
    elif shuffle is True and stratify is None:
        rng = np.random.RandomState(seed=random_state)
        # shuffle and split
        permutation = rng.permutation(n_samples)
        ind_test = permutation[:n_test]
        ind_train = permutation[n_test:(n_test + n_train)]
        
        X_train = X[ind_train]
        X_test = X[ind_test]
        y_train = y[ind_train]
        y_test = y[ind_test]
        
        return X_train, X_test, y_train, y_test
    
    # Case4: Shuffle=True and stratify=y
    else:
        # set a number of samples to be selected per each class
        rng = np.random.RandomState(seed=random_state)
        n_i = _extracting_func(class_counts, n_train, rng)
        class_counts_remaining = class_counts - n_i
        t_i = _extracting_func(class_counts_remaining, n_test, rng)
        
        train = []
        test = []
        
        # select at random which indices should be assigned to train and test set
        for i in range(n_classes):
            permutation = rng.permutation(class_counts[i])
            perm_indices_class_i = class_indices[i].take(
                    permutation,mode='clip')
            train.extend(perm_indices_class_i[:n_i[i]])
            test.extend(perm_indices_class_i[n_i[i]:n_i[i] + t_i[i]])
        
        ind_train = rng.permutation(train)
        ind_test = rng.permutation(test)
        
        X_train = X[ind_train]
        X_test = X[ind_test]
        y_train = y[ind_train]
        y_test = y[ind_test]
        
        return X_train, X_test, y_train, y_test

def _extracting_func(class_counts, n_draws, rng):
    """
    Internal function for "train_test_split" in "Case4: Shuffle=True and stratify=y"
    
    Randomly draw a given number of samples (n_draws) from the
    population described by class_counts, stratified by class.

    """
    # assign each number of samples to be extracted per each class
    continuous = n_draws * (class_counts / class_counts.sum())
    floored = np.floor(continuous)
    need_to_add = int(n_draws - floored.sum())
    # determine which classes should be added one more because of flooring
    if need_to_add > 0:
        remainder = continuous - floored
        # sort the remaining fractional parts in descending order
        values = np.sort(np.unique(remainder))[::-1]
        for value in values:
            inds, = np.where(remainder == value)
            # set the number of value to be added
            add_now = min(len(inds), need_to_add)
            # determine at random where should be added
            inds = rng.choice(inds, size=add_now, replace=False)
            floored[inds] += 1
            # repeat until when 'need to add' becomes 0
            need_to_add -= add_now
            if need_to_add == 0:
                break
    return floored.astype(int)
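As a quick check against the intended behavior, here is a hedged toy example (made-up data, not from the evaluation above):

train_test_split_check.py
import numpy as np

X = np.arange(20).reshape(10, 2)
y = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1])

# Stratified shuffled split: class ratios are preserved in both subsets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0, shuffle=True, stratify=y)
print(X_train.shape, X_test.shape)                # (8, 2) (2, 2)
print(np.bincount(y_train), np.bincount(y_test))  # [4 4] [1 1]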

OneHotEncoder

This one is written simply, straight from intuition. A small usage check follows the class.

onehotencoder.py
class OneHotEncoder:
    def fit(self, y):
        self.max_value = np.max(y) + 1
    
    def transform(self, y):
        y_onehot = np.zeros((y.shape[0], self.max_value))
        for i in range(y.shape[0]):
            y_onehot[i, y[i]] += 1.
        return y_onehot
    
    def fit_transform(self, y):
        self.fit(y)
        y_onehot = self.transform(y)
        return y_onehot
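For example (a hedged toy check):

onehotencoder_check.py
import numpy as np

enc = OneHotEncoder()
y = np.array([0, 2, 1])
print(enc.fit_transform(y))
# [[1. 0. 0.]
#  [0. 0. 1.]
#  [0. 1. 0.]]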

MobileNet V1

"MobileNet_v1"の中で"SeparableConvLayers", "ConvLayer", "FullConnLayer"(全結合層)などを呼んでいます。"SeparableConvLayers"の中ではさらに"ConvLayer", "BatchNormalization", "Activator"を呼んでいます。

List of classes and functions

  • class MobileNet_v1
  • class SeparableConvLayers
  • class ConvLayer
  • class BatchNormalization
  • class PoolLayer
  • def im2col
  • def col2im
  • class Flatten
  • class Dropout
  • class FullConnLayer
  • class LearningMethod
  • class SimpleInitializer
  • class Activator
  • class Softmax
  • class GetMiniBatch
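Before the full code, here is a minimal usage sketch. The driver below is illustrative: it assumes X_train, y_train, X_val, y_val are CIFAR-10 data already prepared as NCHW float arrays and integer labels (e.g. with the train_test_split above).

usage_example.py
# Hypothetical driver; data preparation is assumed to have been done beforehand.
model = MobileNet_v1(input_shape=(3, 32, 32), alpha=1., n_classes=10,
                     learning_method="SGD", lr=0.01, batch_size=10,
                     n_epochs=10, verbose=True)
model.build(show_model=True)               # prints the layer summary
model.fit(X_train, y_train, X_val, y_val)  # trains and plots cost/accuracy
y_pred = model.predict(X_val)
print("validation accuracy:", (y_pred == y_val).mean())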
mobilenet_v1.py
import numpy as np
import matplotlib.pyplot as plt
import pyprind

class MobileNet_v1:
    """
    List of instances (Forward propagation ascending order)
    ----------
    - ConvLayer
    - DepthwiseSeparableConvLayers * 13
    - PoolLayer
    - Flatten
    - Dropout
    - FullConnLayer (without ReLU)
    - Softmax

    Parameters
    ----------
    input_shape : tuple
        Shape of input data(eg. (3, 224, 224))
    alpha : float (0.25, 0.50, 0.75 or 1.0)
        Width of the network (width multiplier) decreasing the number of filters in each layer.
    dropout : float
        dropout rate
    pooling : str
        PoolLayer mode ("Max" or "Average")
    n_classes : int
        Number of classes for inference
    learning_method : str
        Optimization method ("SGD", "Momentum", "AdaGrad" or "Adam")
    lr : float
        Learning rate
    sgd_momentum : float
        Momentum parameter in "Momentum SGD"
    adam_beta1 : float
        Beta1 parameter in "Adam"
    adam_beta2 : float
        Beta2 parameter in "Adam"
    batch_size : int
        Mini-batch size
    sigma : str or float
        SD for initial value of weights of each layer ("Xavier", "He" or float value)
    activation : str
        Activation function ("sigmoid", "tanh" or "ReLU")
    gamma : float
        Scaling parameter for normalized data in batchnormalization
    beta : float
        Shifting parameter for normalized data in batchnormalization
    momentum : float
        Momentum for the moving mean and the moving variance in batchnormalization.
    random_seed : int (default : None)
        Pseudo random seed for data shuffling in each epoch
    verbose : bool (default : True)
        Whether to plot the learning process or not

    Attributes
    ----------
    self.train_cost_ : list
        Cost value per epoch for train data(Cross entropy error)
    self.val_cost_ : list
        Cost value per epoch for validation data(Cross entropy error)
    self.train_accuracy_ : list
        Accuracy value per epoch for train data
    self.val_accuracy_ : list
        Accuracy value per epoch for validation data
    
    """
    def __init__(self, input_shape=(3, 224, 224), alpha=1., dropout=1e-3, 
                 pooling="Average", n_classes=1000, 
                 learning_method="SGD", lr=0.01, sgd_momentum=0.9, adam_beta1=0.9, adam_beta2=0.999, 
                 batch_size=10, sigma="Xavier", activation="ReLU", 
                 gamma=1., beta=0., momentum=0.9, 
                 n_epochs = 10, random_seed=0, verbose=True):
        
        self.input_shape = input_shape
        self.alpha = alpha
        if alpha != 0.25 and alpha != 0.50 and alpha != 0.75 and alpha != 1.0:
            raise ValueError("Set alpha to 0.25, 0.50, 0.75 or 1.0 .")
        self.dropout = dropout
        self.pooling = pooling
        self.n_classes = n_classes
        self.learning_method = learning_method
        self.lr = lr
        self.sgd_momentum = sgd_momentum
        self.adam_beta1 = adam_beta1
        self.adam_beta2 = adam_beta2
        self.batch_size = batch_size
        self.sigma = sigma
        self.activation = activation
        self.gamma = gamma
        self.beta = beta
        self.momentum = momentum
        self.n_epochs = n_epochs
        self.random_seed = random_seed # For mini-batch training
        self.verbose = verbose
    
    def build(self, show_model=True):
        """
        Building model for classification
        
        Parameters
        ----------
        show_model : bool
            Whether to show the summary of the model built

        """
        # For cost and accuracy value
        self.train_cost_ = []
        self.val_cost_ = []
        self.train_accuracy_ = []
        self.val_accuracy_ = []
        
        # Height of images, number of features(pixels) in an image
        image_size = self.input_shape[1]
        n_features = (image_size**2) * self.input_shape[0]
        
        # Create an optimizer instance
        optimizer = LearningMethod(self.learning_method, self.lr, self.sgd_momentum, self.adam_beta1, self.adam_beta2)
        
        # Create instances of initial layers
        self.ConvLayer = ConvLayer(
            FN=int(32*self.alpha), C=3, FS=3, sigma=self.sigma, n=n_features, optimizer=optimizer, S=2, P=1)
        self.BatchNormalization = BatchNormalization(gamma=self.gamma, beta=self.beta, momentum=self.momentum)
        self.Activator = Activator(self.activation)
        
        # Prepare parameters for a series of SeparableConvLayers
        FN_list = [32,64,64,128,128,128,128,256,256,256,256,512,512,512,512,512,512,512,512,512,512,512,512,1024,1024,1024]
        C_list = [32,32,64,64,128,128,128,128,256,256,256,256,512,512,512,512,512,512,512,512,512,512,512,512,1024,1024]
        FS_list = [3,1,3,1,3,1,3,1,3,1,3,1,3,1,3,1,3,1,3,1,3,1,3,1,3,1]
        input_size_list = [112,112,112,56,56,56,56,28,28,28,28,14,14,14,14,14,14,14,14,14,14,14,14,7,7,7]
        S_list = [1,1,2,1,1,1,2,1,1,1,2,1,1,1,1,1,1,1,1,1,1,1,2,1,2,1]
        n_list = []
        
        # Adjust parameters with alpha
        for index in np.arange(26):
            FN_list[index] = int(FN_list[index] * self.alpha)
            C_list[index] = int(C_list[index] * self.alpha)
            input_size_list[index] = int(input_size_list[index] * (image_size/224))
            n_list.append(((input_size_list[index])**2) * C_list[index])
            
        # Create instances of a series of SeparableConvLayers
        self.SCLayers = []
        for index in np.arange(0,26,2):
            self.SCLayers.append(SeparableConvLayers(
                optimizer, self.sigma, self.activation, 
                FN_list[index], C_list[index], FS_list[index], n_list[index], S_list[index], 1, 
                FN_list[index+1], C_list[index+1], FS_list[index+1], n_list[index+1], S_list[index+1], 0, 
                self.gamma, self.beta, self.momentum))
        
        # Create instances of final layers
        self.PoolLayer = PoolLayer(PH=input_size_list[-1], PW=input_size_list[-1], pooling=self.pooling)
        self.Flatten = Flatten()
        self.Dropout = Dropout(prob=self.dropout)
        self.FullConnLayer = FullConnLayer(FN_list[-1], self.n_classes, 
                                           SimpleInitializer(self.sigma, FN_list[-1]), optimizer)
        self.Softmax = Softmax()
        
        # If model should be visualized, 
        if show_model:
            Layer_names = ["Conv / s2     ",
                           "Conv dw / s1","Conv / s1     ","Conv dw / s2","Conv / s1     ",
                           "Conv dw / s1","Conv / s1     ","Conv dw / s2","Conv / s1     ",
                           "Conv dw / s1","Conv / s1     ","Conv dw / s2","Conv / s1     ",
                           "Conv dw / s1","Conv / s1     ","Conv dw / s1","Conv / s1     ",
                           "Conv dw / s1","Conv / s1     ","Conv dw / s1","Conv / s1     ",
                           "Conv dw / s1","Conv / s1     ",
                           "Conv dw / s2","Conv / s1     ","Conv dw / s2","Conv / s1     ",
                           "{} Pool / s1".format(self.pooling),"FC / s1       ","Softmax / s1"
                          ]
            Input_channels = [self.input_shape[0], ] + C_list + [C_list[-1], C_list[-1], self.n_classes]
            Input_sizes = [self.input_shape[1], ] + input_size_list + [input_size_list[-1],1,1]
            Output_channels = [int(32*self.alpha), ] + FN_list + [FN_list[-1], self.n_classes, self.n_classes]
            Output_sizes = input_size_list + [input_size_list[-1],1,1,1]
            
            print("------Below are being showed without BatchNormalization, Activator and Flatten layers.------\n")
            print("Layer (stride)             Input Shape           Output Shape        #   ")
            print("==================================================")
            for i in range(30):
                print("{}             ({},{},{})             ({},{},{})".format(
                    Layer_names[i],
                    Input_channels[i],
                    Input_sizes[i],
                    Input_sizes[i],
                    Output_channels[i],
                    Output_sizes[i],
                    Output_sizes[i]))
                if i != 29:
                    print("________________________________________________________________")
            print("==================================================")
            print("shape: (C, H, W)")
        
    def fit(self, X_train, y_train, X_val=None, y_val=None):
        """
        Fitting (training) for classification

        Parameters
        ----------
        X_train : ndarray, shape (n_samples, channels, image_height, image_width)
            Features of train data
        y_train : ndarray, shape (n_samples, )
            Labels of train data
        X_val : ndarray, shape (n_samples, channels, image_height, image_width)
            Features of validation data
        y_val : ndarray, shape (n_samples, )
            Labels of validation data
        """
        # Labels OneHotEncoding
        enc = OneHotEncoder()
        y_train_onehot = enc.fit_transform(y_train[:, np.newaxis])
        y_val_onehot = enc.transform(y_val[:, np.newaxis])
        
        # Set progress bar for training
        pbar = pyprind.ProgBar(self.n_epochs * (np.ceil(X_train.shape[0] / self.batch_size).astype(int)))
        
        # Create a shuffled array of per-epoch random seeds
        rng = np.random.RandomState(self.random_seed)
        random_seeds = np.arange(self.n_epochs)
        rng.shuffle(random_seeds)
        
        # Train per epoch
        for i in range(self.n_epochs):
            get_mini_batch = GetMiniBatch(X_train, y_train_onehot, 
                                          batch_size=self.batch_size, seed=random_seeds[i])
            
            # Create batch data
            for X_train_batch, y_train_batch in get_mini_batch:
                
                # Forward propagation
                z_out, a_out = self._forward(X_train_batch, train_flg=True)
                
                # Backward propagation
                dx = self.Softmax.backward(a_out, y_train_batch) # including cross entropy error and softmax
                dx = self.FullConnLayer.backward(dx)
                dx = self.Dropout.backward(dx)
                dx = self.Flatten.backward(dx)
                dx = self.PoolLayer.backward(dx)
                for j in np.arange(12, -1, -1): # j, not i, to avoid shadowing the epoch index
                    dx = self.SCLayers[j].backward(dx)
                dx = self.Activator.backward(dx)
                dx = self.BatchNormalization.backward(dx)
                dx = self.ConvLayer.backward(dx)
                
                # Update progress bar
                pbar.update()
            
            # Save cost and accuracy of train data per epoch
            z_out, a_out = self._forward(X_train, train_flg=False)
            # Cost value
            value1 = y_train_onehot * (np.log(a_out + 1e-07)) # 1e-07 avoids log(0)
            value2 = (1. - y_train_onehot) * np.log(1. - a_out + 1e-07) # 1e-07 avoids log(0)
            train_cost = -np.sum(value1 + value2)
            self.train_cost_.append(train_cost)
            # Accuracy
            y_train_pred = self.predict(X_train)
            train_accuracy = (np.sum(y_train == y_train_pred).astype(float)) / y_train.shape[0]
            self.train_accuracy_.append(train_accuracy)
            
            # Save as well if there are validation data
            if X_val is not None and y_val is not None:
                z_out, a_out = self._forward(X_val, train_flg=False)
                # Cost value
                value1 = y_val_onehot * (np.log(a_out + 1e-07)) # 1e-07 avoids log(0)
                value2 = (1. - y_val_onehot) * np.log(1. - a_out + 1e-07) # 1e-07 avoids log(0)
                val_cost = -np.sum(value1 + value2)
                self.val_cost_.append(val_cost)
                # Accuracy
                y_val_pred = self.predict(X_val)
                val_accuracy = (np.sum(y_val == y_val_pred).astype(float)) / y_val.shape[0]
                self.val_accuracy_.append(val_accuracy)
        
        # Normalize each cost curve by its mean (this also converts the lists to ndarrays)
        self.train_cost_ /= (sum(self.train_cost_) / len(self.train_cost_))
        if len(self.val_cost_) > 0:
            self.val_cost_ /= (sum(self.val_cost_) / len(self.val_cost_))
        
        # If verbose, plot the training process
        if self.verbose:
            plt.rcParams["font.size"] = 16
            fig, ax = plt.subplots(1, 2, figsize=(20,9))
            ax[0].plot(np.arange(1, len(self.train_cost_)+1), self.train_cost_, 
                       linewidth=2, linestyle="-", marker="o", color="steelblue", label="Train")
            ax[0].set_xlabel("epoch numbers")
            ax[0].set_ylabel("cost value (averaged)")
            ax[1].plot(np.arange(1, len(self.train_accuracy_)+1), self.train_accuracy_, 
                       linewidth=2, linestyle="-", marker="o", color="steelblue", label="Train")
            ax[1].set_xlabel("epoch numbers")
            ax[1].set_ylabel("accuracy")
            # Export as well if there are validation data
            if len(self.val_cost_) > 0:
                ax[0].plot(np.arange(1, len(self.val_cost_)+1), self.val_cost_, 
                           linewidth=2, linestyle="--", marker="o", color="orangered", label="Validation")
                ax[1].plot(np.arange(1, len(self.val_accuracy_)+1), self.val_accuracy_, 
                           linewidth=2, linestyle="--", marker="o", color="orangered", label="Validation")
            ax[0].legend()
            ax[1].legend()
            plt.show()
    
    def predict(self, X_test):
        z_out, a_out = self._forward(X_test, train_flg=False)
        return np.argmax(z_out, axis=1) # index of the max logit (the input to the Softmax layer)
    
    def _forward(self, x, train_flg):
        # Forward propagation
        x = self.ConvLayer.forward(x)
        x = self.BatchNormalization.forward(x, train_flg)
        x = self.Activator.forward(x)
        for i in np.arange(13):
            x = self.SCLayers[i].forward(x, train_flg)
        x = self.PoolLayer.forward(x)
        x = self.Flatten.forward(x)
        x = self.Dropout.forward(x, train_flg)
        z_out = self.FullConnLayer.forward(x)
        a_out = self.Softmax.forward(z_out)
        return z_out, a_out


class SeparableConvLayers:
    """
    List of instances (Forward propagation ascending order)
    ----------
    - ConvLayer (Depthwise)
    - BatchNormalization
    - Activator (ReLU)
    - ConvLayer (Pointwise)
    - BatchNormalization
    - Activator (ReLU)

    Parameters
    ----------
    optimizer : an instance
        An instance of a class "LearningMethod"
    sigma : str or float
        SD for initial value of weights of each layer ("Xavier", "He" or float value)
    activation : str
        Activation function ("sigmoid", "tanh" or "ReLU")
    train_flg : bool (argument of forward())
        Whether the BatchNormalization layers are in the training phase
    gamma : float
        Scaling parameter for normalized data
    beta : float
        Shifting parameter for normalized data
    momentum : float
        Momentum for the moving mean and the moving variance in BatchNormalization.

    """
    def __init__(self, optimizer, sigma, activation, 
                 FN_depth, C_depth, FS_depth, n_depth, S_depth, P_depth, 
                 FN_point, C_point, FS_point, n_point, S_point, P_point,
                 gamma, beta, momentum):
        
        # Set each layers
        # Depthwise convolution
        self.DepthwiseConvLayer = ConvLayer(
            FN_depth, C_depth, FS_depth, sigma, n_depth, optimizer, S=S_depth, P=P_depth, depthwise=True)
        self.BatchNormalization_depth = BatchNormalization(
            gamma=gamma, beta=beta, momentum=momentum)
        self.Activator_depth = Activator(activation)
        # Pointwise convolution
        self.PointwiseConvLayer = ConvLayer(
            FN_point, C_point, FS_point, sigma, n_point, optimizer, S=S_point, P=P_point)
        self.BatchNormalization_point = BatchNormalization(
            gamma=gamma, beta=beta, momentum=momentum)
        self.Activator_point = Activator(activation)
    
    def forward(self, x, train_flg):
        x = self.DepthwiseConvLayer.forward(x)
        x = self.BatchNormalization_depth.forward(x, train_flg)
        x = self.Activator_depth.forward(x)
        x = self.PointwiseConvLayer.forward(x)
        x = self.BatchNormalization_point.forward(x, train_flg)
        x = self.Activator_point.forward(x)
        return x
    
    def backward(self, dx):
        dx = self.Activator_point.backward(dx)
        dx = self.BatchNormalization_point.backward(dx)
        dx = self.PointwiseConvLayer.backward(dx)
        dx = self.Activator_depth.backward(dx)
        dx = self.BatchNormalization_depth.backward(dx)
        dx = self.DepthwiseConvLayer.backward(dx)
        return dx

class ConvLayer:
    """
    Convolution layer class
    
    Parameters
    ----------
    FN : int
      Output channels (filter number)
    C : int
      Input channels
    FS : int
      Filter size (number of elements(pixels))
    sigma : str, float or int
      SD for initial value of weights ("Xavier", "He" or float value)
    n : int
      Total node number in previous layer or number of features (input image)
    optimizer : instance
      An instance of a class "LearningMethod"
    S : int (default: 1)
      Stride
    P : int (default: 0)
      Padding
    depthwise : bool
      Whether depthwise convolution or not
    
    Attributes
    ----------
    self.z_shape : input dimensions
      Input data dimensions preserved for backward propagation
    
    """
    def __init__(self, FN, C, FS, sigma, n, optimizer, S=1, P=0, depthwise=False):
        self.S = S
        self.P = P
        self.optimizer = optimizer
        self.depthwise = depthwise
        
        # Set sigma to float value
        if (type(sigma) is float) or (type(sigma) is int):
            self.sigma = float(sigma)
        elif sigma == "Xavier":
            self.sigma = np.sqrt(1. / n)
        elif sigma == "He":
            self.sigma = np.sqrt(2. / n)
        else:
            raise ValueError("Set \"Xavier\" or \"He\" or numerical value (float or int).")
        
        # self.w and self.b initialization
        self.w = self.sigma * np.random.randn(FN, C, FS, FS)
        # Mask weights when depthwise convolution
        if self.depthwise:
            self.mask = np.zeros((FN, C, FS, FS))
            for mask in range(FN):
                # FN = C when depthwise conv
                self.mask[mask, mask, :, :] += 1
            self.w *= self.mask
            
        self.b = np.zeros(FN)
        
        # Create parameter "v" when Momentum optimization
        if self.optimizer.learning_method == "Momentum":
            self.v_w = np.zeros((FN, C, FS, FS))
            self.v_b = np.zeros(FN)
        
        # Create parameter "h" for learning coefficients decay when AdaGrad optimization
        elif self.optimizer.learning_method == "AdaGrad":
            self.h_w = np.zeros((FN, C, FS, FS))
            self.h_b = np.zeros(FN)
        
        # Create parameter "m" and "v" when Adam optimization
        elif self.optimizer.learning_method == "Adam":
            self.m_w = np.zeros((FN, C, FS, FS))
            self.v_w = np.zeros((FN, C, FS, FS))
            self.m_b = np.zeros(FN)
            self.v_b = np.zeros(FN)
        
    def forward(self, z):
        FN, C, FH, FW = self.w.shape # (Output channels, Input channels, Filter height, filter width)
        N, C, H, W = z.shape # (Number of batch data, Input channels, Input image height, Input image width)
        OH = self._conv_func(H, self.P, FH, self.S) # Output height
        OW = self._conv_func(W, self.P, FW, self.S) # Output width
        
        col = im2col(z, FH, FW, self.S, self.P)
        col_w = self.w.reshape(FN, -1).T
        
        # Inner product (N * OH * OW, C * FH * FW) and (C * FS * FS, FN) to (N * OH * OW, FN)
        a = np.dot(col, col_w) + self.b
        # Split N * OH * OW into each and transpose to "NCHW" standard shape
        a = a.reshape(N, OH, OW, -1).transpose(0, 3, 1, 2)
        
        # Save for backward propagation
        self.z_shape = z.shape
        self.col = col # N, OH, OW in axis 0, C, FH, FW in axis 1
        self.col_w = col_w # shape (C * FS * FS, FN)
        
        return a  
    
    def backward(self, grad_a):
        FN, C, FH, FW = self.w.shape # (Output channels, Input channels, Filter height, filter width)
        N, C, H, W = self.z_shape # (Number of batch data, Input channels, Input image height, Input image width)
        
        # Traspose to (N, OH, OW, FN) and reshape to (N * OH * OW, FN)
        grad_a = grad_a.transpose(0,2,3,1).reshape(-1, FN)
        
        # b gradient calculation
        self.grad_b = np.sum(grad_a, axis=0)
        
        # w gradient calculation
        # Inner product (C * FH * FW, N * OH * OW) and (N * OH * OW, FN) to (C * FH * FW, FN)
        self.grad_w = np.dot(self.col.T, grad_a)
        # Transpose to (FN, C * FH * FW) and reshape to (FN, C, FH, FW) standard shape
        self.grad_w = self.grad_w.transpose(1, 0).reshape(FN, C, FH, FW)
        # Mask gradients when depthwise convolution
        if self.depthwise:
            self.grad_w *= self.mask
        
        # z gradient calculation
        # Inner product (N * OH * OW, FN) and (FN, C * FS * FS) to (N * OH * OW, C * FS * FS)
        grad_col = np.dot(grad_a, self.col_w.T)
        # Adjust to "NCHW" standard shape with col2im
        grad_z = col2im(grad_col, self.z_shape, FH, FW, self.S, self.P)

        # weights update (optimizer.update mutates this layer in place)
        self = self.optimizer.update(self)
        
        return grad_z
    
    def _conv_func(self, N, P, F, S):
        # Output size calculation with input image size (N), padding (P), filter size (F), stride (S)
        return int(((N + 2 * P - F)/ S) + 1)
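As a quick check of this output-size formula: with a 32x32 CIFAR-10 input, a 3x3 filter, stride 2, and padding 1 (the first Conv / s2 layer), the output size is int(((32 + 2*1 - 3) / 2) + 1) = 16, matching the halving of the feature map seen in the model summary.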


class BatchNormalization:
    """
    http://arxiv.org/abs/1502.03167
    """
    def __init__(self, gamma, beta, momentum, running_mean=None, running_var=None):
        self.gamma = gamma
        self.beta = beta
        self.momentum = momentum
        self.input_shape = None # 4 dimensions in Conv layer and 2 dimensions in Dense layer  

        # mean and variance to be used in test phase
        self.running_mean = running_mean
        self.running_var = running_var  
        
        # for backward propagation
        self.batch_size = None
        self.xc = None
        self.std = None
        self.dgamma = None
        self.dbeta = None

    def forward(self, x, train_flg):
        self.input_shape = x.shape
        if x.ndim != 2:
            N, C, H, W = x.shape
            x = x.reshape(N, -1)

        out = self.__forward(x, train_flg)
        
        return out.reshape(*self.input_shape)
            
    def __forward(self, x, train_flg):
        if self.running_mean is None:
            N, D = x.shape
            self.running_mean = np.zeros(D)
            self.running_var = np.zeros(D)
                        
        if train_flg:
            mu = x.mean(axis=0)
            xc = x - mu
            var = np.mean(xc**2, axis=0)
            std = np.sqrt(var + 10e-7)
            xn = xc / std
            
            self.batch_size = x.shape[0]
            self.xc = xc
            self.xn = xn
            self.std = std
            self.running_mean = self.momentum * self.running_mean + (1-self.momentum) * mu
            self.running_var = self.momentum * self.running_var + (1-self.momentum) * var            
        else:
            xc = x - self.running_mean
            xn = xc / ((np.sqrt(self.running_var + 10e-7)))
            
        out = self.gamma * xn + self.beta 
        return out

    def backward(self, dout):
        if dout.ndim != 2:
            N, C, H, W = dout.shape
            dout = dout.reshape(N, -1)

        dx = self.__backward(dout)

        dx = dx.reshape(*self.input_shape)
        return dx

    def __backward(self, dout):
        dbeta = dout.sum(axis=0)
        dgamma = np.sum(self.xn * dout, axis=0)
        dxn = self.gamma * dout
        dxc = dxn / self.std
        dstd = -np.sum((dxn * self.xc) / (self.std * self.std), axis=0)
        dvar = 0.5 * dstd / self.std
        dxc += (2.0 / self.batch_size) * self.xc * dvar
        dmu = np.sum(dxc, axis=0)
        dx = dxc - dmu / self.batch_size
        
        self.dgamma = dgamma
        self.dbeta = dbeta
        
        return dx
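For reference, the forward pass above implements the formula from the paper: xn = (x - mu) / sqrt(var + eps) and out = gamma * xn + beta, with eps = 10e-7 (i.e. 1e-6) added for numerical stability; at test time the batch statistics are replaced by the running mean and variance accumulated with the given momentum.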


class PoolLayer:
    """
    Parameters
    ----------
    PH(Pooling Height) : int
        Height of pooling kernels
    PW(Pooling Width) : int
        Width of pooling kernels
    PS(Pooling Stride) : int
        Stride value of pooling kernels
    PP(Pooling Padding) : int
        Padding value of pooling kernels
    pooling : str
        "Max" or "Average"
    
    Attributes
    ----------
    self.z_shape : tuple of ints
      Dimension values of input data saved for calculation of backward propagation
    
    """
    def __init__(self, PH, PW, PS=1, PP=0, pooling="Average"):
        self.PH = PH
        self.PW = PW
        self.PS = PS
        self.PP = PP
        self.pooling = pooling
        if self.pooling != "Max" and self.pooling != "Average":
            raise ValueError("Set \"Max\" or \"Average\".")
        
    def forward(self, z):
        N, C, H, W = z.shape
        OH = int(1 + (H - self.PH) / self.PS)
        OW = int(1 + (W - self.PW) / self.PS)
        
        # (N, C, H, W) to (N * OH * OW, C * PH * PW)
        # At this point, the entries of each pooling window are already aligned along axis 1.
        col = im2col(z, self.PH, self.PW, self.PS, self.PP)
        # Set size of axis 1 to that of each pooling-applied area
        col = col.reshape(-1, self.PH * self.PW)
        
        if self.pooling == "Max":
            a = np.max(col, axis=1)
            a = a.reshape(N, OH, OW, C).transpose(0, 3, 1, 2)
            
            # Save for backward propagation
            self.z_shape = z.shape
            self.arg_max = np.argmax(col, axis=1) # Index of max value to be learned for backward propagation
        
        elif self.pooling == "Average":
            a = np.mean(col, axis=1)
            a = a.reshape(N, OH, OW, C).transpose(0, 3, 1, 2)
            
            # Save for backward propagation
            self.z_shape = z.shape
            self.col_shape = col.shape[0]
        
        return a

    def backward(self, grad_a):
        # Reshape to (N, OH, OW, C)
        grad_a = grad_a.transpose(0, 2, 3, 1)
        
        pool_size = self.PH * self.PW
        # Make a zero array with a size(Number of gradient elements, size of elements of pooling applied area)
        grad_col = np.zeros((grad_a.size, pool_size))
        
        if self.pooling == "Max":
            # Update gradients only where values have been exploited when forward propagation
            grad_col[np.arange(self.arg_max.size), self.arg_max.flatten()] = grad_a.flatten()
        elif self.pooling == "Average":
            # Update with mean values of gradients
            grad_col += grad_a.flatten().reshape(-1,1)/pool_size
            
        # Reshape to (N * OH * OW, C * FH * FW)
        grad_col = grad_col.reshape(grad_a.shape + (pool_size,))
        grad_col = grad_col.reshape(grad_col.shape[0] * grad_col.shape[1] * grad_col.shape[2], -1)
        # Adjust to "NCHW" shape by col2im
        grad_z = col2im(grad_col, self.z_shape, self.PH, self.PW, self.PS, self.PP)
        
        return grad_z


def im2col(X, FH, FW, S=1, P=0):
    """
    Parameters
    ----------
    X : shape (n_samples, channels, heights, width)
    FH : Filter height
    FW : Filter width
    S : Stride
    P : Padding

    Returns
    -------
    col : 2 dimensions ndarray
    
    """
    N, C, H, W = X.shape
    # Calculate pseudo output image height and width (4 dimensions)
    OH = (H + 2 * P - FH)//S + 1
    OW = (W + 2 * P - FW)//S + 1

    img = np.pad(X, [(0,0), (0,0), (P, P), (P, P)], 'constant') # pad along the height and width axes
    col = np.zeros((N, C, FH, FW, OH, OW))
    
    # Extracting every image data corresponds to each filter elements
    for y in range(FH):
        y_max = y + S * OH
        for x in range(FW):
            x_max = x + S * OW
            col[:, :, y, x, :, :] = img[:, :, y:y_max:S, x:x_max:S]
    
    # N, OH, OW in axis 0, C, FH, FW in axis 1
    col = col.transpose(0, 4, 5, 1, 2, 3).reshape(N * OH * OW, -1)
    return col


def col2im(col, X_shape, FH, FW, S=1, P=0):
    """
    Parameters
    ----------
    col : shape (N * OH * OW, C * FS * FS)
    X_shape : Input data shape (e.g. (10, 1, 28, 28))
    FH : Filter height
    FW : Filter width
    S : Stride
    P : Padding

    Returns
    -------
    X : shape(n_samples, channels, heights, width)

    """
    # Inverse process compared to im2col
    N, C, H, W = X_shape
    OH = (H + 2 * P - FH)//S + 1
    OW = (W + 2 * P - FW)//S + 1
    col = col.reshape(N, OH, OW, C, FH, FW).transpose(0, 3, 4, 5, 1, 2)

    img = np.zeros((N, C, H + 2 * P + S - 1, W + 2 * P + S - 1))
    for y in range(FH):
        y_max = y + S * OH
        for x in range(FW):
            x_max = x + S * OW
            img[:, :, y:y_max:S, x:x_max:S] += col[:, :, y, x, :, :]
    
    # Return "NCHW" standard shape (excluding padding edge)
    X = img[:, :, P:H + P, P:W + P]
    return X
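A quick shape check of the im2col/col2im pair (a hedged toy example):

im2col_check.py
import numpy as np

X = np.arange(2 * 3 * 4 * 4).reshape(2, 3, 4, 4).astype(float)  # (N, C, H, W)
col = im2col(X, FH=3, FW=3, S=1, P=0)
# OH = OW = (4 - 3)//1 + 1 = 2, so col is (N*OH*OW, C*FH*FW) = (8, 27)
print(col.shape)  # (8, 27)
# col2im maps back to NCHW; overlapping window positions are summed,
# which is exactly what the backward pass needs for gradient accumulation.
X_back = col2im(col, X.shape, FH=3, FW=3, S=1, P=0)
print(X_back.shape)  # (2, 3, 4, 4)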


class Flatten:
    """
    Flatten class
    
    Reduce dimensions (flattening) and make up data suitable for Dense layer inputs
    
    Attributes
    ----------
    self.z_shape : Input data dimensions
      Dimension values of input data saved for calculation of backward propagation
    
    """
    def forward(self, z):
        # Vanish the dimensions of output channels and image size
        self.z_shape = z.shape # for backward propagation
        z = z.reshape(z.shape[0], z.shape[1] * z.shape[2] * z.shape[3])
        
        return z
    
    def backward(self, grad_a):
        # Reshape for the previous layer
        grad_z = grad_a.reshape(self.z_shape)
        
        return grad_z

class Dropout():
    def __init__(self, prob=1e-3):
        self.prob = prob

    def forward(self, X, train_flg=True):
        # Inverted dropout: drop nodes at random with a mask matrix and
        # scale the survivors by 1/(1-prob), so inference needs no rescaling
        if train_flg:
            self.mask = np.random.binomial(1, 1. - self.prob, size=X.shape) / (1. - self.prob)
            return X * self.mask
        return X
    
    def backward(self, dout):
        # Backpropagate only through the nodes that were kept
        dX = dout * self.mask
        return dX
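This is the "inverted dropout" formulation: surviving activations are scaled up by 1/(1 - prob) during training, so the expected activation matches between training and inference, and forward() can simply pass the input through when train_flg is False.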

class FullConnLayer:
    """
    Dense layer class
    
    Parameters
    ----------
    n_nodes1 : int
      Previous nodes total number
    n_nodes2 : int
      Next nodes total number
    initializer : instance
      An instance for initialization method
    optimizer : instance
      An instance for optimization method
    
    Attributes
    ----------
    self.z : shape (batch_size, n_nodes1)
      Input data preserved for backward propagation
    
    """
    def __init__(self, n_nodes1, n_nodes2, initializer, optimizer):
        # self.w and self.b initialization using initializer
        self.w = initializer.W(n_nodes1, n_nodes2)
        self.b = initializer.B(n_nodes2)
        self.optimizer = optimizer
        
        # Create parameter "v" when Momentum optimization
        if self.optimizer.learning_method == "Momentum":
            self.v_w = np.zeros((n_nodes1, n_nodes2))
            self.v_b = np.zeros(n_nodes2)
        
        # Create parameter "h" for learning coefficients decay when AdaGrad optimization
        elif self.optimizer.learning_method == "AdaGrad":
            self.h_w = np.zeros((n_nodes1, n_nodes2))
            self.h_b = np.zeros(n_nodes2)
        
        # Create parameter "m" and "v" when Adam optimization
        elif self.optimizer.learning_method == "Adam":
            self.m_w = np.zeros((n_nodes1, n_nodes2))
            self.v_w = np.zeros((n_nodes1, n_nodes2))
            self.m_b = np.zeros(n_nodes2)
            self.v_b = np.zeros(n_nodes2)
        
    def forward(self, z):
        """
        Parameters
        ----------
        z : shape (batch_size, n_nodes1)
            Input data
        
        Returns
        ----------
        a : shape (batch_size, n_nodes2)
            Output data
        
        """
        # Save input data and the dimensions for backward propagation
        self.z = z
        self.z_shape = z.shape
        a = np.dot(z, self.w) + self.b
        
        return a
    
    def backward(self, grad_a):
        """
        Parameters
        ----------
        grad_a : shape (batch_size, n_nodes2)
            Gradients flowed from behind
        
        Returns
        ----------
        grad_z : shape (batch_size, n_nodes1)
            Gradients to be sent to the previous layer
        
        """
        self.grad_w = np.dot(self.z.T, grad_a)
        self.grad_b = grad_a.sum(axis=0)
        grad_z = np.dot(grad_a, self.w.T)
        # Update weights value
        self = self.optimizer.update(self)
        
        return grad_z


class LearningMethod:
    """
    For an instance to be handled in to "Layer" class constructor as an optimization method.
    Update layer's weights with "layer.----".

    Parameters
    ----------
    learning_method : str
        "SGD", "Momentum", "AdaGrad" or "Adam"
    lr : float
        Learning rate
    sgd_momentum : float
        Momentum parameter in "Momentum SGD"
    adam_beta1 : float
        Beta1 parameter in "Adam"
    adam_beta2 : float
        Beta2 parameter in "Adam"
        
    """
    def __init__(self, learning_method, lr, sgd_momentum=0.9, adam_beta1=0.9, adam_beta2=0.999):
        self.learning_method = learning_method
        self.lr = lr
        self.sgd_momentum = sgd_momentum
        self.adam_beta1 = adam_beta1
        self.adam_beta2 = adam_beta2
        self.iter = 0 # for "Adam" optimization
    
    def update(self, layer):
        if self.learning_method == "SGD":
            layer.w -= self.lr * layer.grad_w / layer.z_shape[0]
            layer.b -= self.lr * layer.grad_b / layer.z_shape[0]
            return layer
          
        elif self.learning_method == "Momentum":
            layer.v_w = self.sgd_momentum * layer.v_w - self.lr * layer.grad_w / layer.z_shape[0]
            layer.v_b = self.sgd_momentum * layer.v_b - self.lr * layer.grad_b / layer.z_shape[0]
            layer.w += layer.v_w
            layer.b += layer.v_b
            return layer
          
        elif self.learning_method == "AdaGrad":
            layer.h_w += layer.grad_w * layer.grad_w
            layer.h_b += layer.grad_b * layer.grad_b
            layer.w -= self.lr * (1. / (np.sqrt(layer.h_w) + 1e-07)) * layer.grad_w / layer.z_shape[0]
            layer.b -= self.lr * (1. / (np.sqrt(layer.h_b) + 1e-07)) * layer.grad_b / layer.z_shape[0]
            return layer
          
        elif self.learning_method == "Adam":
            self.iter += 1
            lr_t  = self.lr * np.sqrt(1.0 - self.adam_beta2**self.iter) / (1.0 - self.adam_beta1**self.iter)
            
            layer.m_w += (1 - self.adam_beta1) * (layer.grad_w - layer.m_w)
            layer.v_w += (1 - self.adam_beta2) * (layer.grad_w**2 - layer.v_w)
            layer.m_b += (1 - self.adam_beta1) * (layer.grad_b - layer.m_b)
            layer.v_b += (1 - self.adam_beta2) * (layer.grad_b**2 - layer.v_b)
            
            layer.w -= lr_t * layer.m_w / (np.sqrt(layer.v_w) + 1e-7) / layer.z_shape[0]
            layer.b -= lr_t * layer.m_b / (np.sqrt(layer.v_b) + 1e-7) / layer.z_shape[0]
            
            return layer
          
        else:
            raise ValueError("Set \"SGD\", \"Momentum\", \"AdaGrad\" or \"Adam\".")


class SimpleInitializer:
    """
    An instance to be handled in to Dense layer (FullConnLayer class) constructor as initializing method.
    Initializing using "self.W" and "self.B".

    Parameters
    ----------
    sigma : str, float or int
        SD for initial value of weights of each layer ("Xavier", "He" or float value)
    n : int
        Nodes number in previous layer
        
    """
    def __init__(self, sigma, n):
        if (type(sigma) is float) or (type(sigma) is int):
            self.sigma = float(sigma)
        elif sigma == "Xavier":
            self.sigma = np.sqrt(1. / n)
        elif sigma == "He":
            self.sigma = np.sqrt(2. / n)
        else:
            raise ValueError("Set \"Xavier\" or \"He\" or numerical value (float or int).")
    
    def W(self, n_nodes1, n_nodes2):
        return self.sigma * np.random.randn(n_nodes1, n_nodes2)
    
    def B(self, n_nodes2):
        return self.sigma * np.random.randn(n_nodes2)


class Activator:
    """
    Activation function class
    
    Parameters
    ----------
    activation : str
        Types of activation function ("sigmoid", "tanh" or "ReLU")
    
    Attributes
    ----------
    self.a : shape (batch_size, n_nodes2)
      Input data preserved for backward propagation
    
    """
    def __init__(self, activation):
        self.activation = activation
    
    def forward(self, a):
        # Save input data "a" for backward propagation
        self.a = a
        if self.activation == "sigmoid":
            sigmoid_range = 34.538776394910684
            a = np.clip(a, -sigmoid_range, sigmoid_range) # for overflow
            return 1 / (1 + np.exp(-a))
        elif self.activation == "tanh":
            return np.tanh(a)
        elif self.activation == "ReLU":
            return np.maximum(0, a)
        else:
            raise ValueError("Set \"sigmoid\" or \"tanh\" or \"ReLU\".")
            
    def backward(self, grad_a):
        if self.activation == "sigmoid":
            # self.a holds the pre-activation input, so recompute sigmoid(a) for the derivative
            sigmoid_range = 34.538776394910684
            sig = 1. / (1. + np.exp(-np.clip(self.a, -sigmoid_range, sigmoid_range)))
            return grad_a * (sig * (1. - sig))
        elif self.activation == "tanh":
            return grad_a * (1. - (np.tanh(self.a))**2)
        elif self.activation == "ReLU":
            derivative = np.where(self.a > 0., 1., 0.)
            return grad_a * derivative
        else:
            raise ValueError("Set \"sigmoid\" or \"tanh\" or \"ReLU\".")

class Softmax:
    """
    Softmax function class
    
    """
    def forward(self, a):
        a_max = np.max(a, axis=1)
        exp_a = np.exp(a - a_max.reshape(-1, 1)) # a_max for overflow
        sum_exp_a = np.sum(exp_a, axis=1).reshape(-1, 1)
        
        return exp_a / sum_exp_a
    
    def backward(self, a_out, y):
        return a_out - y
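The reason backward() is simply a_out - y: when softmax is combined with the cross-entropy loss, the gradient of the loss with respect to the pre-softmax logits collapses to a - y, which is why no separate cross-entropy layer appears in the backward pass of fit().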


class GetMiniBatch:
    """
    Iterator to acquire mini-batch data
    Acquire mini-batch data with a "for" statement in the neural network

    Parameters
    ----------
    X : shape (n_samples, channels, image_height, image_width)
      Train data
    y : shape (n_samples, 1)
      Labels
    batch_size : int
      Batch size
    seed : int
      Seed value for pseudo randomization
    
    """
    def __init__(self, X, y, batch_size=10, seed=0):
        self.batch_size = batch_size
        np.random.seed(seed) # note: this seeds NumPy's global RNG
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self.X = X[shuffle_index]
        self.y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(int)

    # Return iteration times in 1 epoch
    def __len__(self):
        return self._stop

    # Return batch data per iteration
    def __getitem__(self, item):
        p0 = item * self.batch_size
        p1 = item * self.batch_size + self.batch_size
        return self.X[p0:p1], self.y[p0:p1]        

    # Set iteration count value to zero
    def __iter__(self):
        self._counter = 0
        return self

    # Return batch data for next iteration
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter * self.batch_size
        p1 = self._counter * self.batch_size + self.batch_size
        self._counter += 1
        return self.X[p0:p1], self.y[p0:p1]