前回は、Kerasでの回帰分析モデルの使い方を書いてみました。
今回は、分類モデルについて、ちょっとだけ書いてみます。
まず、Kerasでの分類モデルの作り方は様々情報がありますが、私は以下のブログを参考にさせていただきました。
あと、Keras公式サイトも参考にしています。
利用する情報は、前回どおり、アメダスのある日のデータ。
再度以下に示しておきます。(作り方は、その1あたりで書いてあったような気がする)
year,month,day,hour,temp,wind,angle,weather,
2019,8,13,1,24.9,1.4,0,2
2019,8,13,2,24.1,2.2,0,2
2019,8,13,3,23.8,1.4,0,2
2019,8,13,4,23.5,1.2,0,2
2019,8,13,5,23.2,1.8,0,2
2019,8,13,6,23.9,0.7,15,2
2019,8,13,7,25.1,0.9,13,2
2019,8,13,8,26.7,1.0,10,2
2019,8,13,9,28.6,1.6,5,2
2019,8,13,10,30.3,1.2,8,2
2019,8,13,11,30.6,1.3,11,2
2019,8,13,12,31.4,2.5,1,2
2019,8,13,13,33.3,2.0,5,2
2019,8,13,14,33.0,2.3,16,2
2019,8,13,15,33.9,1.8,3,2
2019,8,13,16,32.2,3.2,13,2
2019,8,13,17,29.4,1.0,15,10
2019,8,13,18,27.1,4.4,11,10
2019,8,13,19,25.9,3.7,13,10
2019,8,13,20,26.0,2.4,16,4
2019,8,13,21,26.0,0.9,16,4
2019,8,13,22,25.6,1.3,16,2
2019,8,13,23,25.4,2.6,0,2
何の分類をしようかと思いましたが、風速が一定値以上(例えば2mとします)のときに強風、そうでないときは微風/無風、という風に2パターンに分けてみます。
データの下処理(取り込み、正規化など)は以下のようになります。
import pandas as pd
import numpy as np
# deta making???
csv_input = pd.read_csv(filepath_or_buffer="data_out.csv",
encoding="ms932",
sep=",")
# インプットの項目数(行数 * カラム数)を返却します。
# 指定したカラムだけ抽出したDataFrameオブジェクトを返却します。
x = np.array(csv_input[["hour"]])
y = np.array(csv_input[["wind"]])
# num of records
N = len(x)
# 正規化
x_max = np.max(x,axis=0)
x_min = np.min(x,axis=0)
x = (x - np.min(x,axis=0))/(np.max(x,axis=0) - np.min(x,axis=0))
# y > 2[m] : strong
# y <= 2[m] : weak
y_new = np.zeros(len(y),dtype=int)
for k in range(len(y)):
if y[k] > 2:
y_new[k] = 1
y = y_new.reshape(y.shape)
yには出力データが入っていて、
- 1のとき → 強風と判断
- 0のとき → 微風/無風と判断
となっています。
これを学習させるために、Kerasで以下のようなコードでモデルを構築してみます。
# 学習のためのモデルを作る
model = Sequential()
# 全結合層(1層->30層)
model.add(Dense(input_dim=1, output_dim=30, bias=True))
# 活性化関数(Sigmoid関数)
model.add(Activation("sigmoid"))
# 全結合層(30層->1層)
model.add(Dense(output_dim=1))
# 活性化関数(Sigmoid関数)
model.add(Activation("sigmoid"))
# モデルをコンパイル
model.compile(loss="binary_crossentropy", optimizer="sgd", metrics=["accuracy"])
# 学習を実行
model.fit(x, y, epochs=5000, batch_size=32, verbose=1)
最後の出力部分、1つだけのときには、binary_crossentropyを選択するのが正解のようです。
誤差関数としては、以下のものを想定しています。(パターン認識と機械学習上p.236の式5.23を簡単にしたもの)
E(\textbf{w}) = - \sum_{n=1}^{N}
\{ t_{n} \ln y (x_n,\textbf{w}) + (1-t_{n})\ln (1-y (x_n,\textbf{w})) \}
インデックスnは、サンプルの数を示していて、t_nは、x_nに対応する正解値(1 or 0)。
y(x_n,w)は、入力=x_n、ニューラルネットパラメタ=wとしたときの推論出力を意味しています(このときE(w)=0と最小値を取る)。
どんなx_nに対しても、t_n = y(x_n,w)が成り立つようなwを見つけるのが学習というプロセスになります。
ここで、結果評価用に、以前にも出した正解率を出す関数を一つ準備します。
(Kerasのログにも出ているのですが、練習用に自前で準備)
# y:predict
# t:true
def checkOKPercent(y,t):
# from predict param
sign_newral = np.sign(np.array(y).reshape([len(t),1]) - 0.5)
# from true param
sign_orig = np.sign(np.array(t.reshape([len(t),1])) - 0.5)
# are there same sign??
NGCNT = np.sum(np.abs(sign_newral-sign_orig))/2
# calc NG percent in [0.0-1.0]
NGPer = NGCNT / len(t)
# return OK percent [0.0-1.0]
return 1.0-NGPer
ニューラルネットワークの出力である、y(x_n,w)は実際には[0,1]の間の任意の値(float???)を取ることになり、0.5以上なら強風、0.5未満なら微風/無風という意味となります。
以上まとめて、1つのソースにしたものを以下に貼り付けます。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Activation, Dense
# y:predict
# t:true
def checkOKPercent(y,t):
# from predict param
sign_newral = np.sign(np.array(y).reshape([len(t),1]) - 0.5)
# from true param
sign_orig = np.sign(np.array(t.reshape([len(t),1])) - 0.5)
# are there same sign??
NGCNT = np.sum(np.abs(sign_newral-sign_orig))/2
# calc NG percent in [0.0-1.0]
NGPer = NGCNT / len(t)
# return OK percent [0.0-1.0]
return 1.0-NGPer
# deta making???
csv_input = pd.read_csv(filepath_or_buffer="data_out.csv",
encoding="ms932",
sep=",")
# インプットの項目数(行数 * カラム数)を返却します。
# 指定したカラムだけ抽出したDataFrameオブジェクトを返却します。
x = np.array(csv_input[["hour"]])
y = np.array(csv_input[["wind"]])
# num of records
N = len(x)
# 正規化
x_max = np.max(x,axis=0)
x_min = np.min(x,axis=0)
x = (x - np.min(x,axis=0))/(np.max(x,axis=0) - np.min(x,axis=0))
# y > 2[m] : strong
# y <= 2[m] : weak
y_new = np.zeros(len(y),dtype=int)
for k in range(len(y)):
if y[k] > 2:
y_new[k] = 1
y = y_new.reshape(y.shape)
# 学習のためのモデルを作る
model = Sequential()
# 全結合層(1層->30層)
model.add(Dense(input_dim=1, output_dim=30, bias=True))
# 活性化関数(Sigmoid関数)
model.add(Activation("sigmoid"))
# 全結合層(30層->1層)
model.add(Dense(output_dim=1))
# 活性化関数(Sigmoid関数)
model.add(Activation("sigmoid"))
# モデルをコンパイル
model.compile(loss="binary_crossentropy", optimizer="sgd", metrics=["accuracy"])
# 学習を実行
model.fit(x, y, epochs=5000, batch_size=32, verbose=1)
# 真値のプロット
plt.plot(x,y,marker='x',label="true")
# 推論でKerasの結果を計算,表示
y_predict = model.predict(x)
# Keras計算結果のプロット
plt.plot(x,y_predict,marker='x',label="predict")
# 凡例表示
plt.legend()
# display result
print('OK %.2f[percent]' % (checkOKPercent(y_predict,y)*100.0))
気が付けば、importからtensorflowを消せるんですね・・・。
さて、結果をプロットしてみると、以下のようになりました。
横軸が(正規化した)時間、縦軸が強風かどうか となっています。
青いのが正解、オレンジがニューラルネットワークの結果です。
なんだか・・・やっぱり直線近似っぽい感じになってしまうのですね。
収束の具合がコンソールに出ているのでチェックしてみます。
Epoch 4994/5000
23/23 [==============================] - 0s 87us/step - loss: 0.6013 - acc: 0.6522
Epoch 4995/5000
23/23 [==============================] - 0s 87us/step - loss: 0.6012 - acc: 0.6522
Epoch 4996/5000
23/23 [==============================] - 0s 43us/step - loss: 0.6012 - acc: 0.6522
Epoch 4997/5000
23/23 [==============================] - 0s 43us/step - loss: 0.6012 - acc: 0.6522
Epoch 4998/5000
23/23 [==============================] - 0s 43us/step - loss: 0.6012 - acc: 0.6522
Epoch 4999/5000
23/23 [==============================] - 0s 43us/step - loss: 0.6012 - acc: 0.6522
Epoch 5000/5000
23/23 [==============================] - 0s 43us/step - loss: 0.6012 - acc: 0.6522
OK 65.22[percent]
lossが0.6012あたりに収束しているので、ロジックとしては動いているようです。
ただ、正解率は65.22%。
かならずどちらかに予想してもこれぐらいはいく気がするので、性能としてはイマイチです。
そこで、前回やったようなニューラルネットワークの初期値を設定するパターンで試してみました。
いきなり全部ソースコードを載せます。
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Activation, Dense
from keras.utils.np_utils import to_categorical
from keras import backend as K
import keras
# y:predict
# t:true
def checkOKPercent(y,t):
# from predict param
sign_newral = np.sign(np.array(y).reshape([len(t),1]) - 0.5)
# from true param
sign_orig = np.sign(np.array(t.reshape([len(t),1])) - 0.5)
# are there same sign??
NGCNT = np.sum(np.abs(sign_newral-sign_orig))/2
# calc NG percent in [0.0-1.0]
NGPer = NGCNT / len(t)
# return OK percent [0.0-1.0]
return 1.0-NGPer
# init infomation for keras layers or models
class InitInfo:
# constractor
# x:input y:output
def __init__(self,x,y):
self.x = x
self.y = y
# calc coefficient of keras models(1st layer)
# input s:changing point in [0,1]
# sign:[1]raise,[0]down
# return b:coefficient of bias
# w:coefficient of x
# notice - it can make like step function using this return values(s,sign)
def calc_b_w(self,s,sign):
N = 1000 # 仮置き
# s = -b/w
if sign > 0:
b = -N
else:
b = N
if s != 0:
w = -b/s
else:
w = 1
return b,w
# calc coefficient of keras models(1st and 2nd layer)
def calc_w_h(self):
K = len(self.x)
# coefficient of 1st layer(x,w)
w_array = np.zeros([K*2,2])
# coefficient of 2nd layer
h_array = np.zeros([K*2,1])
w_idx = 0
for k in range(K):
# x[k] , y[k]
# make one step function
# startX : calc raise point in [0,1]
if k > 0:
startX = self.x[k] + (self.x[k-1] - self.x[k])/2
else:
startX = 0
# endX : calc down point in [0,1]
if k < K-1:
endX = self.x[k] + (self.x[k+1] - self.x[k])/2
else:
endX = 1
# calc b,w
if k > 0:
b,w = self.calc_b_w(startX,1)
else:
# init???
b = 100
w = 1
# stepfunction 1stHalf
# __________
# 0 ________|
#
w_array[w_idx,0] = w
w_array[w_idx,1] = b
h_array[w_idx,0] = self.y[k]
w_idx += 1
# stepfunction 2ndHalf
#
# 0 __________
# |________
b,w = self.calc_b_w(endX,1)
w_array[w_idx,0] = w
w_array[w_idx,1] = b
h_array[w_idx,0] = self.y[k]*-1
# shape of 1st + 2nd is under wave
# _
# 0 ________| |________
#
w_idx += 1
# record param
self.w = w_array
self.h = h_array
self.w_init = w_array[:,0]
self.b_init = w_array[:,1]
self.paramN = len(h_array)
return
# for bias coefficients setting
def initB(self, shape, name=None):
value = self.b_init
value = value.reshape(shape)
return K.variable(value, name=name)
# for w coefficients (x) setting
def initW(self, shape, name=None):
value = self.w_init
value = value.reshape(shape)
return K.variable(value, name=name)
# for h coefficients setting
def initH(self, shape, name=None):
value = self.h
value = value.reshape(shape)
return K.variable(value, name=name)
# deta making???
csv_input = pd.read_csv(filepath_or_buffer="data_out.csv",
encoding="ms932",
sep=",")
# インプットの項目数(行数 * カラム数)を返却します。
print(csv_input.size)
# 指定したカラムだけ抽出したDataFrameオブジェクトを返却します。
x = np.array(csv_input[["hour"]])
y = np.array(csv_input[["wind"]])
print(y.shape)
# num of records
N = len(x)
# 正規化
x_max = np.max(x,axis=0)
x_min = np.min(x,axis=0)
x = (x - np.min(x,axis=0))/(np.max(x,axis=0) - np.min(x,axis=0))
# y > 2[m] : strong
# y <= 2[m] : weak
y_new = np.zeros(len(y),dtype=int)
for k in range(len(y)):
if y[k] > 2:
y_new[k] = 1
y_new = y_new.reshape(y.shape)
y = np.array(y_new,dtype=float)
# create InitInfo object
objInitInfo = InitInfo(x,y_orig)
# calc init value of w and h(and bias)
objInitInfo.calc_w_h()
# 学習のためのモデルを作る
model = Sequential()
# 全結合層(1層->XXX層)
model.add(Dense(input_dim=1, output_dim=objInitInfo.paramN,
bias=True,
kernel_initializer=objInitInfo.initW,
bias_initializer=objInitInfo.initB))
# 活性化関数(Sigmoid関数)
model.add(Activation("sigmoid"))
# 全結合層(XXX層->2層)
model.add(Dense(output_dim=1,kernel_initializer=objInitInfo.initH))
# 活性化関数(softmax関数)
model.add(Activation("sigmoid"))
sgd_ = keras.optimizers.SGD(lr=0.05)
cb = keras.callbacks.EarlyStopping(monitor='loss',
min_delta=0.0004,
patience=1,
verbose=0,
mode='auto',
baseline=None)
# モデルをコンパイル
model.compile(loss="binary_crossentropy", optimizer=sgd_, metrics=["accuracy"])
# 学習を実行
model.fit(x, y, epochs=5000, batch_size=32, verbose=1,callbacks=[cb])
# 真値のプロット
plt.plot(x,y,marker='x',label="true")
# 推論でKerasの結果を計算,表示
y_predict = model.predict(x)
# Keras計算結果のプロット
plt.plot(x,y_predict,marker='x',label="predict")
# 凡例表示
plt.legend()
# display result
print('OK per %.2f ' % (checkOKPercent(y_predict,y)*100.0))
収束したと判断したら抜けるようなコールバックの設定も追加してあります。
結果は???
23/23 [==============================] - 0s 0us/step - loss: 0.2310 - acc: 1.0000
NG per 100.00
素晴らしい、正解率が100%になりました!!!
初期値さえ上手く設定できればよいということが分かりました。
しかし、なんだか汎用性が無いような???
ちょっと係数をランダムに降ってみて、試してみたところ、以下のような設定だと上手く行きそうでした。
# for bias coefficients setting
def initB(shape, name=None):
L = np.prod(shape)
value = np.ones(L).reshape(shape)*(-1000)
return K.variable(value, name=name)
# for w coefficients (x) setting
def initW(shape, name=None):
value = 1000/(np.random.random(shape))
return K.variable(value, name=name)
xから中間層へ入れるところの係数ですが、Bias側は1000固定、x側(w)は[0,1]の乱数を作って、1000/乱数(1000以上になる)に適当に振ります(ゼロ割起きたらゴメンナサイ)。
※1000という数値はある程度大きければ何でも良いと思われます
これによって、中間層ではステップ関数が適当に出あがるんじゃないかと思われます。
それ以降、中間層からyへの出力は何でもいけそうでした。
ということで、以上をまとめると・・・
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import Activation, Dense
from keras.utils.np_utils import to_categorical
from keras import backend as K
import keras
# y:predict
# t:true
def checkOKPercent(y,t):
# from predict param
sign_newral = np.sign(np.array(y).reshape([len(t),1]) - 0.5)
# from true param
sign_orig = np.sign(np.array(t.reshape([len(t),1])) - 0.5)
# are there same sign??
NGCNT = np.sum(np.abs(sign_newral-sign_orig))/2
# calc NG percent in [0.0-1.0]
NGPer = NGCNT / len(t)
# return OK percent [0.0-1.0]
return 1.0-NGPer
# for bias coefficients setting
def initB(shape, name=None):
L = np.prod(shape)
value = np.ones(L).reshape(shape)*(-1000)
return K.variable(value, name=name)
# for w coefficients (x) setting
def initW(shape, name=None):
value = 1000/(np.random.random(shape))
return K.variable(value, name=name)
# deta making???
csv_input = pd.read_csv(filepath_or_buffer="data_out.csv",
encoding="ms932",
sep=",")
# 指定したカラムだけ抽出したDataFrameオブジェクトを返却します。
x = np.array(csv_input[["hour"]])
y = np.array(csv_input[["wind"]])
# num of records
N = len(x)
# 正規化
x_max = np.max(x,axis=0)
x_min = np.min(x,axis=0)
x = (x - np.min(x,axis=0))/(np.max(x,axis=0) - np.min(x,axis=0))
# y > 2[m] : strong
# y <= 2[m] : weak
y_new = np.zeros(len(y),dtype=int)
for k in range(len(y)):
if y[k] > 2:
y_new[k] = 1
y_new = y_new.reshape(y.shape)
y = np.array(y_new,dtype=float)
# 学習のためのモデルを作る
model = Sequential()
# 全結合層(1層->XXX層)
model.add(Dense(input_dim=1, output_dim=50,
bias=True,
kernel_initializer=initW,
bias_initializer=initB))
# 活性化関数(Sigmoid関数)
model.add(Activation("sigmoid"))
# 全結合層(XXX層->2層)
model.add(Dense(output_dim=1))
# 活性化関数(softmax関数)
model.add(Activation("sigmoid"))
sgd_ = keras.optimizers.SGD(lr=0.3)
cb = keras.callbacks.EarlyStopping(monitor='loss',
min_delta=0.0001,
patience=1,
verbose=0,
mode='auto',
baseline=None)
# モデルをコンパイル
model.compile(loss="binary_crossentropy", optimizer=sgd_, metrics=["accuracy"])
# 学習を実行
model.fit(x, y, epochs=5000, batch_size=32, verbose=1,callbacks=[cb])
# 真値のプロット
plt.plot(x,y,marker='x',label="true")
# 推論でKerasの結果を計算,表示
y_predict = model.predict(x)
# Keras計算結果のプロット
plt.plot(x,y_predict,marker='x',label="predict")
# 凡例表示
plt.legend()
# display result
print('OK per %.2f ' % (checkOKPercent(y_predict,y)*100.0))
結果も良好でした。
Epoch 1032/5000
23/23 [==============================] - 0s 87us/step - loss: 0.1018 - acc: 1.0000
NG per 100.00
中間層のノード数を50としていましたが、これだと正解率が100%にいかないことも多かったです。
これはノード数を増やしていくことで、安定度が上がっていくんじゃないかと思われます(実験結果より)。
Epoch 5000/5000
23/23 [==============================] - 0s 0us/step - loss: 0.0058 - acc: 1.0000
OK per 100.00
今回は単純な2クラス分類をKerasで実装してみました。
softmax関数を使う方法もあるようなので(冒頭に紹介した記事はsoftmaxでの実装がメインでした)、そちらも書こうと思っていましたが、ちょっと多くなってしまったので別の機会にしたいと思います。