はじめに
製造業で機械学習を使おうとしたときに必ず「解釈性」を求められませんか?機械学習の解釈性についてはいろいろな手法がありますが、重回帰がのように式で表せると非常にうれしいです。そんなものを探している中で、LSRF5法というものを見つけたので実装してみました。
LSRF5法とは
LSRF5法(Least Squares-assisted Rule Extraction method Version 5)とは、ニューラルネットワークを用いて入出力間の関係をわかりやすく記述するために考えられた手法です。下記は重回帰とLSRF5法の最終的なアウトプットの式です。
重回帰
y = β_1x_1 + β_2x_2+ ・・・ β_nx_n + ε
LSRF5法
y = c_1x_{11}^{β_{11}}x_{12}^{β_{12}}・・・x_{1n}^{β_{1n}} + c_2x_{21}^{β_{21}}x_{22}^{β_{22}}・・・x_{2n}^{β_{2n}} + c_nx_{n1}^{β_{n1}}x_{n2}^{β_{n2}}・・・x_{nn}^{β_{nn}} + ε
重回帰が(係数)x(変数)で記述されるのに対して、LSRF5法が(変数)^(係数)の形で表せるのが特徴です。どちらも入力と出力の関係がなじみのある式で表せるので機械学習アレルギーがあっても受け入れられやすいのかなと思います。
LSRF5法では3層のニューラルネットをベースにしており、各変数の自然対数をとったものを入力として活性化関数にexpを使っています。中間層のノードの数で項の数が決まります。下の図自体はRF5法というみたいです。LSRF5法では、得られた重み(係数の乗数になるもの)を整数/(1~10の整数)で近似して、最終の重みciとバイアスc0を最小二乗法で決めなおすという操作をするのでRF5法にLSがついてLSRF5法になっているようです。
(機械学習を活用したアーク溶接現象数法則式の導出と知識抽出 https://www.jstage.jst.go.jp/article/materia/58/8/58_449/_pdf )
実装
今回は、テストデータとして1つの項+バイアスで表されるデータを使います。
df = pd.DataFrame()
df['x1'] = [random.randint(1, 100) for i in range(100)]
df['x2'] = [random.randint(50, 200) for i in range(100)]
df['x3'] = [random.randint(10, 600) for i in range(100)]
df['x4'] = [random.randint(100, 150) for i in range(100)]
df['target'] = 0.13*(df['x1']**(3/5) * df['x2']**(2/3)*df['x3']**(5/9)*df['x4']**(5/7)) + 20
df
STEP1
RF5法で入力と出力の関係を得ます。
class LSRF5Net(nn.Module):
def __init__(self, input_size, n_nodes, epochs):
super().__init__()
self.fc1 = nn.Linear(input_size, n_nodes, bias=False)
self.fc2 = nn.Linear(n_nodes,1)
self.epochs = epochs
def forward(self, x):
x = torch.exp(self.fc1(x))
x = self.fc2(x)
return x
STEP2
STEP1で得た重みを整数/(1~10の整数)で近似します。
中間層のノードは1つの3層ニューラルネットを定義します。
def decimal_to_fraction(decimal):
def gcd(a, b):
# 最大公約数を計算する関数
while b:
a, b = b, a % b
return a
# 整数部と小数部に分割
integer_part, decimal_part = divmod(decimal, 1)
# 分数化したい小数部を整数化
numerator = int(decimal_part * 10)
best_num = 0
diff_ = 0
# 分母の候補を1から10まで繰り返し、近似値を探す
for denominator in range(1, 11):
# 分数を計算
numerator_copy = numerator
denominator_copy = denominator
# 分数を約分
common_divisor = gcd(numerator_copy, denominator_copy)
numerator_copy //= common_divisor
denominator_copy //= common_divisor
# 分数と小数の差の絶対値を計算し、最も近い近似値を探す
diff = abs(decimal - (integer_part + numerator_copy / denominator_copy))
if denominator == 1:
best_num = integer_part + numerator_copy / denominator_copy
diff_ = diff
else:
if diff > abs(decimal - (integer_part + numerator_copy / denominator_copy)):
diff_ = diff
best_num = integer_part + numerator_copy / denominator_copy
else:
diff_ = diff_
return integer_part*denominator_copy+numerator_copy, denominator_copy
a = []
b = []
for decimal in best_model['fc1.weight'][0]: # ノード数を1にしているため
decimal = decimal.detach().numpy()
numerator, denominator = decimal_to_fraction(decimal)
a.append(denominator)
b.append(numerator)
STEP3
STEP2で得た整数/(1~10の整数)を指数に採用して、ciとc0を最小二乗法で決定します。
今回は中間層のノード数が1用のコードになっています。
def func(x, ci, c0):
# 求める関数の定義
y = ci * x + c0
return y
x_ = []
for x in X:
x_.append([np.prod(np.array(x) ** (np.array(b) / np.array(a)))])
x_ = np.array(x_).flatten() # 近似したb/aを使って項を計算する
y_ = np.array([i.detach().numpy() for i in y]).flatten() # 目標値をnumpy型に変換
initial = [best_model['fc2.weight'][0][0].detach().numpy(), best_model['fc2.bias'][0].detach().numpy()] # RF5法で得たci, c0を初期値として利用
# 最小二乗法でフィッテイング
popt, pcov = optimize.curve_fit(f=func, xdata=x_, ydata=y_, p0=initial)
STEP4
最終的な誤差を確認します。
# 誤差の確認
r2 = r2_score(y_, func(x_, popt[0], popt[1]))
rmse = np.sqrt(mean_squared_error(y_, func(x_, popt[0], popt[1])))
print('R2 : ', r2)
print('RMSE : ', rmse)
STEP5
必要に応じて中間層のノードを変更して、STEP1~4を繰り返します。
結果
今回、LSRF5法で得られた式は下記の通りでした。
y = 0.434x_1^{\frac{1}{2}}x_2^{\frac{3}{5}}x_3^{\frac{1}{2}}x_4^{\frac{7}{10}} -2386
用意したデータ
y = 0.13x_1^{\frac{3}{5}}x_2^{\frac{2}{3}}x_3^{\frac{5}{9}}x_4^{\frac{5}{7}} + 20
なかなか近づいてくれませんでしたが、STEP2のb/aに近似する部分をもう少しいい感じにできるんじゃないかなとなんとなく思っています。
終わりに
LSRF5法を試してみました。とりあえず実装というところでやってみたので結果はいまいちでしたが、式の形で表すことができるというのは非常に大きいメリットだと感じています。
参考文献
機械学習を活用したアーク溶接現象の数法則式の導出と知識抽出(https://www.jstage.jst.go.jp/article/materia/58/8/58_449/_pdf )
全実装
import numpy as np
import pandas as pd
import scipy.optimize as optimize
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score, mean_squared_error
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
import random
def fix_seeds(seed=0):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
fix_seeds(0)
df = pd.DataFrame()
df['x1'] = [random.randint(1, 100) for i in range(100)]
df['x2'] = [random.randint(50, 200) for i in range(100)]
df['x3'] = [random.randint(10, 600) for i in range(100)]
df['x4'] = [random.randint(100, 150) for i in range(100)]
df['target'] = 0.13*(df['x1']**(3/5) * df['x2']**(2/3)*df['x3']**(5/9)*df['x4']**(5/7)) + 20
def early_Stopping(loss_values, patience=10, delta=0.000001):
if len(loss_values) <= patience:
return False # 学習を続ける
minimum = np.min(loss_values[:-patience])
current = loss_values[-1]
diff = abs(current - minimum)
if diff < delta:
return True # 損失関数の改善がなくなったと判定してきりあげる
else:
return False # 学習を続ける
# 3層のニューラルネットを構成する
class LSRF5Net(nn.Module):
def __init__(self, input_size, n_nodes, epochs):
super().__init__()
self.fc1 = nn.Linear(input_size, n_nodes, bias=False)
self.fc2 = nn.Linear(n_nodes,1)
self.epochs = epochs
def forward(self, x):
x = torch.exp(self.fc1(x))
x = self.fc2(x)
return x
def train_step(self, X, y, optimizer, loss_fn):
optimizer.zero_grad()
pred = self.forward(X)
loss = loss_fn(pred, y)
loss.backward()
optimizer.step()
loss = loss.detach().numpy()
return loss, pred
def train(self, X, y, optimizer, loss_fn, early_stopping):
loss_values = []
preds = []
best_model = []
best_loss = 0
for i in range(self.epochs):
loss, pred = self.train_step(X, y, optimizer, loss_fn)
loss_values.append(loss)
preds.append(pred)
if i == 0:
best_loss = loss
else:
if best_loss > loss:
best_loss = loss
best_model = self.state_dict()
if i % 4000 == 0:
print(str(i), 'Loss:', loss)
if early_stopping(loss_values):
print(str(i), 'Loss:', loss)
break
return loss_values, preds, best_model
X = df.drop('target', axis=1).values
logX = torch.from_numpy(np.log(X)).to(torch.float32)
y = df['target'].values
y = torch.from_numpy(y).to(torch.float32).unsqueeze(1)
n_nodes = 1
epochs = 30000
fix_seeds(0)
LSRF5 = LSRF5Net(X.shape[1], n_nodes, epochs)
optimizer = optim.Adam(LSRF5.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
early_stopping = early_Stopping
loss_values, preds, best_model = LSRF5.train(logX, y, optimizer, loss_fn, early_stopping)
def decimal_to_fraction(decimal):
def gcd(a, b):
# 最大公約数を計算する関数
while b:
a, b = b, a % b
return a
# 整数部と小数部に分割
integer_part, decimal_part = divmod(decimal, 1)
# 分数化したい小数部を整数化
numerator = int(decimal_part * 10)
best_num = 0
diff_ = 0
# 分母の候補を1から10まで繰り返し、近似値を探す
for denominator in range(1, 11):
# 分数を計算
numerator_copy = numerator
denominator_copy = denominator
# 分数を約分
common_divisor = gcd(numerator_copy, denominator_copy)
numerator_copy //= common_divisor
denominator_copy //= common_divisor
# 分数と小数の差の絶対値を計算し、最も近い近似値を探す
diff = abs(decimal - (integer_part + numerator_copy / denominator_copy))
if denominator == 1:
best_num = integer_part + numerator_copy / denominator_copy
diff_ = diff
else:
if diff > abs(decimal - (integer_part + numerator_copy / denominator_copy)):
diff_ = diff
best_num = integer_part + numerator_copy / denominator_copy
else:
diff_ = diff_
return integer_part*denominator_copy+numerator_copy, denominator_copy
a = []
b = []
for decimal in best_model['fc1.weight'][0]: # ノード数を1にしているため
decimal = decimal.detach().numpy()
numerator, denominator = decimal_to_fraction(decimal)
a.append(denominator)
b.append(numerator)
print('b:', b)
print('a:', a)
print(np.array(b) / np.array(a))
def func(x, ci, c0):
# 求める関数の定義
y = ci * x + c0
return y
x_ = []
for x in X:
x_.append([np.prod(np.array(x) ** (np.array(b) / np.array(a)))])
x_ = np.array(x_).flatten() # 近似したb/aを使って項を計算する
y_ = np.array([i.detach().numpy() for i in y]).flatten() # 目標値をnumpy型に変換
initial = [best_model['fc2.weight'][0][0].detach().numpy(), best_model['fc2.bias'][0].detach().numpy()] # RF5法で得たci, c0を初期値として利用
# 最小二乗法でフィッテイング
popt, pcov = optimize.curve_fit(f=func, xdata=x_, ydata=y_, p0=initial)
# 誤差の確認
r2 = r2_score(y_, func(x_, popt[0], popt[1]))
rmse = np.sqrt(mean_squared_error(y_, func(x_, popt[0], popt[1])))
print('R2 : ', r2)
print('RMSE : ', rmse)
改定
2024/5/1 URLがリンク切れになっていたところを修正しました。