
Neural Network

Posted at 2017-08-10

Old version: a neural network for regression.

MultiLayerPerceptron_旧.py
# -*- coding: UTF-8 -*-

import numpy as np
import random
import math

class MultiLayerPerceptron:
    # A neural network with two hidden layers

    count = 0
    adjustedValue = 1e-8

    adaGradParameter = [0] * 4
    h1 = None
    h2 = None
    h3 = None


    def __init__(self,
                 numInp=0,
                 numHid=0,
                 numOut=0,
                 epochs=1000,
                 learningRate=0.01,
                 funcHid="tanh",
                 funcOut="softmax",
                 ):

        self.epochs = epochs
        self.numInp = numInp
        self.numHid = numHid
        self.numOut = numOut
        self.learningRate = learningRate

        # self.isFirstEpoch = True

        # Select the hidden-layer activation function and its derivative from the argument
        if funcHid == "sigmoid":
            self.funcHid = self.sigmoid
            self.funcHid_deriv = self.sigmoid_deriv

        elif funcHid == "tanh":
            self.funcHid = self.tanh
            self.funcHid_deriv = self.tanh_deriv

        elif funcHid == "Leaky_ReLU":  # return nan!
            self.funcHid = self.Leaky_ReLU
            self.funcHid_deriv = self.Leaky_ReLU_deriv

        else:
            pass

        # With the cross-entropy error function, the derivative of the output activation is not needed
        if funcOut == "sigmoid":
            self.funcOut = self.sigmoid
            self.funcOut_deriv = self.sigmoid_deriv

        elif funcOut == "tanh":
            self.funcOut = self.tanh
            self.funcOut_deriv = self.tanh_deriv

        elif funcOut == "softmax":
            self.funcOut = self.softmax
            self.funcOut_deriv = self.softmax_deriv

        elif funcOut == "Leaky_ReLU":
            self.funcOut = self.Leaky_ReLU
            self.funcOut_deriv = self.Leaky_ReLU_deriv

        elif funcOut == "identity":
            self.funcOut = self.identity
            self.funcOut_deriv = self.identity_deriv

        else:
            pass

        """
        # Initialize the weights with uniform random values in (-1.0, 1.0)
        self.W1 = np.random.uniform(-1.0, 1.0, (self.numHid, self.numInp)).astype(np.float32)
        self.W2 = np.random.uniform(-1.0, 1.0, (self.numHid, self.numHid)).astype(np.float32)
        self.W3 = np.random.uniform(-1.0, 1.0, (self.numOut, self.numHid)).astype(np.float32)

        self.W1 = self.my_initializer(self.W1)
        self.W2 = self.my_initializer(self.W2)
        self.W3 = self.my_initializer(self.W3)
        """

        self.W1 = np.random.randn(self.numHid, self.numInp).astype(np.float32)
        self.W2 = np.random.randn(self.numHid, self.numHid).astype(np.float32)
        self.W3 = np.random.randn(self.numOut, self.numHid).astype(np.float32)

        # Xavier initialization
        self.W1 /= np.sqrt(self.numInp)
        self.W2 /= np.sqrt(self.numHid)
        self.W3 /= np.sqrt(self.numHid)

        # Add a bias column initialized to 0
        self.W1 = np.insert(self.W1, 0, 0, axis=1)
        self.W2 = np.insert(self.W2, 0, 0, axis=1)
        self.W3 = np.insert(self.W3, 0, 0, axis=1)

        """
        ### TEST ###
        self.W1 = np.array([[1, -0.29347357, -0.3470237 ],[1, 0.48876637,  0.32257119],[1, 0.14908332, -0.65325332]])
        self.W2 = np.array([[1,  0.21143068, -0.11938749, -0.45852581],[1, 0.17979711, -0.34699816, -0.63548869],[1,  0.38606647,  0.16219939,  0.81149703]])
        self.W3 = np.array([[1, -1.49986553, -0.3109121 , -0.65442187],[1, -0.57223064,  0.59579271,  0.53880507]])
        print ("W1",self.W1)
        print ("W2",self.W2)
        print ("W3",self.W3)
        """

    def my_initializer(self, w):
        # Normalize each row of w to zero mean and unit variance

        for i, data in enumerate(w):

            in_num = len(data)
            average = np.sum(data) / in_num
            deviation = data - average

            square_dev = deviation * deviation
            avg_square_dev = np.average(square_dev)
            normal_dev = np.sqrt(avg_square_dev)

            # normal_dev = np.sqrt(1 / in_num)

            w[i] = deviation / normal_dev

        # Add a bias column initialized to 0
        w = np.insert(w, 0, 0, axis=1)

        return w


    # Load the weight parameters
    def loadWeight(self):
        self.W1 = np.loadtxt('W1.py')
        self.W2 = np.loadtxt('W2.py')
        self.W3 = np.loadtxt('W3.py')

    # Save the weight parameters
    def saveWeight(self):
        np.savetxt('W1.py', self.W1)
        np.savetxt('W2.py', self.W2)
        np.savetxt('W3.py', self.W3)

    # Activation functions
    def tanh(self, x):
        return np.tanh(x)

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def softmax(self, x):
        x = x - np.max(x)
        temp = np.exp(x)
        return temp / np.sum(temp)

    def Leaky_ReLU(self, x):
        for i in range(len(x)):
            if x[i] < 0:
                x[i] = 0.01 * x[i]
        return x

    def identity(self, x):
        return x

    # Derivatives of the activation functions
    def sigmoid_deriv(self, x):
        return x * (1 - x)

    def tanh_deriv(self, x):
        # bw passes the post-activation value z = tanh(u), so d tanh(u)/du = 1 - z**2
        return 1.0 - np.power(x, 2)

    def softmax_deriv(self, x):
        return 1

    def Leaky_ReLU_deriv(self, x):
        for i in range(len(x)):
            if x[i] >= 0:
                x[i] = 1
            else:
                x[i] = 0.01
        return x

    def identity_deriv(self, x):
        for i in range(len(x)):
            x[i] = 1
        return x

    # Optimization algorithm (used only if needed)
    def adaGrad(self, delta, n):

        self.adaGradParameter[n] += delta * delta

        for i in range(len(self.adaGradParameter[n][0])):
            sqrt = math.sqrt(self.adaGradParameter[n][0][i]) + self.adjustedValue
            delta[0][i] = delta[0][i] / sqrt

        return delta

    # Zero-mean normalization
    def zeroMean(self, u):

        if np.sum(u) == 0:
            return u

        x = u - np.average(u)

        # Normalize by the Frobenius norm
        # v = x / np.linalg.norm(x)
        v = x / ((np.sum(x * x)) ** 0.5)

        return v

    # Forward propagation
    def fw(self, x, train=False):

        bias = 1

        # print("bias", bias)
        x = np.array(x)
        # Add bias to x
        x = np.insert(x, 0, bias)

        # u1 = np.dot(self.W1, x)
        u1 = np.dot(x, self.W1.T)
        # print("x", x)
        # print("W1", self.W1)
        # print("u1", u1)

        z1 = self.funcHid(u1)
        # Add bias to z1
        z1 = np.insert(z1, 0, bias)
        # print("z1", z1)

        u2 = np.dot(self.W2, z1)
        # print("W2", self.W2)
        # print("u2", u2)

        z2 = self.funcHid(u2)
        # Add bias to z2

        # For reinforcement learning, skip the bias term here (insert 0 instead of 1)
        bias = 0
        z2 = np.insert(z2, 0, bias)

        # print("z2", z2)

        u3 = np.dot(self.W3, z2)
        # print("W3", self.W3)
        # print("u3", u3)

        y = self.funcOut(u3)
        # print("y", y)

        # self.isFirstEpoch = False

        if train:
            return z1, z2, y
        else:
            return y

    # Backpropagation
    def bw(self, x, t):

        # print("bw")

        # Forward-propagate the input to get the hidden-layer outputs
        z1, z2, y = self.fw(x, train=True)

        bias = 1

        # Back Propagation

        # Output-layer error (cross-entropy error function, so no output derivative needed)
        # delta3 = (y - t) * self.funcOut_deriv(y)
        delta3 = y - t

        # Backpropagate the error to get the second hidden layer's error (gradient)
        delta2 = self.funcHid_deriv(z2) * np.dot(self.W3.T, delta3)
        # Remove the bias component from the gradient
        delta2 = np.delete(delta2, 0)

        # Backpropagate the error to get the first hidden layer's error (gradient)
        delta1 = self.funcHid_deriv(z1) * np.dot(self.W2.T, delta2)
        # Remove the bias component from the gradient
        delta1 = np.delete(delta1, 0)

        # Update the output-layer weights with the output-layer error
        #  (convert to 2-D arrays for the matrix operations)
        z2 = np.atleast_2d(z2)
        delta3 = np.atleast_2d(delta3)
        self.W3 -= self.learningRate * np.dot(delta3.T, z2)

        # Update the hidden-layer weights with the hidden-layer error
        z1 = np.atleast_2d(z1)
        delta2 = np.atleast_2d(delta2)
        self.W2 -= self.learningRate * np.dot(delta2.T, z1)

        # Update the input-layer weights with the hidden-layer error
        #  (add the bias to the input first)
        x = np.insert(x, 0, bias)
        x = np.atleast_2d(x)
        delta1 = np.atleast_2d(delta1)
        self.W1 -= self.learningRate * np.dot(delta1.T, x)

        """
        print("x",x)
        print("z1",z1)
        print("z2",z2)
        print("y",y)
        print("t",t)
        print("func y",self.funcOut_deriv(y))
        print("delta3",delta3)
        print("delta3",delta3)
        print("delta1",delta1)
        """

    # Training
    def train(self, X, T):

        """訓練データを用いてネットワークの重みを更新する"""
        # (Optionally) prepend a bias-unit input of 1 to the input data
        # X = np.hstack([np.ones([X.shape[0], 1]), X])
        T = np.array(T)

        # Online training: pick a random training sample and update the
        # weights, repeated for `epochs` iterations
        for counter in range(self.epochs):

            # Pick a random training sample
            idx = np.random.randint(X.shape[0])

            # Backward
            self.bw(X[idx], T[idx])

            # Debug counter
            self.count += 1
            if self.count % 1000 == 0:
                # print (self.count, "delta3", delta3)
                pass

    def predict(self, X, show=False):
        # print("predict")

        for x in X:
            x = np.array(x)

            # 順伝播によりネットワークの出力を計算
            y = self.fw(x)

            if show:
                print("x", x)
                print("y", y)

            if y[0] != y[0]:  # NaN check (NaN never equals itself)
                print("W1", self.W1)
                input()

                print("W2", self.W2)
                input()

                print("W3", self.W3)
                input()

                print("x", x)
                print("y", y)
                input()
                exit()

            """
            fl1 = float("%.1f" % a[0])
            fl2 = float("%.1f" % a[1])
            print (i, fl1,":",fl2)
            """

        return y


# sample
if __name__ == "__main__":
    # XOR classification
    # mlp = MultiLayerPerceptron(numInp=2,numHid=2,numOut=2,epochs=1000,learningRate=0.1,funcHid="tanh",funcOut="softmax")
    X = np.array([[1, 0], [0, 1], [1, 1], [0, 0]])
    T = np.array([[0, 1], [0, 1], [1, 0], [1, 0]])

    """
    # Majority vote
    #mlp = MultiLayerPerceptron(numInp=3,numHid=5,numOut=2,epochs=1000,learningRate=0.1,funcHid="tanh",funcOut="softmax")
    X = np.array([[1, 1, 1], [1, 1, 0], [1, 0, 0],[0, 0 , 0], [1,0,1],[0,1, 0],[0,1,1] ])
    T = np.array([[1, 0],    [1, 0],    [0, 1],   [0, 1],     [1,0],  [0,1],   [1,0]    ])
    """

    # AutoEncoder
    mlp = MultiLayerPerceptron(numInp=2, numHid=3, numOut=2, epochs=1, learningRate=0.1, funcHid="tanh",
                               funcOut="identity")
    X = np.array([[1, 1]])
    T = np.array([[1, 1]])

    mlp.train(X, T)
    mlp.predict(X, True)
    pass
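
For reference, running the XOR example that is commented out in `__main__` would look roughly like the sketch below. The constructor line is the one commented out above; with tanh hidden units and a softmax output, more epochs than 1000 may be needed before the two classes separate cleanly.

mlp = MultiLayerPerceptron(numInp=2, numHid=2, numOut=2, epochs=1000,
                           learningRate=0.1, funcHid="tanh", funcOut="softmax")
X = np.array([[1, 0], [0, 1], [1, 1], [0, 0]])
T = np.array([[0, 1], [0, 1], [1, 0], [1, 0]])
mlp.train(X, T)
mlp.predict(X, show=True)   # each printed y is a softmax over the two classes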


New version: a multilayer neural network for classification.

common.py

import numpy as np


def softmax(x):
    # Shift by the max for numerical stability; normalize per row for a batch
    if x.ndim == 2:
        x_ = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(x_)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)
    x_ = x - np.max(x)
    exp_x = np.exp(x_)
    return exp_x / np.sum(exp_x)


def cross_entropy_error(y, t):
    if y.ndim == 1:
        t = t.reshape(1, t.size)
        y = y.reshape(1, y.size)
    n = y.shape[0]
    # Small constant avoids log(0)
    return -np.sum(t * np.log(y + 1e-7)) / n
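
A quick sanity check of these two helpers might look like this (values are illustrative, and it assumes common.py is importable from the working directory):

import numpy as np
from common import softmax, cross_entropy_error

y = softmax(np.array([2.0, 1.0, 0.1]))   # probabilities that sum to 1
t = np.array([1, 0, 0])                  # one-hot target
print(y)
print(cross_entropy_error(y, t))         # small when y puts mass on the target class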
mlp.py
import numpy as np

from common import softmax, cross_entropy_error


class MultiLayerPerceptron:

    def __init__(self, input_size, hid1_size, out_size, learning_rate):

        self.input_size = input_size
        self.hid1_size = hid1_size
        self.out_size = out_size
        self.learning_rate = learning_rate

        alpha = 0.1

        # Small random weights; biases start at zero
        self.W1 = np.random.randn(input_size, hid1_size).astype(np.float32) * alpha
        self.W2 = np.random.randn(hid1_size, out_size).astype(np.float32) * alpha
        self.b1 = np.zeros(hid1_size).astype(np.float32)
        self.b2 = np.zeros(out_size).astype(np.float32)
        
        # Xavier initialization
#         self.W1 /= np.sqrt(input_size)
#         self.W2 /= np.sqrt(hid1_size)
#         self.b1 /= np.sqrt(input_size)
#         self.b2 /= np.sqrt(hid1_size)

    # Activation functions
    def tanh(self, x):
        self.tanh_y = np.tanh(x)
        return self.tanh_y

    def sigmoid(self, x):
        self.sigmoid_y = 1 / (1 + np.exp(-x))
        return self.sigmoid_y

    def relu(self, x):
        self.relu_mask = (x <=0)
        out = x.copy()
        out[self.relu_mask] = 0
        return out

    def softmaxWithLoss(self, y, t):
        # y here is the pre-softmax output (logits)
        self.cross_entropy_error_y = softmax(y)
        self.cross_entropy_error_t = t
        self.loss = cross_entropy_error(self.cross_entropy_error_y, t)
        return self.loss
    
    # Derivatives of the activation functions
    def sigmoid_deriv(self, x):
        f = self.sigmoid_y
        return f * (1 - f)

    def tanh_deriv(self, x):
        # Uses the cached forward output: d tanh(u)/du = 1 - tanh(u)**2
        return 1.0 - np.power(self.tanh_y, 2)

    def relu_deriv(self, dout):
        # Pass the gradient through where x > 0, zero it where x <= 0
        dout[self.relu_mask] = 0
        dx = dout
        return dx
    
    def cross_entropy_error_deriv(self, dout=1):
        # Combined softmax + cross-entropy gradient: (y - t) / batch_size
        n = self.cross_entropy_error_y.shape[0]
        return dout * (self.cross_entropy_error_y - self.cross_entropy_error_t) / n

    # Forward propagation
    def fw(self, x, train=True):

        u1 = np.dot(x, self.W1) + self.b1
        z = self.sigmoid(u1)
        u2 = np.dot(z, self.W2) + self.b2
        y = softmax(u2)
        
        self.z = z

        return y

    # Backpropagation
    def bw(self, x, t, train=True):

        y = self.fw(x, train)
        n = y.shape[0]

        # Record the loss and the quantities needed by cross_entropy_error_deriv
        self.cross_entropy_error_y = y
        self.cross_entropy_error_t = t
        self.loss = cross_entropy_error(y, t)

        # Gradient of the cross-entropy loss w.r.t. the softmax input: (y - t) / n
        dy = self.cross_entropy_error_deriv()

        db2 = np.sum(dy, axis=0)
        dW2 = np.dot(self.z.T, dy)

        dz = np.dot(dy, self.W2.T)
        df = dz * self.sigmoid_deriv(dz)   # sigmoid_deriv uses the cached forward output

        db1 = np.sum(df, axis=0)
        dW1 = np.dot(x.T, df)

        # SGD
        lr = self.learning_rate
        self.W2 -= dW2 * lr
        self.W1 -= dW1 * lr
        self.b2 -= db2 * lr
        self.b1 -= db1 * lr
        
    # Clear gradients (nothing to clear in this implementation)
    def clear(self):
        pass
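
A minimal smoke test of this class, assuming common.py and mlp.py sit in the same directory and using the bw method above (which records the loss in model.loss). The shapes mirror the wine data used in train.py below; the random inputs and seed are illustrative only.

import numpy as np
from mlp import MultiLayerPerceptron

np.random.seed(0)
model = MultiLayerPerceptron(input_size=13, hid1_size=30, out_size=3, learning_rate=0.1)

x = np.random.randn(4, 13).astype(np.float32)   # a mini-batch of 4 samples
t = np.eye(3, dtype=np.float32)[[0, 1, 2, 0]]   # one-hot targets

model.bw(x, t)   # forward pass, backprop, and one SGD update
print(model.loss)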
train.py
import numpy as np
import random
import math

import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn import linear_model, datasets

from mlp import MultiLayerPerceptron

# Load the wine dataset
load_data = datasets.load_wine()
print(len(load_data.data))

# Build the training data
features = pd.DataFrame(data=load_data.data, columns=load_data.feature_names)
targets = pd.DataFrame(data=load_data.target, columns=['class'])

targets = pd.get_dummies(targets['class'], prefix=None, drop_first=False)

# Store scalings in a dictionary so we can convert back later
scaled_features = {}
for each in features.columns:
    mean, std = features[each].mean(), features[each].std()
    scaled_features[each] = [mean, std]
    features.loc[:, each] = (features[each] - mean)/std

train_features, test_features, train_targets, test_targets = train_test_split(features, targets, test_size=0.1)

train_features.head()

epochs = 50
hid_size = 30
out_size = 3
learning_rate = 0.1
batch_size = 4
losses = []

train_size, input_size = train_features.shape
num_iterate = int(train_size / batch_size)
print("train_size: " + str(train_size))
print("input_size: " + str(input_size))
print("num_iterate: " + str(num_iterate))
print()

model = MultiLayerPerceptron(input_size, hid_size, out_size, learning_rate)

for epoch in range(epochs):

    loss = 0
    for i in range(num_iterate):
        # Sample a random mini-batch from the training set
        batch_mask = np.random.choice(train_size, batch_size)
        train_batch = train_features.iloc[batch_mask]
        target_batch = train_targets.iloc[batch_mask]

        model.bw(train_batch.values, target_batch.values.astype('float32'))
        loss += model.loss / num_iterate

    losses.append(loss)
    print("epoch: " + str(epoch) + " loss: " + str(loss))