重回帰問題を解くのに、ちょいとTransformerなんか使っちゃおうかなと思ったら、簡単な情報が意外と見つからなかったので、今更な内容ですが書いてみました。
前提
- 入力はCSVファイルです
- 全部数値データです
- Pytorchを使います
コード説明
解説を入れながら、実際のコードを説明していきます。
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
使用するモジュールをインポートします。
まぁそういうものだと思ってください。
# デバイス設定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
やっぱりGPUを使いたいので、そのように設定します。
使えない環境であれば、CPUで頑張って動作します。
学習データの読み込み
ここから、学習データの読み込み関連のコードになります。
# データ読み込み
data = pd.read_csv('sample.csv') # すべて数値データに変換済み
まずCSVファイルから学習データを読み込みます。
ちなみに、値はすべて数値データになっていると仮定します。
(なっていなかったら、読み込んだ後に頑張って数値化してください)
# 特徴量とターゲットに分割
X = data.drop('target', axis=1).values
y = data['target'].values
カラム「target」を目的変数として、説明変数と分離します。
(カラム名は適当に直してください)
# データの標準化
scaler = StandardScaler()
X = scaler.fit_transform(X)
データの標準化を行います。
やらなくても動きますが、やっぱりやった方がいいです。
# トレーニングデータとテストデータに分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
テスト用のデータと分割します。
後で精度を確認するときに使用します。
ここでは8:2に分割しています。
さらに順番はランダムになってます。
y_train = y_train.astype('float32')
y_test = y_test.astype('float32')
値の型を「float32」に変換しています。
そういうものだと思ってください。
# Tensorに変換
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).view(-1, 1).to(device)
y_test = torch.tensor(y_test, dtype=torch.float32).view(-1, 1).to(device)
ここから学習データをミニバッチで読み込むための準備になります。
まず、データをテンソルの形式に変換します。
(できればGPUメモリに突っ込みたいところです)
# データセットとデータローダの作成
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
batch_size = 64 # 適切なバッチサイズを設定
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
データセットとデータローダを作成して、学習時にそこから少しづつデータを取得できるようにしておきます。
いっぺんに学習データを読み込むと、メモリが足らなくなり学習できないことが良くありますので、ミニバッチにした方が良いかと思います。
モデルの作成
学習モデル(Transformerの本体)を定義します。
# Transformerベースのモデル定義
class TransformerRegression(nn.Module):
def __init__(self, input_dim, d_model, nhead, num_encoder_layers, dim_feedforward, dropout):
super(TransformerRegression, self).__init__()
self.input_layer = nn.Linear(input_dim, d_model)
encoder_layers = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_encoder_layers)
self.output_layer = nn.Linear(d_model, 1)
def forward(self, src):
src = self.input_layer(src)
src = self.transformer_encoder(src)
output = self.output_layer(src)
return output
エンコーダにTransformerを使用し、前後を全結合(線形結合)で挟んでいます。
ここは色々と工夫のし甲斐があるところですが、今回は簡単であることを重視して、これでいきます。
ちなみに図にするとこんな感じです。
入力データ (Input Data)
↓
Linear Layer (input_dim -> d_model)
↓
+-----------------------------+
| Transformer Encoder |
| +-----------------------+ |
| | Encoder Layer 1 | |
| | Multi-Head Attention | |
| | Feedforward Network | |
| +-----------------------+ |
| +-----------------------+ |
| | Encoder Layer 2 | |
| | Multi-Head Attention | |
| | Feedforward Network | |
| +-----------------------+ |
| ... |
| +-----------------------+ |
| | Encoder Layer n | |
| | Multi-Head Attention | |
| | Feedforward Network | |
| +-----------------------+ |
+-----------------------------+
↓
Linear Layer (d_model -> 1)
↓
出力データ (Output Data)
1. 入力層 (Input Layer)
- 特徴量ベクトル X(サイズ:batch_size x input_dim)を全結合層に入力し、次元を d_model に変換します
- 出力サイズ:batch_size x d_model
2. Transformerエンコーダ層 (Transformer Encoder Layer)
- エンコーダレイヤーがスタックされた構造。各レイヤーは次の処理を行います
- マルチヘッドアテンション:入力間の関係性を計算
- フィードフォワードネットワーク:各特徴を線形変換し、非線形変換を行います
- 出力サイズ:batch_size x d_model
3. 出力層 (Output Layer)
- Transformerエンコーダ層の出力を全結合層に入力し、回帰タスク用のスカラー値に変換します
- 出力サイズ:batch_size x 1
学習
ここから学習処理に入ります。
# ハイパーパラメータ設定
input_dim = X_train.shape[1]
d_model = 64
nhead = 8
num_encoder_layers = 3
dim_feedforward = 128
dropout = 0.1
まずは学習で使用するパラメータ値を宣言しています。
ここもいろいろ調整しがいのあるところです。
# モデル、損失関数、オプティマイザの定義
model = TransformerRegression(input_dim, d_model, nhead, num_encoder_layers, dim_feedforward, dropout).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
先ほど作成したモデルをインスタンス化し、損失関数とオプティマイザを用意します。
損失関数やオプティマイザも、いろいろ試してみると面白いかと思います。
# 学習
num_epochs = 100
for epoch in range(num_epochs):
model.train()
for batch_X, batch_y in train_loader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
if (epoch+1) % 10 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
とりあえず100エポックでぶん回してみます。
検証
学習した結果の精度を確認してみます。
# 検証
model.eval()
test_loss = 0.0
with torch.no_grad():
for batch_X, batch_y in test_loader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
predictions = model(batch_X)
test_loss += criterion(predictions, batch_y).item()
test_loss /= len(test_loader)
print(f'Test Loss: {test_loss:.4f}')
ロスを出力していますので、できるだけ小さくなるように頑張りたいところです。
推論
検証のコードを見ればわかりますが、推論はこうします。
# 推論
with torch.no_grad():
predictions = model(X)
print(predictions)
まとめ
とりあえず、こんな感じで重回帰をTransformerで求められるようになりました。
Transformerでやらなくちゃいけないってことはないんですけどね。
おまけ
実際には、毎回学習するわけにはいかないので、学習結果をファイルにSave/Loadして使うことになるかと思います。
保存
Saveはこんな感じです。
# モデルの保存
model_path = 'transformer_regression_model.pth'
torch.save(model.state_dict(), model_path)
print(f'Model saved to {model_path}')
読み込み
Loadはこんな感じです。
# モデルのインスタンス化と読み込み
model = TransformerRegression(input_dim, d_model, nhead, num_encoder_layers, dim_feedforward, dropout).to(device)
model.load_state_dict(torch.load('transformer_regression_model.pth'))
model.eval()
print('Model loaded and ready for inference')
おまけのおまけ
このコードは、ChatGPTの協力で作成しました。