
言語処理100本ノック2020 (85~89)

Posted at 2023-11-07

Introduction

This article is part of my walkthrough of 言語処理100本ノック (NLP 100 Exercise) 2020.
I am posting my record of working through all 100 exercises on Qiita.
The language used is Python.

This post presents example solutions for problems 85-89 of Chapter 9: RNN and CNN.
Example solutions for the first half of Chapter 9 (80-84) are here.

85. Bidirectional RNN / Multi-layer RNN

Code
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch

class RNN(nn.Module):
    def __init__(self, vocab_size, dw, dh, output):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
        self.rnn = nn.RNN(dw, dh, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(dh*2, output, bias=True)
        self.fc2 = nn.Softmax(dim=1)
        nn.init.xavier_normal_(self.rnn.weight_ih_l0)
        nn.init.xavier_normal_(self.rnn.weight_hh_l0)
        nn.init.xavier_normal_(self.rnn.weight_ih_l0_reverse)
        nn.init.xavier_normal_(self.rnn.weight_hh_l0_reverse)
        nn.init.xavier_normal_(self.fc1.weight)
    def forward(self, x):
        x = self.embed(x)
        _, x = self.rnn(x)
        rnn_out = torch.cat([x[-2,:,:], x[-1,:,:]], dim=1)
        x = self.fc1(rnn_out)
        x = self.fc2(x)
        return x

def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
    loss = 0.0
    total = 0
    correct = 0
    model = model.to(device)
    with torch.no_grad():
        for X, Y in dataloader:
            X = X.to(device)
            Y = Y.to(device)
            Y_pred = model(X)
            if criterion != None:
                loss += criterion(Y_pred, Y).item()
            pred = torch.argmax(Y_pred, dim=-1)
            total += len(Y)
            correct += (pred == Y).sum().item()

    return loss / len(dataset), correct / total

def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None):
    dataset_train = TensorDataset(X_train, y_train)
    dataset_test = TensorDataset(X_test, y_test)
    model = model.to(device)
    dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
    dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)
    for ep in range(num_epochs):
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)
        if ep%30==0:
            lr = lr * 0.1
        model.train()
        for X, Y in dataloader_train:
            X = X.to(device)
            Y = Y.to(device)
            optimizer.zero_grad()
            Y_pred = model(X)
            loss = criterion(Y_pred, Y)
            loss.backward()
            optimizer.step()
        model.eval()

        loss_train, acc_train = calculate_loss_and_accuracy(model, dataset_train, device, criterion=criterion)
        loss_test, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)

        print(f'epoch: {ep + 1}, loss_train: {loss_train:.4f}, accuracy_train: {acc_train:.4f}, loss_Test: {loss_test:.4f}, accuracy_Test: {acc_test:.4f}')
        TensorboardWriter(model, X_train, y_train, ep, loss_train, "Train", device)
        TensorboardWriter(model, X_test, y_test, ep, loss_test, "Test", device)

def TensorboardWriter(model, X, Y, epoch, loss, name, device):
    writer = SummaryWriter(log_dir="logs")
    model = model.to(device)
    X = X.to(device)
    with torch.no_grad():
        Y_pred = model(X)
    pred = torch.argmax(Y_pred, dim=1).cpu()            # predicted class indices
    accuracy = (pred == Y.cpu()).sum().item() / len(Y)  # compare against gold label indices
    writer.add_scalar("Loss/{}_Loss".format(name), loss, epoch)
    writer.add_scalar("Accuracy/{}_Accuracy".format(name), accuracy, epoch)
    writer.close()

def CountVocab(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    max_num = []
    for line in lines:
        line_t = line.split("\t")[2].replace("\n", "").split(" ")
        max_num.extend(map(int, line_t))
    vocab_max = max(max_num)+1
    return vocab_max

def GetCodeLow(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    num_list = []
    code_list = []
    for line in lines:
        line_s = line.split("\t")
        code_list.append(int(line_s[0]))
        num = line_s[2].replace("\n", "").split(" ")
        num = list(map(int, num))
        num_list.append(num)

    max_vocab = CountVocab("train")
    mlen = max([len(x) for x in num_list])
    pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
    pad_list = torch.tensor(pad_list)
    code_list = torch.tensor(code_list)
    return pad_list, code_list

X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 100
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
HIDDEN_SIZE = 50
lr = 1e-2
device = "cuda:0"
model = RNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device)

Output
(screenshot of the training log omitted)

Comment
I tried a bidirectional RNN, but the accuracy is not all that high...
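
One possible factor I noticed afterwards (my own guess, not something the exercise asks about): nn.CrossEntropyLoss already applies log-softmax internally, so putting an explicit nn.Softmax layer in front of it squashes the logits and can slow learning. A minimal sketch of that variation, returning raw logits from the model and computing probabilities only when they are actually needed:

# Sketch of a logits-only variant (my own modification, not part of the answer above)
import torch
import torch.nn as nn

class RNNLogits(nn.Module):
    def __init__(self, vocab_size, dw, dh, output):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size - 1)
        self.rnn = nn.RNN(dw, dh, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(dh * 2, output)

    def forward(self, x):
        x = self.embed(x)
        _, h = self.rnn(x)                    # h: (num_directions, batch, dh)
        h = torch.cat([h[-2], h[-1]], dim=1)  # concatenate forward/backward final states
        return self.fc(h)                     # raw logits; CrossEntropyLoss handles the softmax

# probabilities only at prediction time:
# probs = torch.softmax(model(x), dim=1)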

86. Convolutional Neural Network (CNN)

Code
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch

class CNN(nn.Module):
  def __init__(self, vocab_size, dw, dh, output):
      super().__init__()
      self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
      self.conv1 = nn.Conv2d(1, 3, kernel_size=(3, 300))
      self.tanh = nn.Tanh()
      self.fc1 = nn.Linear(3, output, bias=True)
      self.fc2 = nn.Softmax(dim=1)
  def forward(self, x):
      x = self.embed(x)
      x = x.unsqueeze(1)
      x = self.conv1(x)
      x = self.tanh(x)
      x = F.max_pool2d(x, kernel_size=(x.size()[2], 1))
      x = x.view(-1, 3)
      x = self.fc1(x)
      x = self.fc2(x)
      return x

def CountVocab(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    max_num = []
    for line in lines:
        line_t = line.split("\t")[2].replace("\n", "").split(" ")
        max_num.extend(map(int, line_t))
    vocab_max = max(max_num)+1
    return vocab_max

def GetCodeLow(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    num_list = []
    code_list = []
    for line in lines:
        line_s = line.split("\t")
        code_list.append(int(line_s[0]))
        num = line_s[2].replace("\n", "").split(" ")
        num = list(map(int, num))
        num_list.append(num)
    max_vocab = CountVocab("train")
    mlen = max([len(x) for x in num_list])
    pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
    pad_list = torch.tensor(pad_list)
    code_list = torch.tensor(code_list)
    return pad_list, code_list

X_valid, Y_valid = GetCodeLow("valid")

VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
HIDDEN_SIZE = 50

model = CNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
Y_pred = model(X_valid)
pred = torch.argmax(Y_pred, dim=-1)
print("accuracy: ", sum(1 for x,y in zip(Y_valid, pred) if x == y) / float(len(Y_valid)))

Output
(screenshot of the output omitted)

Comment
Next up, a CNN.
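
To make the tensor shapes in the forward pass above concrete, here is a small shape check (a sketch with made-up values: batch 2, sequence length 10; the kernel size (3, 300) means each filter spans three consecutive word vectors):

# Shape walkthrough for the CNN forward pass (illustrative values only)
import torch
import torch.nn as nn
import torch.nn.functional as F

x = torch.randint(0, 100, (2, 10))            # dummy token IDs: (batch=2, seq_len=10)
emb = nn.Embedding(101, 300)
conv = nn.Conv2d(1, 3, kernel_size=(3, 300))  # 3 filters, each covering 3 word vectors

h = emb(x)                                    # (2, 10, 300)
h = h.unsqueeze(1)                            # (2, 1, 10, 300): add the channel dim Conv2d expects
h = conv(h)                                   # (2, 3, 8, 1): one value per trigram window per filter
h = F.max_pool2d(h, kernel_size=(h.size(2), 1))  # (2, 3, 1, 1): max over time
h = h.view(-1, 3)                             # (2, 3): one feature per filter for the linear layer
print(h.shape)                                # torch.Size([2, 3])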

87. Training a CNN with Stochastic Gradient Descent

Code
%load_ext tensorboard
%tensorboard --logdir logs

import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch
import sys

class CNN(nn.Module):
  def __init__(self, vocab_size, dw, dh, output):
      super().__init__()
      self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size - 1)
      self.conv1 = nn.Conv2d(1, 3, kernel_size=(3, 300))
      self.tanh = nn.Tanh()
      self.conv2 = nn.Conv2d(1, 3, kernel_size=(5, 300))
      self.fc1 = nn.Linear(3, output, bias=True)
      self.fc2 = nn.Softmax(dim=1)
  def forward(self, x):
      x = self.embed(x)
      x = x.unsqueeze(1)
      x = self.conv1(x)
      x = self.tanh(x)
      x = F.max_pool2d(x, kernel_size=(x.size()[2], 1))
      x = x.view(-1, 3)
      x = self.fc1(x)
      x = self.fc2(x)
      return x

def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
    loss = 0.0
    total = 0
    correct = 0
    model = model.to(device)
    with torch.no_grad():
        for X, Y in dataloader:
            X = X.to(device)
            Y = Y.to(device)
            Y_pred = model(X)
            if criterion != None:
                loss += criterion(Y_pred, Y).item()
            pred = torch.argmax(Y_pred, dim=-1)
            total += len(Y)
            correct += (pred == Y).sum().item()

    return loss / len(dataset), correct / total


def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None):
    dataset_train = TensorDataset(X_train, y_train)
    dataset_test = TensorDataset(X_test, y_test)
    model = model.to(device)
    dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
    dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)
    for ep in range(num_epochs):
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)
        model.train()
        for X, Y in dataloader_train:
            X = X.to(device)
            Y = Y.to(device)
            optimizer.zero_grad()
            Y_pred = model(X)
            loss = criterion(Y_pred, Y)
            loss.backward()
            optimizer.step()
        model.eval()

        loss_train, acc_train = calculate_loss_and_accuracy(model, dataset_train, device, criterion=criterion)
        loss_test, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)

        print(f'epoch: {ep + 1}, loss_train: {loss_train:.4f}, accuracy_train: {acc_train:.4f}, loss_Test: {loss_test:.4f}, accuracy_Test: {acc_test:.4f}')
        TensorboardWriter(model, X_train, y_train, ep, loss_train, "Train", device)
        TensorboardWriter(model, X_test, y_test, ep, loss_test, "Test", device)

def TensorboardWriter(model, X, Y, epoch, loss, name, device):
    writer = SummaryWriter(log_dir="logs")
    model = model.to(device)
    X = X.to(device)
    with torch.no_grad():
        Y_pred = model(X)
    pred = torch.argmax(Y_pred, dim=1).cpu()            # predicted class indices
    accuracy = (pred == Y.cpu()).sum().item() / len(Y)  # compare against gold label indices
    writer.add_scalar("Loss/{}_Loss".format(name), loss, epoch)
    writer.add_scalar("Accuracy/{}_Accuracy".format(name), accuracy, epoch)
    writer.close()

def CountVocab(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    max_num = []
    for line in lines:
        line_t = line.split("\t")[2].replace("\n", "").split(" ")
        max_num.extend(map(int, line_t))
    vocab_max = max(max_num)+1
    return vocab_max

def GetCodeLow(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    num_list = []
    code_list = []
    for line in lines:
        line_s = line.split("\t")
        code_list.append(int(line_s[0]))
        num = line_s[2].replace("\n", "").split(" ")
        num = list(map(int, num))
        num_list.append(num)

    max_vocab = CountVocab("train")
    mlen = max([len(x) for x in num_list])
    pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
    pad_list = torch.tensor(pad_list)
    code_list = torch.tensor(code_list)
    return pad_list, code_list

X_train, Y_train = GetCodeLow("train")
X_test, Y_test = GetCodeLow("test")
BATCH_SIZE = 2
NUM_EPOCHS = 100
VOCAB_SIZE = CountVocab("train")+1
EMB_SIZE = 300
OUTPUT_SIZE = 4
HIDDEN_SIZE = 50
lr = 1e-3
device = "cuda:0"
model = CNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device)

Output
(screenshot of the training log omitted)

Comment
The accuracy was not that high with the CNN either... Maybe the way I prepared the training data was the problem.

88. Parameter Tuning

Setup
Install optuna for parameter tuning (a minimal usage sketch follows the command below).

Command
$ pip install optuna
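
For reference, the basic optuna workflow is simply: write an objective function that returns the score to optimize, create a study, and call optimize. A toy sketch (the hyperparameter here is a placeholder, not one used in the actual solution below):

# Minimal optuna flow (toy example; the real objectives are defined further down)
import optuna

def objective(trial):
    lr = trial.suggest_float("lr", 1e-4, 1e-1, log=True)  # sample a hyperparameter
    return -(lr - 0.01) ** 2                               # return the value to maximize

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)
print(study.best_params, study.best_value)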

Code
import optuna
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch

class RNN(nn.Module):
    def __init__(self, vocab_size, dw, dh, output):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
        self.rnn = nn.RNN(dw, dh, batch_first=True, bidirectional=True)
        self.fc1 = nn.Linear(dh*2, output, bias=True)
        self.fc2 = nn.Softmax(dim=1)
    def forward(self, x):
        x = self.embed(x)
        _, x = self.rnn(x)
        rnn_out = torch.cat([x[-2,:,:], x[-1,:,:]], dim=1)
        x = self.fc1(rnn_out)
        x = self.fc2(x)
        return x

class LSTM(nn.Module):
    def __init__(self, vocab_size, dw, dh, output):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, dw, padding_idx=vocab_size-1)
        self.lstm = nn.LSTM(dw, dh, batch_first=True,bidirectional=True)
        self.fc1 = nn.Linear(dh*2, output, bias=True)
        self.fc2 = nn.Softmax(dim=1)
    def forward(self, x):
        x = self.embed(x)
        _, x = self.lstm(x)
        x = torch.cat([x[0][0],x[0][1]], dim=1)
        x = self.fc1(x)
        x = self.fc2(x)
        return x

class CNN(nn.Module):
  def __init__(self, vocab_size, dw, output, layer, unit, activation):
      super().__init__()
      self.layer = layer
      self.embed = nn.Embedding(vocab_size, dw, padding_idx = vocab_size-1)
      if unit == 6:
          units = [6, 4, 2]
      elif unit == 4:
          units = [4, 3, 2]
      elif unit == 2:
          units = [2, 2, 2]
      self.conv1 = nn.Conv2d(1, units[0], kernel_size=(units[0], 300))
      linearoutput = units[0]
      if layer > 1:
          self.conv2 = nn.Conv2d(units[0], units[1], kernel_size=(units[1],1))
          linearoutput = units[1]
      if layer > 2:
          self.conv3 = nn.Conv2d(units[1], units[2], kernel_size=(units[2],1))
          linearoutput = units[2]
      self.fc1 = nn.Linear(linearoutput, output, bias=True)
      self.fc2 = nn.Softmax(dim=1)

      if activation == "Tanh":
          self.active = nn.Tanh()
      elif activation == "ReLU":
          self.active = nn.ReLU()
      elif activation == "Sigmoid":
          self.active = nn.Sigmoid()
  def forward(self, x):
      x = self.embed(x)
      x = x.unsqueeze(1)
      x = self.conv1(x)
      x = self.active(x)
      if self.layer > 1:
          x = self.conv2(x)
          x = self.active(x)
      if self.layer > 2:
          x = self.conv3(x)
          x = self.active(x)
      x = F.max_pool2d(x, kernel_size=(x.size()[2], 1))
      x = x.view(x.size()[0], -1)
      x = self.fc1(x)
      x = self.fc2(x)
      return x

def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
    loss = 0.0
    total = 0
    correct = 0
    model = model.to(device)
    with torch.no_grad():
        for X, Y in dataloader:
            X = X.to(device)
            Y = Y.to(device)
            Y_pred = model(X)
            if criterion != None:
                loss += criterion(Y_pred, Y).item()
            pred = torch.argmax(Y_pred, dim=-1)
            total += len(Y)
            correct += (pred == Y).sum().item()
    return loss / len(dataset), correct / total


def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None, optimizer_select="SGD"):
    dataset_train = TensorDataset(X_train, y_train)
    dataset_test = TensorDataset(X_test, y_test)
    model = model.to(device)
    dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
    dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)
    for ep in range(num_epochs):
        if ep%20==0:
            lr = lr * 0.1
        if optimizer_select == "SGD":
            optimizer = torch.optim.SGD(model.parameters(), lr=lr)
        elif optimizer_select == "Adam":
            optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        elif optimizer_select == "RMSprop":
          optimizer = torch.optim.RMSprop(model.parameters(), lr=lr)
        model.train()
        for X, Y in dataloader_train:
            X = X.to(device)
            Y = Y.to(device)
            optimizer.zero_grad()
            Y_pred = model(X)
            loss = criterion(Y_pred, Y)
            loss.backward()
            optimizer.step()
    model.eval()
    _, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)

    return acc_test


def CountVocab(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    max_num = []
    for line in lines:
        line_t = line.split("\t")[2].replace("\n", "").split(" ")
        max_num.extend(map(int, line_t))
    vocab_max = max(max_num)+1
    return vocab_max

def GetCodeLow(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    num_list = []
    code_list = []
    for line in lines:
        line_s = line.split("\t")
        code_list.append(int(line_s[0]))
        num = line_s[2].replace("\n", "").split(" ")
        num = list(map(int, num))
        num_list.append(num)
    max_vocab = CountVocab("train")
    mlen = max([len(x) for x in num_list])
    pad_list = list(map(lambda x:x + [max_vocab]*(mlen-len(x)), num_list))
    pad_list = torch.tensor(pad_list)
    code_list = torch.tensor(code_list)
    return pad_list, code_list

def objective_RNN(trial):
    X_train, Y_train = GetCodeLow("train")
    X_test, Y_test = GetCodeLow("test")
    BATCH_SIZE = 2
    NUM_EPOCHS = 10
    VOCAB_SIZE = CountVocab("train")+1
    EMB_SIZE = 300
    OUTPUT_SIZE = 4
    lr = 1e-2
    device = "cuda:0"
    model_name_display_only = trial.suggest_categorical("model_name_RNN", ["RNN"])
    HIDDEN_SIZE = trial.suggest_categorical("HIDDEN_SIZE", [10, 50, 100, 500, 1000])
    optimizer_select = trial.suggest_categorical("optimizer_select", ["SGD", "Adam", "RMSprop"])
    model = RNN(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
    score = train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device, optimizer_select=optimizer_select)
    return score

def objective_LSTM(trial):
    X_train, Y_train = GetCodeLow("train")
    X_test, Y_test = GetCodeLow("test")
    BATCH_SIZE = 2
    NUM_EPOCHS = 10
    VOCAB_SIZE = CountVocab("train")+1
    EMB_SIZE = 300
    OUTPUT_SIZE = 4
    lr = 1e-2
    device = "cuda:0"
    model_name_display_only = trial.suggest_categorical("model_name_LSTM", ["LSTM"])
    HIDDEN_SIZE = trial.suggest_categorical("HIDDEN_SIZE", [10, 50, 100, 500, 1000])
    optimizer_select = trial.suggest_categorical("optimizer_select", ["SGD", "Adam", "RMSprop"])
    model = LSTM(VOCAB_SIZE, EMB_SIZE, HIDDEN_SIZE, OUTPUT_SIZE)
    score = train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device, optimizer_select=optimizer_select)
    return score

def objective_CNN(trial):
    X_train, Y_train = GetCodeLow("train")
    X_test, Y_test = GetCodeLow("test")
    BATCH_SIZE = 2
    NUM_EPOCHS = 10
    VOCAB_SIZE = CountVocab("train")+1
    EMB_SIZE = 300
    OUTPUT_SIZE = 4
    lr = 1e-2
    device = "cuda:0"
    model_name_display_only = trial.suggest_categorical("model_name_CNN", ["CNN"])
    layer = trial.suggest_categorical("layer", [1,2,3])
    unit = trial.suggest_categorical("unit", [2,4,6])
    activation = trial.suggest_categorical("activation", ["Tanh", "Sigmoid", "ReLU"])
    optimizer_select = trial.suggest_categorical("optimizer_select", ["SGD", "Adam", "RMSprop"])
    model = CNN(VOCAB_SIZE, EMB_SIZE, OUTPUT_SIZE, layer, unit, activation)
    score = train_model(X_train, Y_train, X_test, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device, optimizer_select=optimizer_select)
    return score

study = optuna.create_study(direction='maximize')
study.optimize(objective_CNN, n_trials=81)
study.optimize(objective_RNN, n_trials=15)
study.optimize(objective_LSTM, n_trials=15)
print(study.best_params)
print(study.best_value)

Output
{'model_name_CNN': 'CNN', 'layer': 1, 'unit': 6, 'activation': 'Tanh', 'optimizer_select': 'RMSprop'}
0.7241379310344828

Comment
I compared the models by their scores after 10 epochs each. They were all pretty much neck and neck, so the result could easily change with a different number of trials.

89. Transfer Learning from a Pretrained Language Model

Setup
Install the transformers library so that BERT can be used (a quick check of the tokenizer output follows the command below).

Command
$ pip install transformers -q
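
As a quick check of what the tokenizer produces (the BertDataset class below indexes into exactly these three fields), here is a small sketch; the sample sentences are placeholders:

# What batch_encode_plus returns (sketch): three parallel lists consumed by BertDataset
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
enc = tokenizer.batch_encode_plus(["a sample headline", "another one"],
                                  padding="max_length", max_length=8, truncation=True)
print(enc.keys())                # dict_keys(['input_ids', 'token_type_ids', 'attention_mask'])
print(enc["input_ids"][0])       # token IDs padded/truncated to max_length
print(enc["attention_mask"][0])  # 1 for real tokens, 0 for padding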

Code
%load_ext tensorboard
%tensorboard --logdir logs
from transformers import AutoTokenizer, AutoModel
from torch.utils.data import TensorDataset, DataLoader, Dataset
from torch.utils.tensorboard import SummaryWriter
import torch.nn as nn
import numpy as np
import torch

class Bert(nn.Module):
    def __init__(self):
        super().__init__()
        self.bert = AutoModel.from_pretrained("bert-base-uncased")
        self.classifier = nn.Linear(in_features = 768, out_features = 4)
    def forward(self, input_ids, attention_mask, token_type_ids):
        outputs = self.bert(input_ids = input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)
        pooler_output = outputs.pooler_output
        logits = self.classifier(pooler_output).squeeze(-1)
        return logits

class BertDataset(Dataset):
    def __init__(self, data, label):
        super().__init__()
        self.data_length = len(data["input_ids"])
        self.x_input_ids = data["input_ids"]
        self.x_token_type_ids = data["token_type_ids"]
        self.x_attention_mask = data["attention_mask"]
        self.y = label
    def __len__(self):
        return self.data_length
    def __getitem__(self, idx):
        x_input_ids = torch.tensor(self.x_input_ids[idx])
        x_token_type_ids = torch.tensor(self.x_token_type_ids[idx])
        x_attention_mask = torch.tensor(self.x_attention_mask[idx])
        return {"input_ids":x_input_ids, "token_type_ids":x_token_type_ids, "x_attention_mask":x_attention_mask}, torch.tensor(self.y[idx])

def calculate_loss_and_accuracy(model, dataset, device, criterion=None):
    dataloader = DataLoader(dataset, batch_size=256, shuffle=False)
    loss = 0.0
    total = 0
    correct = 0
    model = model.to(device)
    with torch.no_grad():
        for X, Y in dataloader:
            input_ids = X["input_ids"].to(device)
            attention_mask = X["x_attention_mask"].to(device)
            token_type_ids = X["token_type_ids"].to(device)
            Y = Y.to(device)
            Y_pred =  model(input_ids, attention_mask, token_type_ids)
            if criterion != None:
                loss += criterion(Y_pred, Y).item()
            pred = torch.argmax(Y_pred, dim=-1)
            total += len(Y)
            correct += (pred == Y).sum().item()
    return loss / len(dataset), correct / total


def train_model(X_train, y_train, X_test, y_test, batch_size, model, lr, num_epochs, device, collate_fn=None):
    dataset_train = BertDataset(X_train, y_train)
    dataset_test = BertDataset(X_test, y_test)
    dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
    dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=True)
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    criterion = criterion.to(device)
    for ep in range(num_epochs):
        optimizer = torch.optim.SGD(model.parameters(), lr=lr)
        model.train()
        if ep%30==0:
            lr = lr * 0.1
        for X, Y in dataloader_train:
            input_ids = X["input_ids"].to(device)
            attention_mask = X["x_attention_mask"].to(device)
            token_type_ids = X["token_type_ids"].to(device)

            Y = Y.to(device)
            optimizer.zero_grad()
            Y_pred = model(input_ids, attention_mask, token_type_ids)
            loss = criterion(Y_pred, Y)
            loss.backward()
            optimizer.step()
        model.eval()

        loss_train, acc_train = calculate_loss_and_accuracy(model, dataset_train, device, criterion=criterion)
        loss_test, acc_test = calculate_loss_and_accuracy(model, dataset_test, device, criterion=criterion)

        print(f'epoch: {ep + 1}, loss_train: {loss_train:.4f}, accuracy_train: {acc_train:.4f}, loss_Test: {loss_test:.4f}, accuracy_Test: {acc_test:.4f}')
        TensorboardWriter(ep, loss_train, acc_train, "Train")
        TensorboardWriter(ep, loss_test, acc_test, "Test")

def TensorboardWriter(epoch, loss, accuracy, name):
    writer = SummaryWriter(log_dir="logs")
    writer.add_scalar("Loss/{}_Loss".format(name), loss, epoch)
    writer.add_scalar("Accuracy/{}_Accuracy".format(name), accuracy, epoch)
    writer.close()

def CountVocab(name):
    f = open("[PATH]/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    max_num = []
    for line in lines:
        line_t = line.split("\t")[2].replace("\n", "").split(" ")
        max_num.extend(map(int, line_t))
    vocab_max = max(max_num)+1
    return vocab_max

def GetStrLow(name):
    f = open("[PATH]/RNN_CNN/{}_code.txt".format(name), "r")
    lines = f.readlines()
    f.close()
    sent_list = []
    code_list = []

    for line in lines:
        line_s = line.split("\t")
        code_list.append(int(line_s[0]))
        sent = line_s[1].replace("\n", "")
        sent_list.append(sent)
    code_list = torch.tensor(code_list)
    return sent_list, code_list

X_train, Y_train = GetStrLow("train")
X_test, Y_test = GetStrLow("test")

MAX_LENGTH = 32
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
X_train_tokenizer = tokenizer.batch_encode_plus(X_train, padding = "max_length", max_length = MAX_LENGTH, truncation=True)
X_test_tokenizer = tokenizer.batch_encode_plus(X_test, padding = "max_length", max_length = MAX_LENGTH, truncation=True)

BATCH_SIZE = 8
NUM_EPOCHS = 100
lr = 1e-3
device = "cuda:0"
model = Bert()
train_model(X_train_tokenizer, Y_train, X_test_tokenizer, Y_test, BATCH_SIZE, model, lr, NUM_EPOCHS, device)

Output
(screenshot of the training log omitted)

Comment
The classification accuracy reached 93.7%. Pretrained large-scale language models really are impressive.

Example solutions for other chapters
