LSTM 2値分類における不均衡データを解消したい
解決したいこと
ここに解決したい内容を記載してください。
現在LSTMを用いた2値分類のプログラムを作成しています。
実際に作成し、精度を評価したところ98%という精度を出すことができました。
しかし、蓋を開けてみると小数側のデータが100件,非小数側のデータが7000件という不均衡データとなっているため、一見精度は高いが小数側(本当に検知したい側)を全く検知できていませんでした。
本来であれば小数側を増やしたり、多数側を減らすような処理が必要ですが、データが特殊なためデータを増やすことは難しく、データ数を小数側に揃えるように減らすとデータが200件と少なすぎる状態です。
そこで小数側に重みを付けることで改善を図ろうと考えているのですが、この手法は考え方として正しいでしょうか。また、実際に重みを付けたい場合コードではどのように記述したらよいのでしょうか。
現在はclass_weightの利用を検討しています。
検索を主に重みづけについて調べていますが、それらしいコード紹介が少なく難航しています。
コードを添付いたしますので何卒宜しくお願い致します。
発生している問題・エラー
または、問題・エラーが起きている画像をここにドラッグアンドドロップ
該当するソースコード
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' #警告表示の解除
from sklearn.preprocessing import LabelEncoder #sklearnライブラリに含まれるLabelEncoderクラスをインポートしている(LabelEncoderは数値や文字列で表されたラベルを(0~ラベル種類数-1)までの数値に変換する)
from keras.utils import np_utils
import pandas as pd #データ解析を容易にするためのモジュール pdという名前でインポートしている
from keras.preprocessing.text import Tokenizer #Kerasライブラリに含まれるTokenizerクラスをインポートしている(Tokenizerは文章のベクトル化に必要)
from keras.utils import pad_sequences #Kerasライブラリに含まれるsequenceクラスとインポートしている(sequenceは異なる長さのテンソルの末尾を0埋めして長さを揃えてくれる)
from sklearn.model_selection import train_test_split #sklearnライブラリに含まれるtrain_test_splitクラスをインポートしている(train_test_splitはデータをテストと訓練に二分割する)
from keras.models import Sequential #kerasライブラリに含まれるSequentialクラスをインポートしている(Sequentialは入力と出力が一つずつのニューラルネットワーク)
from keras.layers import Dense, Embedding, LSTM #kerasライブラリに含まれるDense, Embedding, LSTMクラスをインポートしている(後日調べる)
from keras.callbacks import EarlyStopping #kerasライブラリに含まれるEarlyStoppingクラスをインポートしている(EarlyStoppingは過学習を引き起こす前に学習を止めてくれる)
from keras.layers import Dropout #kerasライブラリに含まれるDropoutクラスをインポートしている(Dropoutは過学習を抑制してくれる理論は知らん)
from keras.backend import clear_session #kerasライブラリに含まれるclear_sessionクラスをインポートしている(clear_sessionは識別子をリセットする)
from keras.models import Sequential #kerasライブラリに含まれるSequentialクラスをインポートしている(clear_sessionは識別子をリセットする)
from keras.layers import Dense, Activation, Dropout, LSTM #
from tensorflow.keras.optimizers import Adam #tensorflowライブラリに含まれるAdamクラスをインポートしている()
import optuna #
from keras.wrappers.scikit_learn import KerasClassifier #kerasライブラリに含まれるKerasClassifierクラスをインポートしている()
from sklearn.model_selection import cross_val_score
with open("all_api_call.txt") as f:
content = f.readlines()
content = [x.strip() for x in content]
data = pd.DataFrame() #データフレームの作成(データフレームは表形式のデータになる)
data['feature'] = content
max_words = 800 #トークン化するAPIの総数を定義
max_len = 100
X = data['feature']
#トークン化,シーケンス番号への変換
#トークン化...文章を単語や文字列単語に分割すること(今回はAPI名ごとに分割)
#シーケンス番号...機械学習で扱いやすい数値
tokenizer = Tokenizer(num_words=max_words) #Tokenizerのインスタンスを作成・num_wordsは学習させたい単語の種数を入れる
tokenizer.fit_on_texts(X) #XをAPI名ごと分割し,それぞれに番号を与える(トークン化)・(例)'getasynckeystate': 1, 'ntdelayexecution': 2, 'ntclose': 3...
X = tokenizer.texts_to_sequences(X) #API名をシーケンス番号に変換する
X = pad_sequences(X, maxlen=max_len, truncating='post') #シーケンスの設定(maxlenはシーケンスの最大値,truncatingはmaxlen以上のシーケンスを前後どちらで切り詰めるか"post"は末尾(この場合は101回目以降がカットされる))
#print(tokenizer.word_index)
#print(X)
#シーケンス番号とAPI名称の対応状況について(補足)#
#トークン化の際に与えた番号とシーケンス番号が一致している
#上記対応付けを行うことでLSTMを使用して訓練を可能にしている
#正解ラベルの読み込み
with open("labels.csv") as f:
label_data = f.readlines()
#正解ラベルの整形
label_data = [x.strip() for x in label_data]
data["labels"] = label_data
#検出器の作成(今回はVirusとラベリングされている物を検出する)
y = data["labels"].apply(lambda x: 1 if x == "Backdoor.Win32.Androm" else 0) #条件定義(applyはデータフレームの各行に条件を適応する)
#訓練データとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.15) #テストデータが15%,訓練が85%で分割
#ニューラルネットワークの各層について(補足)
#埋め込み層...整数でエンコードされた単語ベクトルを受け取り,各単語のインデックスに対する分散表現を算出する
#分散表現...単語を低次元の実数値ベクトルで表す表現(他の方法では単語の並び順や順序を維持できない)
#LSTM層...前層からの出力である分散表現を受け取り,それをもとに訓練を行う
#出力層...予測値を出力する
#LSTMを用いてモデル準備
def malware_model():
model = Sequential() #Sequential宣言
model.add(Embedding(max_words, 300, input_length=max_len)) #整数でエンコードされた単語(API名)ベクトルを受け取り,各単語のインデックスに対する分散表現を算出する
model.add(LSTM(32, return_sequences=True))
model.add(Dense(1, activation='sigmoid'))
return model
#モデルを有効化して訓練
model = malware_model()
model.summary() #モデルの要約を出力する
model.compile( #訓練に関する設定を行う
loss='binary_crossentropy', #損失関数に二値交差エントロピーを指定
optimizer='adam',metrics=['accuracy'] #最適化アルゴリズムにAdamを指定
)
#損失関数...正解値と算出した予測値のズレを求めるのに用いる
#二値交差エントロピー...2つの確立分布がどれくらい離れているかを表す指標
#最適化アルゴリズム...損失を0にすることを目的としたもの
#Adam...わからん
history = model.fit( #訓練の実行(history = model.fit(...)は固定)
X_train,
y_train,
batch_size=64, #サブセット分ける(サイズは2のn乗から決める)
epochs=10, #学習する回数を設定(試しながら決める)
validation_data=(X_test, y_test), #検証するデータを引数として渡す
verbose=1 #結果をエポックごとのログがプログレスバーの進行状況とともに表示
)
#ハイパーパラメータチューニング
class Objective:
def __init__(self, X, y): #クラス構造で情報を保持するための重要な構文
self.X = X #クラス構造を取る際の定型の構文
self.y = y #クラス構造を取る際の定型の構文
def __call__(self, trial): #インスタンスを関数のとして扱う
# セッションのクリア
clear_session() #セッションのクリア
# モデルの作成と、パラメータ探索の設定
model = Sequential() #Sequentialモデルの生成
model.add(Embedding(max_words, 300, input_length=max_len)) #整数でエンコードされた単語(API名)ベクトルを受け取り,各単語のインデックスに対する分散表現を算出する
model.add(LSTM(32, return_sequences=True))
model.add(Dense(1, activation='sigmoid'))
optimizer = Adam(
learning_rate=trial.suggest_loguniform("learning_rate", 1e-5, 1e-1),
beta_1=trial.suggest_uniform("beta_1", 0.0, 1.0),
beta_2=trial.suggest_uniform("beta_2", 0.0, 1.0)
)
model.compile(
loss='binary_crossentropy',
optimizer=optimizer,
metrics=['accuracy']
)
model.fit(
self.X,
self.y,
batch_size=256,
epochs=10,
validation_data=(X_test, y_test)
)
return model.evaluate(X_test, y_test, verbose=0)[1]
objective = Objective(X_train, y_train)
study = optuna.create_study()
study.optimize(objective, timeout=1200)
print('params:', study.best_params)
#実検証
def buildmodel():
model = Sequential()
model.add(Embedding(max_words, 300, input_length=max_len))
# model.add(LSTM(32, return_sequences=True))
model.add(LSTM(32, return_sequences=False))
model.add(Dense(1, activation='sigmoid'))
# ベストのパラメータを設定
optimizer = Adam(
learning_rate = study.best_params['learning_rate'],
beta_1 = study.best_params['beta_1'],
beta_2 = study.best_params['beta_2']
)
model.compile(
loss='binary_crossentropy',
optimizer=optimizer,
metrics=['accuracy']
)
return model
clf = KerasClassifier(
buildmodel,
epochs=10,
batch_size=256,
verbose=1
)
results = cross_val_score(clf, X, y, cv=5)
print('Test accuracy: ', results.mean())