はじめに
『機械学習はデータの準備が大事!!!』とよく言われる。PyTorchの勉強中もそんなかんじだ。その割にその『準備』の仕方を教えてくれないことがほとんど。
準備万端のデータを持ってきてそれではお勉強を始めましょうでも良いのだが、『どうすれば生データを学習できる形にできるの?』 という肝心なところが書いていないサイトばかりで私のような初学者にとって応用がかなり難しいと感じた。
今回は一番ありがちであろうpandas.DataFrame
形式からスタートし、学習可能な形式(機械学習でもReady to Runっていうのだろうか?)まで整形、学習、そのモデルとパラメータを保存して使用する方法まで一気通貫で記述する。
ここで言葉の定義として以下のように呼び分けることとする。そこまで気にしなくても大丈夫。
- アーキテクチャ
ニューラルネットワークの構造そのもの(Classとして定義) - モデル
ニューラルネットワークの構造と重みやバイアスなどを含めた実態(アーキテクチャクラスをインスタンス化したもの)
- データの準備
- テータの整形
- アーキテクチャのクラス、学習/テスト関数の定義
- インスタンス化して学習を実行
- 学習状態のグラフ化
- 未知データに対する予測
- モデルの保存と読み込み
このページでは細かいところには言及しない。
具体的な内容はPyTorchの公式がめちゃ丁寧に書いてくれているので、英語が読める方は質の高い一次情報を読むことを推奨します。
作りたいモデル
みんな大好きscikit-learnのirisデータを使用する。
データを与えられると、setosa, versicolor, virginicaのどれであるかを判断するモデルを作成する。
-
説明変数
sepal length (cm)
,sepal width (cm)
,petal length (cm)
,petal width (cm)
(それぞれ$l_s,w_s,l_p,w_p$と表記)を縦にならべたベクトル$\boldsymbol{X}$ -
目的変数
setosa,versicolor,virginicaである確率(それぞれ$p_\text{setosa},p_\text{versicolor},p_\text{virginica}$と表記)をを縦にならべた離散的な確率分布ベクトル$\boldsymbol{p}$
つまりこういう関数$f$をニューラルネットワークモデルとして作りたい。
$$
\begin{pmatrix}
l_s \\ w_s \\ l_p \\ w_p
\end{pmatrix}
\xrightarrow{f}
\begin{pmatrix}
p_\text{setosa} \\ p_\text{versicolor} \\ p_\text{virginica}
\end{pmatrix}
$$
とする。
このあたりは用途によって変わるが、「入力も出力もベクトル」であることと「入出力ベクトルの次元は一般に一致しない」くらい分かれば良い気がする。
モジュールの読み込みと設定
まずは必要となるモジュールを読み込む。from sklearn import datasets
はサンプルデータ用なので本質的には必要ない。今回numpy
のメソッドは使用しませんが、データ整形時に必要であればimportしてください。
# 主要モジュール
import torch
import pandas as pd
# import numpy as np
import matplotlib.pyplot as plt
# 学習用のサンプルデータ
from sklearn import datasets
# データを整理する
from torch.utils.data import DataLoader
# 全結合層と活性化関数
from torch import nn
from torch.nn import functional as F
# 損失関数と最適化関数
from torch import optim
GPUが使えるかどうかを確認する
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"run with {device}")
GPU(CUDA)の設定をしていればrun with cuda
と表示され、そうでなければrun with cpu
と表示されるはず。
今回の計算はそんなに重くないのでCPUでも問題ない。
PyTorchは仕様上、CPU上のデータとGPU上のデータで計算をすることができない。device
という変数に予め勝負するフィールドを設定しておくことでCPU vs GPUの衝突が起きないようにしている。
衝突が起きるとエラーを吐いてくれるので原因っぽいところに.to("cpu")
or.to("cuda")
を使ってCPUないしGPUに送ってあげれば解決することが多い。
データの準備
irisデータをサンプルとして持ってくる。入力データとして一般的っぽい表形式にしておく。
このセクションは本題ではない。
iris = datasets.load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
# targetというカラムに目的変数となる名前を入れる
df['target'] = iris.target_names[iris.target]
以下のような150行5列のDataFrameが得られるはずだ。
sepal length (cm) | sepal width (cm) | petal length (cm) | petal width (cm) | target | |
---|---|---|---|---|---|
0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |
... | ... | ... | ... | ... | ... |
150 rows × 5 columns
データの整形
ここからが本題。ここからが、というかこの章がまさに私が知りたかった内容だ。
表形式で入力されたデータを下記の手順で変換していく。
DataFrame
→ndarray
→Tensor
→TensorDataset
→DataLoader
具体的には以下のような操作を実施する。
-
DataFrame
の説明変数と目的変数を別の変数に分け、データを数量化(今回はone-hot表現)する -
DataFrame
をndarray
に変換 -
ndarray
をTensor
に変換 - 説明変数と目的変数の
Tensor
からTensorDataset
を作成 -
TensorDataset
を学習用とテスト用に分割 -
TensorDataset
からDataLoader
を作成する
# 1.DataFrameを説明変数と目的変数に分ける
x = df[["sepal length (cm)",
"sepal width (cm)",
"petal length (cm)",
"petal width (cm)"]]
t = df[["target"]]
t = pd.get_dummies(df[["target"]])
# 2.DataFrameをnarrayに変換
x = x.to_numpy().astype("float32")
t = t.to_numpy().astype("float32")
# 3.ndarrayをTensorに変換
x = torch.from_numpy(x).to(device)
t = torch.from_numpy(t).to(device)
# 4.TensorからDatasetを作成
dataset = torch.utils.data.TensorDataset(x, t)
# 5.学習用データとテスト用データに分ける
r = 0.8
train_dataset, test_dataset = torch.utils.data.random_split(dataset, [r,1-r])
# 6.DataLoaderに変換
batch_size = 20
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)
-
get_dummies
メソッドを使用してone-hot表現に変換。 -
to_numpy()
メソッドでndarray
に変換。
.astype("float32")
で実数をPyTorchのデフォルトであるfloat32に変換した。
NumPyとPyTorchはデフォルトの型が違うのでPyTorchのデフォルトであるfloat32
かint64
へ明示的に変更しておくことが望ましい。 -
torch.from_numpy()
関数でTensor
に変換。.to(device)
でCPUないしGPUへデータを転送している。 -
torch.utils.data.TensorDataset()
関数でDataset
を作成 - 全データからランダムに選んだ80%を学習用、残りはテスト用とした。
- バッチサイズ(ミニバッチ)を20とし
DataLoader
に変換
ここまでできれば一旦データの整形は終了。
train_loader
で学習、test_loader
でテストをおこなっていく準備ができた。
ここからニューラルネットワークのアーキテクチャを構築していく。
アーキテクチャとtrain/test関数の定義
アーキテクチャをClass、trainとtestは関数として定義しておく。
アーキテクチャの定義
nn.Module
を継承したClassを定義する。
__init(self)__
でスーパークラスのsuper.__init__()
を呼び出したあとで層、損失関数、最適化関数、などを定義する。
forward(self, x)
で順伝播の流れ方を定義する。
__init__
とforward
はこの名前でなければならない。
optim.Adam(self.parameters())
に変数lr=数値
と記述すると学習係数を設定できる。
学習係数を大きくすると学習を早く進める事ができるが、最適解の近くで振動収束しない場合がある。逆に小さくすると最適解に収まりやすいが、収束に時間がかかる。
class Net(nn.Module):
def __init__(self):
super().__init__()
# 全結合層を3つ
self.fc1 = nn.Linear(4, 64)
self.fc2 = nn.Linear(64, 64)
self.fc3 = nn.Linear(64, 3)
# 損失関数と最適化関数
self.criterion = nn.CrossEntropyLoss()
self.optimizer = optim.Adam(self.parameters())
def forward(self, x):
x = self.fc1(x)
x = F.relu(x)
x = self.fc2(x)
x = F.relu(x)
x = self.fc3(x)
return x
ちなみに雛形は下記のもので、ネットの情報から中身を埋めていけば良い。
class Net(nn.Module):
def __init__(self):
super().__init__()
...
def forward(self, x):
...
return x
ちょっと算数の話
self.fc = nn.Linear(N, M)
は線形変換、いわゆる行列を定義しているにすぎない。
y = self.fc(x)
と記述すれば
\begin{pmatrix}
y_1 \\\ y_2 \\\ \vdots \\\ y_M
\end{pmatrix}
=
\begin{pmatrix}
w_{11} & \cdots & w_{1j} & \cdots & w_{1N}\\
\vdots & \ddots & & & \vdots \\
w_{i1} & & w_{ij} & & w_{iN} \\
\vdots & & & \ddots & \vdots \\
w_{M1} & \cdots & w_{Mj} & \cdots & w_{MN}
\end{pmatrix}
\begin{pmatrix}
x_1 \\\ x_2 \\\ \vdots \\\ x_N
\end{pmatrix}
+
\begin{pmatrix}
b_1 \\\ b_2 \\\ \vdots \\\ b_M
\end{pmatrix}
を計算したことになる。
重み行列$\hat{w}$は$M\times N$行列で$N$次元ベクトルを$M$次元に変換する。
バイアスベクトル$\boldsymbol{b}$は$M$次元である。
この$w_{ij}$や$b_i$を何とかして求めようという算段。
線形変換は何度行っても線形でしかない(定義であり最大のメリットでもあるが)ので活性化関数 で非線形性を与えてやることでいい感じになるよと言った算段である。
制度の良いモデルが作れさえすれば良いので、ここは先人のノウハウを模倣しながら作成していく。
train関数とtest関数の定義
1epoch分の学習とテストを実行する関数を定義する。
それぞれ引数としてmodel
(アーキテクチャのインスタンス→モデル)とdata_loader
(データ整形で作成したもの)
def train(model, data_loader):
# 今は学習時であることを明示するコード
model.train()
# 正しい予測数、全体のデータ数を数えるカウンターの0初期化
total_correct = 0
total_data_len = 0
loss_mean = 0
# ミニバッチごとにループさせる,train_loaderの中身を出し切ったら1エポックとなる
for j, (x, t) in enumerate(data_loader):
y = model(x) # 順伝播
model.optimizer.zero_grad() # 勾配を初期化(前回のループ時の勾配を削除)
loss = model.criterion(y, t) # 損失を計算
loss.backward() # 逆伝播で勾配を計算
model.optimizer.step() # 最適化
loss_mean += loss.item()
# ミニバッチごとの正答率と損失を求める
_, index_y = torch.max(y, axis=1) # 最も確率が高いと予測したindex
_, index_t = torch.max(t, axis=1) # 正解のindex
for i in range(len(t)): # データ一つずつループ,ミニバッチの中身出しきるまで
total_data_len += 1 # 全データ数を集計
if index_y[i] == index_t[i]:
total_correct += 1 # 正解のデータ数を集計
loss_mean = loss_mean / (j+1)
return total_correct, total_data_len, loss_mean
def test(model, data_loader):
# モデルを評価モードにする
model.eval()
# 正しい予測数、全体のデータ数を数えるカウンターの0初期化
total_data_len = 0
total_correct = 0
loss_mean = 0
for j, (x, t) in enumerate(data_loader):
y = model(x) # 順伝播(=予測)
loss = model.criterion(y, t) # 損失を計算
loss_mean += loss.item()
# ミニバッチごとの正答率と損失を求める
_, index_y = torch.max(y, axis=1) # 最も確率が高いと予測したindex
_, index_t = torch.max(t, axis=1) # 正解のindex
for i in range(len(t)): # データ一つずつループ,ミニバッチの中身出しきるまで
total_data_len += 1 # 全データ数を集計
if index_y[i] == index_t[i]:
total_correct += 1 # 正解のデータ数を集計
loss_mean = loss_mean / (j+1)
return total_correct, total_data_len, loss_mean
学習の実施
定義したアーキテクチャと関数を使って学習を実施していく。
- アーキテクチャのインスタンス
model
を作成 -
model
に対してtrain_dataset
を使ってtrain
を実施 -
model
に対してtest_dataset
を使ってtest
を実施
model
に対しtrain
関数をぶつけ続けることで精度を上げていく。
今回は501 epochs学習をしてみる。
# アーキテクチャのインスタンス作成
model = Net().to(device)
epochs = 500+1
record_train_loss = []
record_test_loss = []
for epoch in range(epochs):
train_correct_len, train_data_len, train_loss = train(model, train_loader)
test_correct_len, test_data_len, test_loss = test(model, test_loader)
train_acc = train_correct_len/train_data_len*100
test_acc = test_correct_len/test_data_len*100
record_train_loss.append(train_loss)
record_test_loss.append(test_loss)
if epoch%100 == 0:
print(f"epoch={epoch}, train:{train_correct_len}/{train_data_len}({train_acc:.2f}%), {train_loss:.5f}, test:{test_correct_len}/{test_data_len}({test_acc:.2f}%), {test_loss:.5f}")
epoch=0, train:44/121(36.36%), 1.11466, test:10/29(34.48%), 1.10308
epoch=100, train:118/121(97.52%), 0.06812, test:27/29(93.10%), 0.11353
epoch=200, train:118/121(97.52%), 0.05574, test:27/29(93.10%), 0.10665
epoch=300, train:118/121(97.52%), 0.06284, test:28/29(96.55%), 0.09738
epoch=400, train:118/121(97.52%), 0.06457, test:27/29(93.10%), 0.09130
epoch=500, train:118/121(97.52%), 0.04698, test:28/29(96.55%), 0.09885
損失の可視化
リストとしてlossのデータを持っておいたのでグラフにプロット。
plt.plot(range(len(record_train_loss)), record_train_loss, label="Train")
plt.plot(range(len(record_test_loss)), record_test_loss, label="Test")
plt.legend()
plt.xlabel("Epochs")
plt.ylabel("Error")
plt.show()
わるないやん
なめらかでないのはミニバッチを使用しているためで、データ準備の段階でバッチサイズを学習データのサイズにすればなめらかな曲線が得られる。
ただし、ミニバッチを使わないということは逆伝播の回数が減るということなので収束には更に時間がかかるようになる。
未知データに対する予測
irisデータの1つめの数字を入力してみる。答えはsetosa(確率分布でいうと$(1,0,0)$が正解)
入力ベクトルと同じ形のTensorを入力してやればよい。確率値に直すため、softmax
関数をかける。
data = torch.Tensor([5.1,3.5,1.4,0.2]).to(device)
pred = model(data)
pred = F.softmax(pred,dim=0)
for target,probability in zip(pd.unique(df["target"]),pred):
print(f"{target} : {probability.item():.5f}")
setosa : 0.9999985694885254
versicolor : 1.4037335631655878e-06
virginica : 1.6665163340147924e-18
ほとんど100%でsetosaと判断されているため、正しく予測ができていると言えるだろう。
モデルの保存と読み込み
学習済みモデルを保存できれば、「サチってないから追加学習」や「別のスクリプトから呼び出し」ができる。また「新しいモデルの一部として流用」することもできる。
モデル全体を保存する
TorchScriptという形式(?)で保存する。この方法なら学習済みモデルを別のスクリプトから呼び出して使用することもできる。
# Torch Script形式で保存
model_scripted = torch.jit.script(model)
model_scripted.save('model_scripted.pth')
# Torch Script形式で読み込み
model_from_script = torch.jit.load('model_scripted.pth', map_location="cpu")
モデルは保存されるタイミングでCPUにいたかGPUにいたかを覚えている。PyTorchは仕様上、CPU上のデータとGPU上のデータで計算をすることができないため、GPUで出力したモデルをCPUで動かそうとするとエラーになる。
map_location="cpu"
のように明示しておくと無用なエラーを避けることができる。
重みのみを保存する
この保存方法ではモデルの重みだけを保存する。モデルのクラスがすでに定義されている状態でしか使えない。
追加でepochを回すときにつかえると思う。
# 重みとバイアスのみ保存
torch.save(model.state_dict(), 'model_weight.pth')
# 重みの読み込み
model_from_weight = Net()
model_from_weight.load_state_dict(torch.load('model_weight.pth', map_location="cpu"))
上記と同様の理由でmap_location="cpu"
のように明示しておくと無用なエラーを避けることができる。
よくわからんけど公式に書いてあるやり方
これもモデルクラスがないと動かない。重みとバイアスだけの保存との違いやメリットがよくわからない。(わかる方教えてください。)
# モデル全体の保存
torch.save(model, 'model.pth')
# モデルの読み込み
model_from_all = torch.load('model.pth', map_location="cpu")