Introduction
This article is part of my walkthrough of the NLP 100 Exercise (言語処理100本ノック).
I am keeping a record on Qiita of working through all 100 exercises.
The language used is Python.
This post presents example solutions for the second half of Chapter 9: RNN and CNN (problems 85-89).
The solutions for the first half of Chapter 9 (problems 80-84) are here.
85. Bidirectional RNN / Multi-layer RNN
Code
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch
class RNN(nn.Module):
def __init__(self, vocab_size, dw, dh, output):
super().__init__()
self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
self.rnn = nn.RNN(dw, dh, batch_first=True, bidirectional=True)
self.fc1 = nn.Linear(dh*2, output, bias=True)
self.fc2 = nn.Softmax(dim=1)
nn.init.xavier_normal_(self.rnn.weight_ih_l0)
nn.init.xavier_normal_(self.rnn.weight_hh_l0)
nn.init.xavier_normal_(self.rnn.weight_ih_l0_reverse)
nn.init.xavier_normal_(self.rnn.weight_hh_l0_reverse)
nn.init.xavier_normal_(self.fc1.weight)
def forward(self, x):
x = self.embed(x)
_, x = self.rnn(x)
rnn_out = torch.cat([x[-2,:,:], x[-1,:,:]], dim=1)
x = self.fc1(rnn_out)
x = self.fc2(x)
return x
def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
loss = 0.0
total = 0
correct = 0
model = model.to(device)
with torch.no_grad():
for X, Y in dataloader:
X = X.to(device)
Y = Y.to(device)
Y_pred = model(X)
if criterion != None:
loss += criterion(Y_pred, Y).item()
pred = torch.argmax(Y_pred, dim=-1)
total += len(Y)
correct += (pred == Y).sum().item()
return loss / len(dataset), correct / total
def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None):
dataset_train = TensorDataset(X_train, y_train)
dataset_test = TensorDataset(X_test, y_test)
model = model.to(device)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
for ep in range(num_epochs):
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
if ep%30==0:
lr = lr * 0.1
model.train()
for X, Y in dataloader_train:
X = X.to(device)
Y = Y.to(device)
optimizer.zero_grad()
Y_pred = model(X)
loss = criterion(Y_pred, Y)
loss.backward()
optimizer.step()
model.eval()
loss_train, acc_train = calculate_loss_and_accuracy(model, dataset_train, device, criterion=criterion)
loss_test, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)
print(f'epoch: {ep + 1}, loss_train: {loss_train:.4f}, accuracy_train: {acc_train:.4f}, loss_Test: {loss_test:.4f}, accuracy_Test: {acc_test:.4f}')
        # use the local y_train/y_test arguments rather than the globals
        TensorboardWriter(model, X_train, y_train, ep, loss_train, "Train", device)
        TensorboardWriter(model, X_test, y_test, ep, loss_test, "Test", device)
def TensorboardWriter(model, X, Y, epoch, loss, name, device):
    writer = SummaryWriter(log_dir="logs")
    model = model.to(device)
    X = X.to(device)
    Y = Y.to(device)
    with torch.no_grad():
        Y_pred = model(X)
    # Y already holds class indices, so compare it directly with the argmax of the predictions.
    pred = torch.argmax(Y_pred, dim=1)
    accuracy = (pred == Y).sum().item() / len(Y)
    writer.add_scalar("Loss/{}_Loss".format(name), loss, epoch)
    writer.add_scalar("Accuracy/{}_Accuracy".format(name), accuracy, epoch)
    writer.close()
def CountVocab(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
max_num = []
for line in lines:
line_t = line.split("\t")[2].replace("\n", "").split(" ")
max_num.extend(map(int, line_t))
vocab_max = max(max_num)+1
return vocab_max
def GetCodeLow(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
num_list = []
code_list = []
pad_list = []
for line in lines:
line_s = line.split("\t")
code_list.append(int(line_s[0]))
num = line_s[2].replace("\n", "").split(" ")
num = list(map(int, num))
num_list.append(num)
num_tensor = torch.tensor(num)
pad_list.append(num_tensor)
max_vocab = CountVocab("train")
mlen = max([len(x) for x in num_list])
pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
pad_list = torch.tensor(pad_list)
code_list = torch.tensor(code_list)
return pad_list, code_list
X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 100
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
HIDDEN_SIZE = 50
lr = 1e-2
device = "cuda:0"
model = RNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device)
Comment
I tried a bidirectional RNN, but the accuracy is not particularly high...
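The exercise title also asks for a multi-layer RNN, which the code above does not cover. Below is a minimal sketch of that variant (my own illustration, not part of the solution above): layers are stacked with num_layers, and the top layer's forward and backward hidden states are concatenated. The softmax layer is dropped here because nn.CrossEntropyLoss already applies log-softmax to the logits.

import torch
import torch.nn as nn

class MultiLayerBiRNN(nn.Module):
    # Sketch of a stacked bidirectional RNN classifier (hypothetical variant of the RNN above).
    def __init__(self, vocab_size, dw, dh, output, num_layers=2):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size - 1)
        # num_layers > 1 stacks RNN layers; bidirectional=True doubles the hidden states.
        self.rnn = nn.RNN(dw, dh, num_layers=num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(dh * 2, output)

    def forward(self, x):
        x = self.embed(x)
        _, h = self.rnn(x)                    # h: (num_layers * 2, batch, dh)
        h = torch.cat([h[-2], h[-1]], dim=1)  # top layer's forward and backward states
        return self.fc(h)                     # logits; pair with nn.CrossEntropyLoss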
86. Convolutional Neural Network (CNN)
Code
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch
class CNN(nn.Module):
def __init__(self, vocab_size, dw, dh, output):
super().__init__()
self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
self.conv1 = nn.Conv2d(1, 3, kernel_size=(3, 300))
self.tanh = nn.Tanh()
self.fc1 = nn.Linear(3, output, bias=True)
self.fc2 = nn.Softmax(dim=1)
def forward(self, x):
x = self.embed(x)
x = x.unsqueeze(1)
x = self.conv1(x)
x = self.tanh(x)
x = F.max_pool2d(x, kernel_size=(x.size()[2], 1))
x = x.view(-1, 3)
x = self.fc1(x)
x = self.fc2(x)
return x
def CountVocab(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
max_num = []
for line in lines:
line_t = line.split("\t")[2].replace("\n", "").split(" ")
max_num.extend(map(int, line_t))
vocab_max = max(max_num)+1
return vocab_max
def GetCodeLow(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
num_list = []
code_list = []
pad_list = []
for line in lines:
line_s = line.split("\t")
code_list.append(int(line_s[0]))
num = line_s[2].replace("\n", "").split(" ")
num = list(map(int, num))
num_list.append(num)
num_tensor = torch.tensor(num)
pad_list.append(num_tensor)
max_vocab = CountVocab("train")
mlen = max([len(x) for x in num_list])
pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
pad_list = torch.tensor(pad_list)
code_list = torch.tensor(code_list)
return pad_list, code_list
X_valid, Y_valid = GetCodeLow("valid")
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
HIDDEN_SIZE = 50
model = CNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
Y_pred = model(X_valid)
pred = torch.argmax(Y_pred, dim=-1)
print("accuracy: ", sum(1 for x,y in zip(Y_valid, pred) if x == y) / float(len(Y_valid)))
Comment
Next up is a CNN. At this point the model is untrained, so the code only runs the randomly initialized weights on the validation data.
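For reference, here is a quick shape trace through the same kind of convolution and pooling, using a dummy batch (the vocabulary and sentence length are made up; only the 300-dimensional embedding and the (3, 300) kernel match the code above):

import torch
import torch.nn as nn
import torch.nn.functional as F

x = torch.randint(0, 100, (2, 20))              # 2 hypothetical sentences of 20 word IDs each
embed = nn.Embedding(100, 300, padding_idx=99)
conv = nn.Conv2d(1, 3, kernel_size=(3, 300))    # 3 filters spanning 3 tokens x full embedding

h = embed(x)                                     # (2, 20, 300)
h = h.unsqueeze(1)                               # (2, 1, 20, 300): add the channel axis Conv2d expects
h = torch.tanh(conv(h))                          # (2, 3, 18, 1): one value per trigram position per filter
h = F.max_pool2d(h, kernel_size=(h.size(2), 1))  # (2, 3, 1, 1): max over time
h = h.view(-1, 3)                                # (2, 3): one feature vector per sentence
print(h.shape)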
87. Training the CNN with Stochastic Gradient Descent
Code
%load_ext tensorboard
%tensorboard --logdir logs
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch
import sys
class CNN(nn.Module):
def __init__(self, vocab_size, dw, dh, output):
super().__init__()
self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size - 1)
self.conv1 = nn.Conv2d(1, 3, kernel_size=(3, 300))
self.tanh = nn.Tanh()
self.conv2 = nn.Conv2d(1, 3, kernel_size=(5, 300))
self.fc1 = nn.Linear(3, output, bias=True)
self.fc2 = nn.Softmax(dim=1)
def forward(self, x):
x = self.embed(x)
x = x.unsqueeze(1)
x = self.conv1(x)
x = self.tanh(x)
x = F.max_pool2d(x, kernel_size=(x.size()[2], 1))
x = x.view(-1, 3)
x = self.fc1(x)
x = self.fc2(x)
return x
def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
loss = 0.0
total = 0
correct = 0
model = model.to(device)
with torch.no_grad():
for X, Y in dataloader:
X = X.to(device)
Y = Y.to(device)
Y_pred = model(X)
if criterion != None:
loss += criterion(Y_pred, Y).item()
pred = torch.argmax(Y_pred, dim=-1)
total += len(Y)
correct += (pred == Y).sum().item()
return loss / len(dataset), correct / total
def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None):
dataset_train = TensorDataset(X_train, y_train)
dataset_test = TensorDataset(X_test, y_test)
model = model.to(device)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
for ep in range(num_epochs):
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
model.train()
for X, Y in dataloader_train:
X = X.to(device)
Y = Y.to(device)
optimizer.zero_grad()
Y_pred = model(X)
loss = criterion(Y_pred, Y)
loss.backward()
optimizer.step()
model.eval()
loss_train, acc_train = calculate_loss_and_accuracy(model, dataset_train, device, criterion=criterion)
loss_test, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)
print(f'epoch: {ep + 1}, loss_train: {loss_train:.4f}, accuracy_train: {acc_train:.4f}, loss_Test: {loss_test:.4f}, accuracy_Test: {acc_test:.4f}')
        # use the local y_train/y_test arguments rather than the globals
        TensorboardWriter(model, X_train, y_train, ep, loss_train, "Train", device)
        TensorboardWriter(model, X_test, y_test, ep, loss_test, "Test", device)
def TensorboardWriter(model, X, Y, epoch, loss, name, device):
    writer = SummaryWriter(log_dir="logs")
    model = model.to(device)
    X = X.to(device)
    Y = Y.to(device)
    with torch.no_grad():
        Y_pred = model(X)
    # Y already holds class indices, so compare it directly with the argmax of the predictions.
    pred = torch.argmax(Y_pred, dim=1)
    accuracy = (pred == Y).sum().item() / len(Y)
    writer.add_scalar("Loss/{}_Loss".format(name), loss, epoch)
    writer.add_scalar("Accuracy/{}_Accuracy".format(name), accuracy, epoch)
    writer.close()
def CountVocab(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
max_num = []
for line in lines:
line_t = line.split("\t")[2].replace("\n", "").split(" ")
max_num.extend(map(int, line_t))
vocab_max = max(max_num)+1
return vocab_max
def GetCodeLow(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
num_list = []
code_list = []
pad_list = []
for line in lines:
line_s = line.split("\t")
code_list.append(int(line_s[0]))
num = line_s[2].replace("\n", "").split(" ")
num = list(map(int, num))
num_list.append(num)
num_tensor = torch.tensor(num)
pad_list.append(num_tensor)
max_vocab = CountVocab("train")
mlen = max([len(x) for x in num_list])
pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
pad_list = torch.tensor(pad_list)
code_list = torch.tensor(code_list)
return pad_list, code_list
X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 100
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
HIDDEN_SIZE = 50
lr = 1e-3
device = "cuda:0"
model = CNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device)
Comment
The accuracy was not particularly high with the CNN either... Maybe the way I prepared the training data is to blame.
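One thing that might be worth trying is padding each mini-batch only to its own longest sentence instead of padding everything to the global maximum length, via a collate_fn. This is just a sketch of the idea, not what the code above does; PAD_ID and the dataset format are assumptions.

import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

PAD_ID = 0  # hypothetical padding ID; the code above uses CountVocab("train") as the pad index

def collate_batch(batch):
    # batch is a list of (variable-length id tensor, label tensor) pairs
    seqs, labels = zip(*batch)
    seqs = pad_sequence(seqs, batch_first=True, padding_value=PAD_ID)
    return seqs, torch.stack(labels)

# Usage sketch: the dataset would have to yield unpadded tensors, e.g.
# loader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=collate_batch)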
88. Hyperparameter Tuning
Preparation
Install Optuna for hyperparameter tuning.
Command
$ pip install optuna
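Optuna's basic flow is to write an objective(trial) function that samples hyperparameters with trial.suggest_* and returns a score, and then let a study maximize (or minimize) it. A minimal, self-contained example unrelated to this task:

import optuna

def objective(trial):
    # Sample one hyperparameter and return the value to be maximized.
    x = trial.suggest_float("x", -10.0, 10.0)
    return -(x - 2.0) ** 2

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)
print(study.best_params, study.best_value)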
Code
import optuna
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch
class RNN(nn.Module):
def __init__(self, vocab_size, dw, dh, output):
super().__init__()
self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
self.rnn = nn.RNN(dw, dh, batch_first=True, bidirectional=True)
self.fc1 = nn.Linear(dh*2, output, bias=True)
self.fc2 = nn.Softmax(dim=1)
def forward(self, x):
x = self.embed(x)
_, x = self.rnn(x)
rnn_out = torch.cat([x[-2,:,:], x[-1,:,:]], dim=1)
x = self.fc1(rnn_out)
x = self.fc2(x)
return x
class LSTM(nn.Module):
def __init__(self, vocab_size, dw, dh, output):
super().__init__()
self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
self.lstm = nn.LSTM(dw, dh, batch_first=True,bidirectional=True)
self.fc1 = nn.Linear(dh*2, output, bias=True)
self.fc2 = nn.Softmax(dim=1)
def forward(self, x):
x = self.embed(x)
_, x = self.lstm(x)
x = torch.cat([x[0][0],x[0][1]], dim=1)
x = self.fc1(x)
x = self.fc2(x)
return x
class CNN(nn.Module):
def __init__(self, vocab_size, dw, output, layer, unit, activation):
super().__init__()
self.layer = layer
self.embed = nn.Embedding(vocab_size, dw, padding_idx = vocab_size-1)
if unit == 6:
units = [6, 4, 2]
elif unit == 4:
units = [4, 3, 2]
elif unit == 2:
units = [2, 2, 2]
self.conv1 = nn.Conv2d(1, units[0], kernel_size=(units[0], 300))
linearoutput = units[0]
if layer > 1:
self.conv2 = nn.Conv2d(units[0], units[1], kernel_size=(units[1],1))
linearoutput = units[1]
if layer > 2:
self.conv3 = nn.Conv2d(units[1], units[2], kernel_size=(units[2],1))
linearoutput = units[2]
self.fc1 = nn.Linear(linearoutput, output, bias=True)
self.fc2 = nn.Softmax(dim=1)
if activation == "Tanh":
self.active = nn.Tanh()
elif activation == "ReLU":
self.active = nn.ReLU()
elif activation == "Sigmoid":
self.active = nn.Sigmoid()
def forward(self, x):
x = self.embed(x)
x = x.unsqueeze(1)
x = self.conv1(x)
x = self.active(x)
if self.layer > 1:
x = self.conv2(x)
x = self.active(x)
if self.layer > 2:
x = self.conv3(x)
x = self.active(x)
x = F.max_pool2d(x, kernel_size=(x.size()[2], 1))
x = x.view(x.size()[0], -1)
x = self.fc1(x)
x = self.fc2(x)
return x
def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
loss = 0.0
total = 0
correct = 0
model = model.to(device)
with torch.no_grad():
for X, Y in dataloader:
X = X.to(device)
Y = Y.to(device)
Y_pred = model(X)
if criterion != None:
loss += criterion(Y_pred, Y).item()
pred = torch.argmax(Y_pred, dim=-1)
total += len(Y)
correct += (pred == Y).sum().item()
return loss / len(dataset), correct / total
def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None, optimizer_select="SGD"):
dataset_train = TensorDataset(X_train, y_train)
dataset_test = TensorDataset(X_test, y_test)
model = model.to(device)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
for ep in range(num_epochs):
if ep%20==0:
lr = lr * 0.1
if optimizer_select == "SGD":
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
elif optimizer_select == "Adam":
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
elif optimizer_select == "RMSprop":
optimizer = torch.optim.RMSprop(model.parameters(), lr=lr)
model.train()
for X, Y in dataloader_train:
X = X.to(device)
Y = Y.to(device)
optimizer.zero_grad()
Y_pred = model(X)
loss = criterion(Y_pred, Y)
loss.backward()
optimizer.step()
model.eval()
_, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)
return acc_test
def CountVocab(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
max_num = []
for line in lines:
line_t = line.split("\t")[2].replace("\n", "").split(" ")
max_num.extend(map(int, line_t))
vocab_max = max(max_num)+1
return vocab_max
def GetCodeLow(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
num_list = []
code_list = []
pad_list = []
for line in lines:
line_s = line.split("\t")
code_list.append(int(line_s[0]))
num = line_s[2].replace("\n", "").split(" ")
num = list(map(int, num))
num_list.append(num)
num_tensor = torch.tensor(num)
pad_list.append(num_tensor)
max_vocab = CountVocab("train")
mlen = max([len(x) for x in num_list])
pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
pad_list = torch.tensor(pad_list)
code_list = torch.tensor(code_list)
return pad_list, code_list
def objective_RNN(trial):
X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 10
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
lr = 1e-2
device = "cuda:0"
model_name_display_only = trial.suggest_categorical("model_name_RNN", ["RNN"])
HIDDEN_SIZE = trial.suggest_categorical("HIDDEN_SIZE", [10, 50, 100, 500, 1000])
optimizer_select = trial.suggest_categorical("optimizer_select", ["SGD", "Adam", "RMSprop"])
model = RNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
    score = train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device, optimizer_select=optimizer_select)  # keyword arg so it is not swallowed by collate_fn
return score
def objective_LSTM(trial):
X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 10
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
lr = 1e-2
device = "cuda:0"
model_name_display_only = trial.suggest_categorical("model_name_LSTM", ["LSTM"])
HIDDEN_SIZE = trial.suggest_categorical("HIDDEN_SIZE", [10, 50, 100, 500, 1000])
optimizer_select = trial.suggest_categorical("optimizer_select", ["SGD", "Adam", "RMSprop"])
model = LSTM(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
    score = train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device, optimizer_select=optimizer_select)  # keyword arg so it is not swallowed by collate_fn
return score
def objective_CNN(trial):
X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 10
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
lr = 1e-2
device = "cuda:0"
model_name_display_only = trial.suggest_categorical("model_name_CNN", ["CNN"])
layer = trial.suggest_categorical("layer", [1,2,3])
unit = trial.suggest_categorical("unit", [2,4,6])
activation = trial.suggest_categorical("activation", ["Tanh", "Sigmoid", "ReLU"])
optimizer_select = trial.suggest_categorical("optimizer_select", ["SGD", "Adam", "RMSprop"])
model = CNN(VOCAB_SIZE, EMB_SIZE, OUTPUT_SIZE, layer, unit, activation)
    score = train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device, optimizer_select=optimizer_select)  # keyword arg so it is not swallowed by collate_fn
return score
study = optuna.create_study(direction='maximize')
study.optimize(objective_CNN, n_trials=81)
study.optimize(objective_RNN, n_trials=15)
study.optimize(objective_LSTM, n_trials=15)
print(study.best_params)
print(study.best_value)
Output
{'model_name_CNN': 'CNN', 'layer': 1, 'unit': 6, 'activation': 'Tanh', 'optimizer_select': 'RMSprop'}
0.7241379310344828
Comment
I compared the models by their score after 10 epochs each. The other models were neck and neck with the best one, so a different number of trials could well give a different result.
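To see how close the other models actually came, the individual trials can be inspected after optimization; study.trials and study.trials_dataframe() are part of Optuna's public API (the latter requires pandas):

# Top 5 trials by score
for t in sorted(study.trials, key=lambda t: t.value or 0.0, reverse=True)[:5]:
    print(t.value, t.params)

# Or as a table
print(study.trials_dataframe().sort_values("value", ascending=False).head())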
89. Transfer Learning from a Pre-trained Language Model
Preparation
Install the transformers library so BERT can be used.
Command
$ pip install transformers -q
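The classifier below feeds outputs.pooler_output into a linear layer, so it helps to know the shapes the tokenizer and model produce. A small standalone check (the sentence is made up; max_length=32 matches the setting used below):

from transformers import AutoTokenizer, AutoModel
import torch

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
bert = AutoModel.from_pretrained("bert-base-uncased")

enc = tokenizer(["example news headline"], padding="max_length", max_length=32,
                truncation=True, return_tensors="pt")
with torch.no_grad():
    out = bert(**enc)
print(out.last_hidden_state.shape)  # (1, 32, 768): one vector per token
print(out.pooler_output.shape)      # (1, 768): sentence-level vector fed to the classifier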
Code
%load_ext tensorboard
%tensorboard --logdir logs
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import TensorDataset, DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch
class Bert(nn.Module):
def __init__(self):
super().__init__()
self.bert = AutoModel.from_pretrained("bert-base-uncased")
self.classifier = nn.Linear(in_features = 768, out_features = 4)
def forward(self, input_ids, attention_mask, token_type_ids):
outputs = self.bert(input_ids = input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)
pooler_output = outputs.pooler_output
logits = self.classifier(pooler_output).squeeze(-1)
return logits
class BertDataset(Dataset):
def __init__(self, data, label):
super().__init__()
self.data_length = len(data["input_ids"])
self.x_input_ids = data["input_ids"]
self.x_token_type_ids = data["token_type_ids"]
self.x_attention_mask = data["attention_mask"]
self.y = label
def __len__(self):
return self.data_length
def __getitem__(self, idx):
x_input_ids = torch.tensor(self.x_input_ids[idx])
x_token_type_ids = torch.tensor(self.x_token_type_ids[idx])
x_attention_mask = torch.tensor(self.x_attention_mask[idx])
return {"input_ids":x_input_ids, "token_type_ids":x_token_type_ids, "x_attention_mask":x_attention_mask}, torch.tensor(self.y[idx])
def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
dataloader = DataLoader(dataset, batch_size=256, shuffle=False)
loss = 0.0
total = 0
correct = 0
model = model.to(device)
with torch.no_grad():
for X, Y in dataloader:
input_ids = X["input_ids"].to(device)
attention_mask = X["x_attention_mask"].to(device)
token_type_ids = X["token_type_ids"].to(device)
Y = Y.to(device)
Y_pred = model(input_ids, attention_mask, token_type_ids)
if criterion != None:
loss += criterion(Y_pred, Y).item()
pred = torch.argmax(Y_pred, dim=-1)
total += len(Y)
correct += (pred == Y).sum().item()
return loss / len(dataset), correct / total
def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None):
dataset_train = BertDataset(X_train, y_train)
dataset_test = BertDataset(X_test, y_test)
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
criterion = criterion.to(device)
for ep in range(num_epochs):
optimizer = torch.optim.SGD(model.parameters(), lr=lr)
model.train()
if ep%30==0:
lr = lr * 0.1
for X, Y in dataloader_train:
input_ids = X["input_ids"].to(device)
attention_mask = X["x_attention_mask"].to(device)
token_type_ids = X["token_type_ids"].to(device)
Y = Y.to(device)
optimizer.zero_grad()
Y_pred = model(input_ids, attention_mask, token_type_ids)
loss = criterion(Y_pred, Y)
loss.backward()
optimizer.step()
model.eval()
loss_train, acc_train = calculate_loss_and_accuracy(model, dataset_train, device, criterion=criterion)
loss_test, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)
print(f'epoch: {ep + 1}, loss_train: {loss_train:.4f}, accuracy_train: {acc_train:.4f}, loss_Test: {loss_test:.4f}, accuracy_Test: {acc_test:.4f}')
TensorboardWriter(ep, loss_train, acc_train, "Train")
TensorboardWriter(ep, loss_test, acc_test, "Test")
def TensorboardWriter(epoch, loss, accuracy, name):
writer = SummaryWriter(log_dir="logs")
writer.add_scalar("Loss/{}_Loss".format(name), loss, epoch)
writer.add_scalar("Accuracy/{}_Accuracy".format(name), accuracy, epoch)
writer.close()
def CountVocab(name):
f = open("[PATH]/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
max_num = []
for line in lines:
line_t = line.split("\t")[2].replace("\n", "").split(" ")
max_num.extend(map(int, line_t))
vocab_max = max(max_num)+1
return vocab_max
def GetStrLow(name):
f = open("[PATH]/RNN_CNN/{}_code.txt".format(name), "r")
lines = f.readlines()
f.close()
sent_list = []
code_list = []
for line in lines:
line_s = line.split("\t")
code_list.append(int(line_s[0]))
sent = line_s[1].replace("\n", "")
sent_list.append(sent)
code_list = torch.tensor(code_list)
return sent_list, code_list
X_train, Y_train = GetStrLow("train")
X_test, Y_test = GetStrLow("test")
MAX_LENGTH = 32
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
X_train_tokenizer = tokenizer.batch_encode_plus(X_train, padding = "max_length", max_length = MAX_LENGTH, truncation=True)
X_test_tokenizer = tokenizer.batch_encode_plus(X_test, padding = "max_length", max_length = MAX_LENGTH, truncation=True)
BATCH_SIZE = 8
NUM_EPOCHS = 100
lr = 1e-3
device = "cuda:0"
model = Bert()
train_model(X_train_tokenizer, Y_train, X_test_tokenizer, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device)
Comment
The classification accuracy reached 93.7%. Pre-trained large language models really are impressive.
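As a usage sketch, classifying a new headline with the trained model looks like the following. It assumes the model, tokenizer, MAX_LENGTH, and device objects from the code above are still in memory; the headline string is made up.

headline = ["Stocks rally as tech earnings beat expectations"]  # hypothetical input
enc = tokenizer.batch_encode_plus(headline, padding="max_length",
                                  max_length=MAX_LENGTH, truncation=True,
                                  return_tensors="pt")
model.eval()
with torch.no_grad():
    logits = model(enc["input_ids"].to(device),
                   enc["attention_mask"].to(device),
                   enc["token_type_ids"].to(device))
print(torch.argmax(logits, dim=-1))  # predicted class index (label mapping from *_code.txt)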
Solutions for other chapters