はじめに
Pythonには、scikit-learnなど便利なライブラリが用意されており、比較的簡単に解析プログラムを組むことができます。ライブラリの中身はブラックボックスになりがちです。
本稿では、単回帰解析に焦点を当て、そのブラックボックスの中を覗いてみたいと思います。
使用するライブラリは、おもにnumpyとTensorFlowとなります。
本稿を書くに当たり、下記書籍を参考にしました。
TensorFlow機械学習クックブック Pythonベースの活用レシピ60+は好著なのですが、TensorFlow1系でプログラムが書かれています。せっかくなので、ここではTensorFlow2系でプログラミングしてみたいと思います。
単回帰解析
説明変数 $(x_i, y_i)$ にできるだけ適合する回帰直線 $y = Ax + b$ を求めます。
最小二乗法(least squares method)
説明変数 $x_1, \cdots, x_N, y_1, \cdots, y_N \in \mathbb{R}$ の直線からの距離 $|y_i - b - Ax_i|$ の二乗の和 $L:=\sum_{i=1}^{N}(y_i - b - Ax_i)^2$ を最小にする回帰直線の切片 $b$ と傾き $A$ を求めます。
$L$ を $A, b$ で偏微分して0とおくと、下記2式が得られます。
\begin{eqnarray}
\frac{{\partial}L}{{\partial}b} &=& -2 \sum_{i=1}^{N}(y_i - b - Ax_i) = 0 \tag{1} \\
\frac{{\partial}L}{{\partial}A} &=& -2 \sum_{i=1}^{N}x_i(y_i - b - Ax_i) = 0 \tag{2}
\end{eqnarray}
この連立方程式を解きます。
(1)(2)を解くと、$\sum_{i=1}^{N}(x_i-\bar{x})^w \neq 0$, すなわち $x_1 = \cdots = x_N ではない$ とき、
\begin{eqnarray}
\hat{A} &=& \frac{\sum_{i=1}^{N}(x_i - \bar{x})(y_i - \bar{y})}{\sum_{i=1}^{N}(x_i - \bar{x})^2} \tag{3} \\
\hat{b} &=& \bar{y} - \hat{A}\bar{x} \tag{4}
\end{eqnarray}
が得られると、統計的機械学習の数理100問 with Pythonに解説が載っています。
この連立方程式から解が得る過程をもう少し詳しく見てみます。こちらのページ(単回帰分析・最小二乗法の公式はどうすれば求められるのか。統計上の誤差と残差の違い)を参考にしました。
\begin{eqnarray}
0 &=& \sum_{i=1}^{n}y_i - \hat{A}\sum_{i=1}^{n}x_i - \hat{b} \cdot n \\
\hat{b} &=& \frac{1}{n}\sum_{i=1}^{n}y_i - \frac{\hat{A}}{n}\sum_{i = 1}^{n}x_i \\
&=& \bar{y} - \hat{A} \cdot \bar{x} \\
\\
0 &=& \sum_{i=1}^{n}x_{i}y_{i} - \hat{A}\sum_{i=1}^{n}x_i^2 - \hat{b}\sum_{i=1}^{n}x_i \\
&=& \sum_{i=1}^{n}{x_i}{y_i} - \bar{y}\sum_{i=1}^{n}x_i - \hat{A} \left( \sum_{i=1}^{n}x_i^2 - \bar{x}\sum_{i=1}^{n}x_i \right) \\
&=& \sum_{i=1}^{n}{x_i}{y_i} - n \bar{x} \bar{y} - \hat{A} \left( \sum_{i=1}^{n}x_i^2 - n \bar{x}^2 \right) \\
\hat{A} &=& \frac{\sum_{i=1}^{n}{x_i}{y_i} - n \bar{x} \bar{y}}{\sum_{i=1}^{n}x_i^2 - n\bar{x^2}} \\
&=& \frac{\sum_{i=1}^{n}{(x_iy_i - \bar{x}\bar{y})}}{\sum_{i=1}^{n}{(x_i^2 - \bar{x}^2)}}
\end{eqnarray}
先ほどの解 $\hat{A} = \frac{\sum_{i=1}^{N}(x_i - \bar{x})(y_i - \bar{y})}{\sum_{i=1}^{N}(x_i - \bar{x})^2}$ と $\hat{A} = \frac{\sum_{i=1}^{n}{(x_iy_i - \bar{x}\bar{y})}}{\sum_{i=1}^{n}{(x_i^2 - \bar{x}^2)}}$ を確認してみます。分母に着目すると、$\sum_{i=1}^{N}(x_i - \bar{x})^2 =\sum_{i=1}^{n}{(x_i^2 - \bar{x}^2)}$ ということになります。$(x_i - \bar{x})^2$ を展開すると、$x_i^2 - 2x_i\bar{x} + \bar{x}^2$ となるので、腑に落ちません。$\bar{x}$ が平均値 $\frac{1}{n}{\sum_{i=1}^{n}{x_i}}$ だからだろうと推測しましたが、せっかくなので数学的に確認してみました。
まず、$x_0, x_1, \cdots, x_n$ の合計を $T = \sum_{i=1}^{n}{x_i}$ とおいておきます。
\begin{eqnarray}
\sum_{i=1}^{n}{(x_i^2 - \bar{x}^2)} &=& \frac{\sum_{i=1}^{n}{(n^2x_i^2 - T^2)}}{n^2} \tag{5}\\
\sum_{i=1}^{n}(x_i - \bar{x})^2 &=& \frac{\sum_{i=1}^{n}{(nx_i - T)^2}}{n^2} \tag{6}
\end{eqnarray}
(5), (6)より
\begin{eqnarray}
\sum_{i=1}^{n}{(n^2x^2_i - T^2)} &=& \sum_{i=1}^{n}{(nx_i - T)^2} \\
&=& \sum_{i=1}^{n}{(n^2x_i^2 - 2nx_iT + T^2)}
\end{eqnarray}
が得られます。
\begin{eqnarray}
-\sum_{i=1}^{n}{T^2} &=& \sum_{i=1}^{n}{(T^2 - 2nx_iT)} \\
-nT^2 &=& nT^2 - \sum_{i=1}^{n}{2nx_iT} \\
-2nT^2 &=& -2nT\sum_{i=1}^{n}{x_i} \\
T &=& \sum_{i=1}^{n}{x_i}
\end{eqnarray}
$T=\sum_{i=1}^{n}{x_i}$であるから、両辺が等価であることが分かります。
少し回り道をしましたが、早速(3)、(4)の式を使って、最小二乗法を実装してみます。
プログラム
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
# データの作成
np.random.seed(1234)
x_vals = np.linspace(0, 10, 100)
y_vals = x_vals + np.random.normal(0, 1, 100)
def min_eq(x, y):
x_bar, y_bar = np.mean(x), np.mean(y)
beta_1 = tf.divide(np.dot(tf.subtract(x, x_bar), tf.subtract(y, y_bar)), tf.square(tf.linalg.norm(tf.subtract(x, x_bar))))
beta_0 = tf.subtract(y_bar, tf.multiply(beta_1, x_bar))
return beta_1, beta_0
A, b = min_eq(x_vals, y_vals)
print('slope: {}'.format(A.numpy()))
print('y_intercept: {}'.format(b.numpy()))
slope: 0.9931105078364767
y_intercept: 0.06955974394305287
# 最も適合する直線を取得
best_fit = []
for i in x_vals:
best_fit.append(A * i + b)
# 結果をプロット
plt.plot(x_vals, y_vals, 'o', label='Data')
plt.plot(x_vals, best_fit, 'r-', label='Best fit line', linewidth=3)
plt.legend(loc='upper left')
plt.show()
逆行列法
線形回帰(linear regression)は、$A \cdot x = b$ といった一連の行列方程式として表すことができます。行列 $x$ の係数を求めます。$x$ を解くための式は、$x = (A^T \cdot A)^{-1} \cdot A^T \cdot b$ として表すことがでます。
もう少し詳しく見てみます。
直線の方程式は次のように表される。
y = ax + b
この直線が点 $(x_1, y_1)$ と点 $(x_2, y_2)$ を通過するならば、
y_1 = ax_1 + b \\
y_2 = ax_2 + b
が成立する。これを行列で書くと
\begin{bmatrix}
y_1 \\
y_2
\end{bmatrix}
=
\begin{bmatrix}
x_1 & 1 \\
x_2 & 1
\end{bmatrix}
\begin{bmatrix}
a \\
b
\end{bmatrix}
となる。各行列を簡略化して表すと
$$
Y = XA
$$
3点以上が与えられた場合、各項は下記のようになります。
Y =
\begin{bmatrix}
y_1 \\
y_2 \\
y_3
\end{bmatrix} \tag{1}
A =
\begin{bmatrix}
a \\
b
\end{bmatrix} \tag{2}
X =
\begin{bmatrix}
x_1 && 1\\
x_2 && 1\\
x_3 && 1
\end{bmatrix} \tag{3}
$X$ は正方行列ではなく、逆行列を持てません。
そこで、$A$ の前にある行列項を、行列演算して、正方行列に変換します。
$X$ の転置行列 $X^T$ を乗じます。
X^T Y = X^T X A \tag{4}
$X^T X$は正方行列になるので、これが正則行列であるならば、逆行列 $(X^T X)^{-1}$ が存在します。行列式を式(4)の左から掛けると
(X^T X)^{-1} X^T Y = (X^T X)^{-1} X^T X A \tag{5}
右辺Aの前の行列演算結果は単位行列 $E$ となるので、次式が得られます。
(X^T X)^{-1} X^T Y = A \tag{6}
このようにして一組のAが得られます。
プログラム
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
# データの作成
np.random.seed(1234)
x_vals = np.linspace(0, 10, 100)
y_vals = x_vals + np.random.normal(0, 1, 100)
# 計画行列Aを作成
x_vals_column = np.transpose(np.matrix(x_vals))
ones_column = np.transpose(np.matrix(np.repeat(1, 100)))
A = np.column_stack((x_vals_column, ones_column))
# 行列bを作成
b = np.transpose(np.matrix(y_vals))
# テンソルを作成
A_tensor = tf.constant(A)
b_tensor = tf.constant(b)
# 逆行列法を適用
tA_A = tf.matmul(tf.transpose(A_tensor), A_tensor)
tA_A_inv = tf.linalg.inv(tA_A)
product = tf.matmul(tA_A_inv, tf.transpose(A_tensor))
solution_eval = tf.matmul(product, b_tensor)
# 係数を抽出
slope = solution_eval[0][0]
y_intercept = solution_eval[1][0]
print('slope: ' + str(slope))
print('y_intercept: ' + str(y_intercept))
slope: tf.Tensor(0.9931105078364761, shape=(), dtype=float64)
y_intercept: tf.Tensor(0.06955974394305653, shape=(), dtype=float64)
# 最も適合する直線を取得
best_fit = []
for i in x_vals:
best_fit.append(slope * i + y_intercept)
# 結果をプロット
plt.plot(x_vals, y_vals, 'o', label='Data')
plt.plot(x_vals, best_fit, 'r-', label='Best fit line', linewidth=3)
plt.legend(loc='upper left')
plt.show()
行列分解法
逆行列法の実装は、行列が大きくなると計算効率が落ちる場合があります。行列 $A$ を分解し、分解した行列で行列演算を行う方法を実装します。TensorFlowに実装されているコレスキー分解関数を利用します。ここでは、連立方程式 $A \cdot x = b$ を $L \cdot L' \cdot x = b$ として解いてみよう。まず $L \cdot y = b$ を解き、次に $L' \cdot x = y$ を解くことで、係数行列 $x$ を求めます。
import matplotlib.pyplot as pyplot
import numpy as np
import tensorflow as tf
from tensorflow.python.framework import ops
ops.reset_default_graph()
# データを作成
np.random.seed(1234)
x_vals = np.linspace(0, 10, 100)
y_vals = x_vals + np.random.normal(0, 1, 100)
# 計画行列Aを作成
x_vals_column = np.transpose(np.matrix(x_vals))
ones_column = np.transpose(np.matrix(np.repeat(1, 100)))
A = np.column_stack((x_vals_column, ones_column))
# 行列bを作成
b = np.transpose(np.matrix(y_vals))
# テンソルを作成
A_tensor = tf.constant(A)
b_tensor = tf.constant(b)
正方行列のコレスキー分解を実行します。
# コレスキー分解
tA_A = tf.matmul(tf.transpose(A_tensor), A_tensor)
L = tf.linalg.cholesky(tA_A)
# L * y = t(A) * b を解く
tA_b = tf.matmul(tf.transpose(A_tensor), b)
sol1 = tf.linalg.solve(L, tA_b)
# L' * y = sol1 を解く
sol2 = tf.linalg.solve(tf.transpose(L), sol1)
solution_eval = sol2
# 係数を抽出
slope = solution_eval[0][0]
y_intercept = solution_eval[1][0]
print('slope: {}'.format(slope))
print('y_intercept: {}'.format(y_intercept))
slope: 0.9931105078364756
y_intercept: 0.06955974394305893
# 最も適合する直線を取得
best_fit = []
for i in x_vals:
best_fit.append(slope + i + y_intercept)
# 結果をプロット
plt.plot(x_vals, y_vals, 'o', label='Data')
plt.plot(x_vals, best_fit, 'r-', label=' Best fit line', linewidth=3)
plt.legend(loc='upper left')
plt.show()
TensorFlowでの線形回帰の実装パターン
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.python.framework import ops
ops.reset_default_graph()
# データを作成
np.random.seed(1234)
x_vals = np.linspace(0, 10, 100).astype(np.float32)
y_vals = x_vals + np.random.normal(0, 1, 100).astype(np.float32)
# 学習率とバッチサイズを設定
learning_rate = 0.01
batch_size = 25
A = tf.Variable(0.)
b = tf.Variable(0.)
線形モデルの方程式 y=Ax+b を設定する。
def f(x):
y = A * x + b
return y
def loss(y_pred, y_true):
return tf.reduce_mean(tf.square(y_pred - y_true))
loss_vec = []
for i in range(100):
with tf.GradientTape() as tape:
y_predicted = f(x_vals)
tmp_loss = loss(y_predicted, y_vals)
# get gradients
gradients = tape.gradient(tmp_loss, [A, b])
loss_vec.append(tmp_loss)
# compute and adjust weights
A.assign_sub(gradients[0] * learning_rate)
b.assign_sub(gradients[1] * learning_rate)
if (i + 1) % 25 == 0:
print('Step #{} A = {} b = {}'.format(i + 1, A.numpy(), b.numpy()))
print('Loss = {}'.format(tmp_loss))
Step #25 A = 0.9827542901039124 b = 0.13843616843223572
Loss = 0.9922008514404297
Step #50 A = 0.9839657545089722 b = 0.13037890195846558
Loss = 0.9919329285621643
Step #75 A = 0.9850354790687561 b = 0.12326423078775406
Loss = 0.9917240738868713
Step #100 A = 0.9859801530838013 b = 0.11698180437088013
Loss = 0.9915611147880554
plt.scatter(x_vals, y_vals)
# plot the best fit line
plt.plot(x_vals, f(x_vals), 'r')
plt.show()
plt.plot(loss_vec, 'k-')
plt.title('L2 Loss per Generation')
plt.xlabel('Generation')
plt.ylabel('L2 Loss')
plt.show()
線形回帰の損失関数
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn import datasets
from tensorflow.python.framework import ops
ops.reset_default_graph()
# データを作成
np.random.seed(1234)
x_vals = np.linspace(0, 10, 100).astype(np.float32)
y_vals = x_vals + np.random.normal(0, 1, 100).astype(np.float32)
# バッチサイズ、学習率、繰り返し回数を設定
batch_size = 25
learning_rate = 0.1 # 学習率が0.4を超えると収束しなくなる。
iterations = 50
# 変数、モデルの演算を定義
A = tf.Variable(tf.random.normal(shape=[1, 1]))
b = tf.Variable(tf.random.normal(shape=[1, 1]))
def model_output(x):
return tf.add(tf.matmul(x, A), b)
損失関数をL1損失関数に変更します。
def loss_l1(model, x, y):
return tf.reduce_mean(tf.abs(y - model(x)))
# 最適化関数を指定
my_opt_l1 = tf.keras.optimizers.SGD(learning_rate=learning_rate)
# トレーニンググループを開始
loss_vec_l1 = []
for i in range(iterations):
rand_index = np.random.choice(len(x_vals), size=batch_size)
rand_x = tf.Variable(np.transpose([x_vals[rand_index]]))
rand_y = tf.Variable(np.transpose([y_vals[rand_index]]))
with tf.GradientTape() as tape:
temp_loss_l1 = loss_l1(model_output, rand_x, rand_y)
grads = tape.gradient(temp_loss_l1, [A, b])
loss_vec_l1.append(temp_loss_l1)
my_opt_l1.apply_gradients(zip(grads, [A, b]))
if (i + 1) % 25 == 0:
print('Step #{} A = {} b = {}'.format(i + 1, A.numpy(), b.numpy()))
Step #25 A = [[0.96738863]] b = [[0.88311166]]
Step #50 A = [[0.780318]] b = [[0.5991116]]
plt.scatter(x_vals, y_vals)
# plot the best fit line
plt.plot(x_vals, model_output(tf.Variable(np.transpose([x_vals]))), 'r')
plt.show()
L2損失関数を使用するコード
np.random.seed(1234)
x_vals = np.linspace(0, 10, 100).astype(np.float32)
y_vals = x_vals + np.random.normal(0, 1, 100).astype(np.float32)
# バッチサイズ、学習率、繰り返し回数を設定
batch_size = 25
learning_rate = 0.01 # 学習率が0.4を超えると収束しなくなる。
iterations = 50
# 変数、モデルの演算を定義
A = tf.Variable(tf.random.normal(shape=[1, 1]))
b = tf.Variable(tf.random.normal(shape=[1, 1]))
def model_output(x):
return tf.add(tf.matmul(x, A), b)
def loss_l2(model, x, y):
return tf.reduce_mean(tf.square(y - model(x)))
# 最適化関数を指定
my_opt_l2 = tf.keras.optimizers.SGD(learning_rate=learning_rate)
# トレーニンググループを開始
loss_vec_l2 = []
for i in range(iterations):
rand_index = np.random.choice(len(x_vals), size=batch_size)
rand_x = tf.Variable(np.transpose([x_vals[rand_index]]))
rand_y = tf.Variable(np.transpose([y_vals[rand_index]]))
with tf.GradientTape() as tape:
temp_loss_l2 = loss_l2(model_output, rand_x, rand_y)
grads = tape.gradient(temp_loss_l2, [A, b])
loss_vec_l2.append(temp_loss_l2)
my_opt_l2.apply_gradients(zip(grads, [A, b]))
if (i + 1) % 25 == 0:
print('Step #{} A = {} b = {}'.format(i + 1, A.numpy(), b.numpy()))
Step #25 A = [[1.061035]] b = [[-0.483181]]
Step #50 A = [[1.0561559]] b = [[-0.41459158]]
plt.scatter(x_vals, y_vals)
# plot the best fit line
plt.plot(x_vals, model_output(tf.Variable(np.transpose([x_vals]))), 'r')
plt.show()
plt.plot(loss_vec_l1, 'k-', label='L1 Loss')
plt.plot(loss_vec_l2, 'r--', label='L2 Loss')
plt.title('L1 and L2 Loss per Generattion')
plt.xlabel('Generation')
plt.ylabel('L1 loss')
plt.legend(loc='upper right')
plt.show()
デミング回帰
最小二乗線形回帰(least squares linear regression)が直線に対する線の距離を最小化するのに対し、デミング回帰は直線に対する全距離を最小化します。
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from sklearn import datasets
from tensorflow.python.framework import ops
ops.reset_default_graph()
# データを作成
np.random.seed(1234)
x_vals = np.linspace(0, 10, 100).astype(np.float32)
y_vals = x_vals + np.random.normal(0, 1, 100).astype(np.float32)
# バッチサイズ、学習率、繰り返し回数を設定
batch_size = 25
learning_rate = 0.05 # 学習率が0.4を超えると収束しなくなる。
iterations = 50
# 変数、モデルの演算を定義
A = tf.Variable(tf.random.normal(shape=[1, 1]))
b = tf.Variable(tf.random.normal(shape=[1, 1]))
def model_output(x):
return tf.add(tf.matmul(x, A), b)
直線を $y = mx + b$ 、データ点を $(x_0, y_0)$ とすると、これらの点の垂直距離を次のように定義できます。
d = \frac{|y_0 - (mx_0 + b)|}{\sqrt{m^2 +1}}
def loss(model, x, y):
deming_numerator = tf.abs(tf.subtract(y, tf.add(tf.matmul(x, A), b)))
deming_denominator = tf.sqrt(tf.add(tf.square(A), 1))
return tf.reduce_mean(tf.truediv(deming_numerator, deming_denominator))
# 最適化関数を指定
my_opt = tf.keras.optimizers.SGD(learning_rate=0.05)
# トレーニンググループを開始
loss_vec = []
for i in range(1500):
rand_index = np.random.choice(len(x_vals), size=batch_size)
rand_x = tf.Variable(np.transpose([x_vals[rand_index]]))
rand_y = tf.Variable(np.transpose([y_vals[rand_index]]))
with tf.GradientTape() as tape:
temp_loss = loss(model_output, rand_x, rand_y)
grads = tape.gradient(temp_loss, [A, b])
loss_vec.append(temp_loss)
my_opt.apply_gradients(zip(grads, [A, b]))
if (i + 1) % 100 == 0:
print('Step #{} A = {} b = {}'.format(i + 1, A.numpy(), b.numpy()))
Step #100 A = [[1.0606606]] b = [[-0.02573951]]
Step #200 A = [[1.0778327]] b = [[-0.05546956]]
Step #300 A = [[1.0786034]] b = [[-0.11129877]]
Step #400 A = [[1.0735927]] b = [[-0.13460638]]
Step #500 A = [[0.9873692]] b = [[-0.10372327]]
Step #600 A = [[1.0768895]] b = [[-0.12781776]]
Step #700 A = [[0.9659874]] b = [[-0.15313245]]
Step #800 A = [[1.1023445]] b = [[-0.15335608]]
Step #900 A = [[1.0503265]] b = [[-0.17138588]]
Step #1000 A = [[1.0963748]] b = [[-0.20842536]]
Step #1100 A = [[1.060973]] b = [[-0.2865414]]
Step #1200 A = [[1.0580977]] b = [[-0.24569693]]
Step #1300 A = [[1.0354245]] b = [[-0.1955461]]
Step #1400 A = [[1.0163252]] b = [[-0.19011997]]
Step #1500 A = [[1.1161515]] b = [[-0.20182066]]
# 係数を抽出
slope = A[0][0]
y_intercept = b[0][0]
# 最も適合する直線を取得
best_fit = []
for i in x_vals:
best_fit.append(slope + i + y_intercept)
# 結果をプロット
plt.plot(x_vals, y_vals, 'o', label='Data')
plt.plot(x_vals, best_fit, 'r-', label=' Best fit line', linewidth=3)
plt.legend(loc='upper left')
plt.show()
まとめ
TensorFlowとnumpyで単回帰解析を実装することにより、数学的な定義に基いたロジックを理解することができました。今後は、重回帰解析、サポートベクトルマシン、ニューラルネットワークなどをTensorFlowとnumpyで実装し、数学的に理解を深めていきたいと思います。
最後に、数学は楽しいです。あと、プログラマーの性か、$\sum_{i=1}^{n}$ と書くべきところをついつい $\sum_{i=0}^{n}$ と書いてしまいます。