概要
こちらに掲載しているライブドアニュースコーパスを利用した多値分類モデルのコードの解説になります。学習部分までを解説しており、予測部分は省略しています。
使用する環境・データ
GoogleColabを使用します。
データはLivedoorニュースコーパスを使用します。
事前準備
自身のGoogleドライブにライブドアニュースコーパスを配置してください。
必要に応じてデータ数を削減してください。
コード解説
コード内に#—1
のような形でコメントを入れ、コメントに対応したインデックス番号で該当のコードについて説明しています。
Googleドライブをマウント
from google.colab import drive
drive.mount("/content/drive/")
まずGoogleColabに自分のGoogleドライブをマウントします。
cudaのバージョン確認
!nvcc --version
cudaのバージョンを確認しておきます。cudaのバージョンによってインストールが必要なPytorchのバージョンが異なってきます。
各種ライブラリインストール
!pip install protobuf==3.9.2
!pip install transformers==4.20.1 fugashi==1.1.2 ipadic==1.0.0 torchtext==0.11.0 pytorch-lightning==1.6.4 numpy==1.21.6 openpyxl
!pip install torch==1.10.0+cu111 torchvision==0.11.0+cu111 torchaudio==0.10.0 -f https://download.pytorch.org/whl/torch_stable.html
各種ライブラリインポート
import glob
import os
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from transformers import BertJapaneseTokenizer, BertForSequenceClassification
import pytorch_lightning as pl
import random
import numpy as np
from tqdm import tqdm
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, StochasticWeightAveraging
# 日本語の事前学習モデル
MODEL_NAME = 'cl-tohoku/bert-base-japanese-whole-word-masking'
東北大学 乾・鈴木研究室により公開されている事前学習モデルを利用させてもらいます。
データの前処理
path = "/content/drive/My Drive/機械学習/text/" #--1
#--2
dir_files = os.listdir(path=path)
dir_names = [f for f in dir_files if os.path.isdir(os.path.join(path, f))]
#--3
label_n = len(dir_names)
print(f'=={label_n}==')
#--4
text_list = []
label_list = []
label_dic = {}
#--5
for i, dir_name in enumerate(dir_names):
label_dic[i] = dir_name
file_names = glob.glob(path + dir_name + "/*.txt")
for file_name in file_names:
if os.path.basename(file_name) == "LICENSE.txt":
continue
with open(file_name, "r") as f:
text = f.readlines()[3:]
text = "".join(text)
text = text.translate(str.maketrans({"\n":"", "\t":"", "\r":"", "\u3000":""}))
text_list.append(text)
label_list.append(i)
- データが格納されているフォルダを指定します。ここは自分で作成したディレクトリ名を指定してください。
- ライブドアニュースコーパスは分類ごとにディレクトリが分かれており、ディレクトリ名がそのまま分類名になっています。そのため、ラベル名を取得する目的で、ディレクトリ名を取得します。
- ラベル数の確認をします。私はメモリに乗り切らなかったため、ラベル数を大幅に削減しており、今回はラベル数3になっています。
- dataframe作成用にテキストとラベルのリストを作ります。
text_list
は入力値であるテキストのリスト、label_list
は正解ラベルのリストになります。label_dic
はどのラベル名がどのラベル番号に変換されているか?を辞書で管理します。 -
dir_names
で繰り返し処理をすることで各ディレクトリにアクセスします。その中でfile_names
を取得し、繰り返し処理することで、ディレクトリ配下にあるファイルにアクセスし、テキスト取得します。ニュースの本文は4行目以降からなので、f.readlines()[3:]
として取得しています。
Dataframeを作成する
document_df = pd.DataFrame(data={'text': text_list,'label': label_list})
入力値のリストであるtext_list
と正解ラベルのリストであるlabel_list
を使用してdataframeを作成します。document_df
の中身は以下のようになります。
text label
0 東京工業大学とソニーは、世界最高速6.3 Gb/sのミリ波無線データ伝送を実現する高周波 (... 0
1 就職活動や外回りの仕事中もインターネットが使えることで、面接や取引が有利に進んだ、という場面... 0
2 行楽の季節は残念なことに、過酷な交通渋滞の季節でもある。運転者の体力的消耗は、道路を共有する... 0
3 テレビCMも多く放送されているため、誰もが見たことがあるだろうauの人気プランと言えば「au... 0
4 先日、気象庁より「関東甲信地方は梅雨入りしたとみられる」と発表された。平年より1日遅く、統計... 0
ラベル重み計算
#--1
label_count = document_df['label'].value_counts(sort=False)
label_count_dict = label_count.to_dict()
print(f'各ラベル数:{label_count_dict}')
#--2
label_weight = []
for i in range(label_n):
rate = (label_count_dict[i] / len(document_df))*100
weight = 100 - rate
label_weight.append(weight)
print(f'ラベル重み:{label_weight}')
今回のデータはそうでもないのですが、データによってはラベルごとにデータ数の偏りがある場合があります。その場合、dataloaderを作成する際にsamplerを渡してラベルの偏りがないようにすること、損失関数にラベルの重みを渡してデータ数の少ないラベルの重みをあげてあげることが必要になります。
今回はラベル数に偏りはないですが、今後の応用も考えてラベルの重み計算をし、適用していきます。
-
各ラベル数をカウントしています。
value_counts
はpandas.Seriesのメソッドで、ユニークな要素の出現回数をSeries形式で返します。to_dict()
はそれを辞書に変換します。label_count_dict
の中身は以下のようになります。各ラベル数:{0: 398, 1: 504, 2: 390}
-
各ラベルの重みを計算しリスト化します。
重みは全体に対する比で計算しており、100-((ラベルiのラベル数/全体のラベル数)*100)
で計算しています。
リストを作成する時、リストのインデックスと数値化されたラベルの値が対応するように並び替えをします(この例では辞書のラベル値は順番に並んでいますが並ばないこともあります)。label_weight
の値は以下のようになります。ラベル重み:[69.19504643962848, 60.9907120743034, 69.81424148606811]
データを分割する
#--1
train_val_df, test_df = train_test_split(document_df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.1, random_state=42)
#--2
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)
-
データを「学習用」「検証用」「テスト用」の3つに分割します。まず「学習&検証用データ」と「テストデータ」の2つに分割し、それをさらに「学習用データ」、「検証用データ」の2つに分割します。
train_test_splittrain_test_split
を使用すると、ndarrayやリストなどを二分割することができます。第一引数には分割対象のデータをしてします。
test_size
はテストデータの割合、もしくは個数を指定します。割合の場合は0.0~1.0の数字が入ります。random_state
は乱数を固定し、同じ乱数であれば常に同じように分割されるようにします。デフォルトシャッフルが行われますが、シャッフルしたくない場合、
shuffle**=**False
という引数を指定することでシャッフルをせず分割します。 -
分割したデータのインデックスを振り直します。
reset_index
reset_index
はpandas.DataFrameやpandas.Seriesのインデックスを振り直します。drop=True
とすると元のindexを削除します。
Datasetを生成するクラスを定義する
class DatasetGenerator(Dataset): #--1
def __init__(self, data, tokenizer, label_weight):
self.data = data
self.tokenizer = tokenizer
self.class_weights = label_weight
self.sample_weights = [0] * len(data)
self.max_length = 256 #--2
def __len__(self):
return len(self.data)
def __getitem__(self, index):
data_row = self.data.iloc[index]
text = data_row['text']
labels = data_row['label']
#--3
encoding = self.tokenizer.encode_plus(
text,
max_length=self.max_length,
padding="max_length",
truncation=True,
return_attention_mask=True,
return_tensors='pt',
)
return dict(
input_ids=encoding["input_ids"].flatten(),
attention_mask=encoding["attention_mask"].flatten(),
token_type_ids=encoding["token_type_ids"].flatten(),
labels=torch.tensor(labels)
)
def get_sampler(self): #--4
for idx, row in self.data.iterrows():
label = row['label']
class_weight = self.class_weights[label]
self.sample_weights[idx] = class_weight
sampler = WeightedRandomSampler(self.sample_weights, num_samples=len(self.sample_weights), replacement=True)
return sampler
-
今回はDatasetを自作しています。
torch.utils.data.Dataset
Datasetを自作する場合、このクラスを継承する必要があります。このクラスを継承したサブクラスは
__getitem__()
を上書きして、指定したキーのデータサンプルを取得できるようにする必要があります(つまり、辞書形式で値を返す必要がある)。https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset
-
入力テキストをエンコードする時、トークン数を最大で何個にするか?を指定します。BERTの場合、最大512まで指定できますが、メモリに乗らなくなってしまったのでここでは256にしています。
-
入力テキストをエンコードします。
self.tokenizer.encode_plus
self.tokenizer.encode_plus
を使うことで、入力値をトークナイズし、トークナイズした入力値をidに変換しinput_ids
として出力されます)、BERTの入力値として必要なtoken_type_ids
、attention_mask
と一緒に出力してくれます。padding="max_length"
を指定するとmax_length
に指定したトークン数を最大値としてパディングをしてくれます。パディングとは、トークン数が文章によってバラバラになるので、空白部分を0で埋めることです。padding="longest"
とするとバッチ内で最も長い文章に合わせてパディングします。return_tensors='pt'
としておくことで、Pytorchのテンソルオブジェクトを返してくれます。 -
サンプラーを設定しています。サンプルが偏らないように、ラベルごとに重みを設定しておき、それに応じてDataloaderを作成してくれます。
class_weights
にはあらかじめラベルの比によって決めておいた重みが設定されています。 それを参照しながら各行のラベルの重みのリストをself.sample_weights
に格納します。WeightedRandomSampler
WeightedRandomSamplerは
weight
を確率とみなし、0~len(weights)-1
の間の数字からサンプリングをします。これをDataloaderに渡します。第一引数にweightを渡し、第二引数に取得したいサンプル数を指定します。
https://pytorch.org/docs/stable/data.html#torch.utils.data.WeightedRandomSampler
DataLoaderを生成するクラスを定義する
class DataModuleGenerator(pl.LightningDataModule): #--1
def __init__(self, train_df, val_df, test_df, tokenizer, batch_size, label_weight):
super().__init__()
self.train_df = train_df
self.val_df = val_df
self.test_df = test_df
self.tokenizer = tokenizer
self.batch_size = batch_size
self.label_weight = label_weight
def setup(self): #--2
self.train_dataset = DatasetGenerator(self.train_df, self.tokenizer, self.label_weight)
self.valid_dataset = DatasetGenerator(self.val_df, self.tokenizer, self.label_weight)
self.test_dataset = DatasetGenerator(self.test_df, self.tokenizer, self.label_weight)
self.train_sampler = self.train_dataset.get_sampler()
self.valid_sampler = self.valid_dataset.get_sampler()
def train_dataloader(self): #--3
return DataLoader(self.train_dataset, batch_size=self.batch_size["train"], num_workers=os.cpu_count(), sampler=self.train_sampler)
def val_dataloader(self): #--4
return DataLoader(self.valid_dataset, batch_size=self.batch_size["val"], num_workers=os.cpu_count(), sampler=self.valid_sampler)
def test_dataloader(self): #--5
return DataLoader(self.test_dataset, batch_size=self.batch_size["test"], num_workers=os.cpu_count())
DataLoaderとはDatasetからサンプルを取得し、ミニバッチを作成してくれるクラスです。
https://pytorch.org/docs/stable/data.html#module-torch.utils.data
-
LightningDataModuleを継承したクラスを定義します。このクラスがDataLoaderを作成します。
LightningDataModule
通常のPytorchのコードでは、データの前処理がさまざまなファイルに散らばりがちで、正確な分割や変換を共有、再利用することが困難であるため、クラス化しようという発想のもと用意されたクラスです。
https://pytorch-lightning.readthedocs.io/en/stable/data/datamodule.html#lightningdatamodule
-
このメソッドを実行することで、学習、検証、テスト用のDatasetを用意することができます。先ほど定義したDatasetGeneratorクラスを呼び出しています。また、samplerもこの時に一緒にセットしてしまいます。
-
学習用のDataLoaderを用意します。これは
Trainer.fit()
メソッドで使用するDataLoaderになります。num_workers
はワーカープロセス数を設定します。DataLoaderはデフォルトでシングルプロセスのデータロードを行います。Datasetが小さくてメモリに完全に乗り切る場合にはこれでいいですが、そうでない場合は効率が悪いです。num_workers
に正の整数を設定すると指定された数のワーカープロセスによるデータローディングをしてくれます(マルチプロセスで実行してくれるということ)。ここでは使用しているPCのCPUの数を指定しています(CPUというか、コア数?)。https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading
-
検証用のDataLoaderを用意します。
Trainer.fit()
やTrainer.validate()
で使用します。 -
テスト用のDataLoaderを用意します。
Trainer.test()
で使用します。
ちなみにpredict_dataloaderもある。
モデル生成を行うクラスを定義する
# Pytorch Lightningを使用するためのClass
class Model(pl.LightningModule): #--1
def __init__(self, model_name, num_labels, lr):
super().__init__()
self.save_hyperparameters() #--2
self.bert_sc = BertForSequenceClassification.from_pretrained(
model_name,
num_labels=num_labels
) #--3
def training_step(self, batch, batch_idx):
output = self.bert_sc(**batch) #--4
weights = torch.tensor(label_weight).cuda() #--5
cross_entropy_loss = nn.CrossEntropyLoss(weight=weights) #--5
loss = cross_entropy_loss(output.logits, batch['labels'])
self.log('train_loss', loss)
return loss
def validation_step(self, batch, batch_idx):
output = self.bert_sc(**batch)
val_loss = output.loss
self.log('val_loss', val_loss) #--7
def test_step(self, batch, batch_idx):
labels = batch.pop('labels')
output = self.bert_sc(**batch)
labels_predicted = output.logits.argmax(-1)
num_correct = ( labels_predicted == labels ).sum().item()
accuracy = num_correct/labels.size(0)
self.log('accuracy', accuracy)
def configure_optimizers(self): #--7
return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
-
pl.LightningModuleクラスを継承したクラスを定義します。
LightningModule
LightningModuleを使用することでPytorchのコードを以下の6つのセクションに整理でき、すっきりと書くことができます。
Computations (init). Train Loop (training_stepメソッド) Validation Loop (validation_stepメソッド) Test Loop (test_stepメソッド) Prediction Loop (predict_stepメソッド) Optimizers and LR Schedulers (configure_optimizersメソッド)
-
引数を保存します。
save_hyperparameters
__init__
メソッド内でsave_hyperparameters()
を使用すると、self.hparams
に全ての引数を保存することができます。self.hparams.引数名
という形で各メソッド内で引数を参照することができるようになります。 -
BertForSequenceClassificationの事前学習モデルの読み込みを行います。
-
BERTによる予測結果を取得します。この時の
**batch
の中身はid化された入力値(input_ids
)だけでなく、token_type_ids
、attention_mask
も渡します。 -
各ラベルの重みをテンソル化&cuda対応させて、
CrossEntropyLoss()
に渡します。CrossEntropyLoss()
は交差エントロピーですが、Pytorchの場合、この損失関数内でソフトマックス関数の処理まで行われている点に注意です。 -
損失を
val_loss
の名前でログを取ります。これは後ほどモデルのチェックポイントを生成するために使用します。 -
オプティマイザーの設定です。ここではAdamを使用しています。
チェックポイントの設定
checkpoint = pl.callbacks.ModelCheckpoint(
monitor='val_loss', #--1
mode='min', #--2
save_top_k=1, #--3
save_weights_only=True, #--4
dirpath='models', #--5
)
ModelCheckpoint
checkpointはトレーニングの過程でモデルの状態を保存します(=チェックポイントを生成する)。トレーニングが終了すると、トレーニング中に発見した最高パフォーマンスに対応するチェックポイントを使用することができます。
https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.callbacks.ModelCheckpoint.html
- 監視する指標を設定します。デフォルトはNoneで最後のエポックのみについてチェックポイントを保存します。
- 監視する指標が最大の時か最小の時か、どちらに基づいて保存ファイルを上書きするべきか?を設定します。
val_loss
の場合は損失なので、小さい方が良いため、min
を指定します。 - 上位何件のモデルを保存するか?を指定します。今回はベストモデルさえ取得できればよいので、1でいいです。
- Trueの場合、モデルの重みだけが保存されます。Falseの場合、オプティマイザの状態やlr-schedulerの状態などもチェックポイントに追加されます。
-
dirpath=
にはモデルファイルを保存するディレクトリのパスを指定します。
filename=
でファイル名を指定することもできます。
EarlyStoppingの設定
early_stopping = EarlyStopping(monitor='val_loss',verbose=True, mode="min")
EarlyStopping
lossがいい感じに下がり切ったところで学習を止めてくれるやつです。
EarlyStoppingを有効化するには以下の設定を行います。
-
from pytorch_lightning.callbacks import EarlyStopping
でインポートします。 - log()メソッドを使用して監視する指標をログに記録します。Modelクラスのvalidation_stepメソッド内で定義している
self.log('val_loss', val_loss)
などがそれです。この場合はval_loss
をval_loss
という名前で監視する指標に設定しています。 - コールバックを初期化し
monitor=
に指標を設定します。 - 監視対象の指標について、
mode=
を設定する。この場合はlossが最小になったところで止まってほしいのでmode="min"
とします。 - Trainer callbacksフラグにEarlyStoppingコールバックを渡します。
https://pytorch-lightning.readthedocs.io/en/stable/common/early_stopping.html
Trainerの設定
trainer = pl.Trainer(
gpus=1, #--1
max_epochs=20, #--2
callbacks = [checkpoint, early_stopping] #--3
)
Trainer
PytorchのコードをLightningModuleに整理すれば、あとばTrainerが全て自動化してくれる、という謳い文句のクラスです。
https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#
- 使用するGPUの数を設定します。
- エポック数の最大値を設定します。ここでは20としましたが、どうせEarlyStoppingを使用するので、もっと多めに設定しておいてもいいかもしれません。
- コールバックを設定します。
学習を実行する
model = Model(MODEL_NAME, num_labels=label_n, lr=1e-5)#--1
trainer.fit(model, train_dataloaders=data_module.train_dataloader(), val_dataloaders=data_module.val_dataloader())
-
BERTのモデルをロードします。
-
trainer.fit()
で学習を実行できます。train_dataloaders=
とするかもしくは第一引数にで学習対象のDataLoaderを、val_dataloaders=
とするかもしくは第二引数に検証用のDataLoaderを設定します。
結果
今回のモデルでは、precision、recall、f1-score共に0.92の高いスコアを出すことができています。