LoginSignup
7

More than 1 year has passed since last update.

PyTorch LightningとBERTを用いた文書多値分類

Last updated at Posted at 2022-12-23

概要

こちらに掲載しているライブドアニュースコーパスを利用した多値分類モデルのコードの解説になります。学習部分までを解説しており、予測部分は省略しています。

使用する環境・データ

GoogleColabを使用します。
データはLivedoorニュースコーパスを使用します。

事前準備

自身のGoogleドライブにライブドアニュースコーパスを配置してください。
必要に応じてデータ数を削減してください。

コード解説

コード内に#—1のような形でコメントを入れ、コメントに対応したインデックス番号で該当のコードについて説明しています。

Googleドライブをマウント

from google.colab import drive
drive.mount("/content/drive/")

まずGoogleColabに自分のGoogleドライブをマウントします。

cudaのバージョン確認

!nvcc --version

cudaのバージョンを確認しておきます。cudaのバージョンによってインストールが必要なPytorchのバージョンが異なってきます。

各種ライブラリインストール

!pip install protobuf==3.9.2
!pip install transformers==4.20.1 fugashi==1.1.2 ipadic==1.0.0 torchtext==0.11.0 pytorch-lightning==1.6.4 numpy==1.21.6 openpyxl
!pip install torch==1.10.0+cu111 torchvision==0.11.0+cu111 torchaudio==0.10.0 -f https://download.pytorch.org/whl/torch_stable.html

各種ライブラリインポート

import glob
import os
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from torch import nn 
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from transformers import BertJapaneseTokenizer, BertForSequenceClassification
import pytorch_lightning as pl
import random
import numpy as np
from tqdm import tqdm
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, StochasticWeightAveraging

# 日本語の事前学習モデル
MODEL_NAME = 'cl-tohoku/bert-base-japanese-whole-word-masking'

東北大学 乾・鈴木研究室により公開されている事前学習モデルを利用させてもらいます。

データの前処理

path = "/content/drive/My Drive/機械学習/text/"  #--1

#--2
dir_files = os.listdir(path=path)
dir_names = [f for f in dir_files if os.path.isdir(os.path.join(path, f))]

#--3
label_n = len(dir_names)
print(f'=={label_n}==')

#--4
text_list = []
label_list = []
label_dic = {}

#--5
for i, dir_name in enumerate(dir_names):
    label_dic[i] = dir_name
    file_names = glob.glob(path + dir_name + "/*.txt") 
    for file_name in file_names:
        if os.path.basename(file_name) == "LICENSE.txt":
            continue
        with open(file_name, "r") as f:
            text = f.readlines()[3:]
            text = "".join(text)
            text = text.translate(str.maketrans({"\n":"", "\t":"", "\r":"", "\u3000":""})) 
            text_list.append(text)
            label_list.append(i)
  1. データが格納されているフォルダを指定します。ここは自分で作成したディレクトリ名を指定してください。
  2. ライブドアニュースコーパスは分類ごとにディレクトリが分かれており、ディレクトリ名がそのまま分類名になっています。そのため、ラベル名を取得する目的で、ディレクトリ名を取得します。
  3. ラベル数の確認をします。私はメモリに乗り切らなかったため、ラベル数を大幅に削減しており、今回はラベル数3になっています。
  4. dataframe作成用にテキストとラベルのリストを作ります。text_listは入力値であるテキストのリスト、label_listは正解ラベルのリストになります。label_dicはどのラベル名がどのラベル番号に変換されているか?を辞書で管理します。
  5. dir_namesで繰り返し処理をすることで各ディレクトリにアクセスします。その中でfile_namesを取得し、繰り返し処理することで、ディレクトリ配下にあるファイルにアクセスし、テキスト取得します。ニュースの本文は4行目以降からなので、f.readlines()[3:]として取得しています。

Dataframeを作成する

document_df = pd.DataFrame(data={'text': text_list,'label': label_list})

入力値のリストであるtext_listと正解ラベルのリストであるlabel_listを使用してdataframeを作成します。document_dfの中身は以下のようになります。

text  label
0  東京工業大学とソニーは世界最高速6.3 Gb/sのミリ波無線データ伝送を実現する高周波 (...      0
1  就職活動や外回りの仕事中もインターネットが使えることで面接や取引が有利に進んだという場面...      0
2  行楽の季節は残念なことに過酷な交通渋滞の季節でもある運転者の体力的消耗は道路を共有する...      0
3  テレビCMも多く放送されているため誰もが見たことがあるだろうauの人気プランと言えばau...      0
4  先日気象庁より関東甲信地方は梅雨入りしたとみられると発表された平年より1日遅く統計...      0

ラベル重み計算

#--1
label_count = document_df['label'].value_counts(sort=False)
label_count_dict = label_count.to_dict()
print(f'各ラベル数:{label_count_dict}')

#--2
label_weight = []
for i in range(label_n):
    rate = (label_count_dict[i] / len(document_df))*100
    weight = 100 - rate
    label_weight.append(weight)
print(f'ラベル重み:{label_weight}')

今回のデータはそうでもないのですが、データによってはラベルごとにデータ数の偏りがある場合があります。その場合、dataloaderを作成する際にsamplerを渡してラベルの偏りがないようにすること、損失関数にラベルの重みを渡してデータ数の少ないラベルの重みをあげてあげることが必要になります。
今回はラベル数に偏りはないですが、今後の応用も考えてラベルの重み計算をし、適用していきます。

  1. 各ラベル数をカウントしています。value_countsはpandas.Seriesのメソッドで、ユニークな要素の出現回数をSeries形式で返します。to_dict()はそれを辞書に変換します。label_count_dictの中身は以下のようになります。

    各ラベル数:{0: 398, 1: 504, 2: 390}
    
  2. 各ラベルの重みを計算しリスト化します。
    重みは全体に対する比で計算しており、100-((ラベルiのラベル数/全体のラベル数)*100)で計算しています。
    リストを作成する時、リストのインデックスと数値化されたラベルの値が対応するように並び替えをします(この例では辞書のラベル値は順番に並んでいますが並ばないこともあります)。label_weightの値は以下のようになります。

    ラベル重み[69.19504643962848, 60.9907120743034, 69.81424148606811]
    

データを分割する

#--1
train_val_df, test_df = train_test_split(document_df, test_size=0.2, random_state=42)
train_df, val_df = train_test_split(train_val_df, test_size=0.1, random_state=42)

#--2
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)
  1. データを「学習用」「検証用」「テスト用」の3つに分割します。まず「学習&検証用データ」と「テストデータ」の2つに分割し、それをさらに「学習用データ」、「検証用データ」の2つに分割します。
    train_test_split

    train_test_splitを使用すると、ndarrayやリストなどを二分割することができます。

    第一引数には分割対象のデータをしてします。

    test_sizeはテストデータの割合、もしくは個数を指定します。割合の場合は0.0~1.0の数字が入ります。

    random_stateは乱数を固定し、同じ乱数であれば常に同じように分割されるようにします。

    デフォルトシャッフルが行われますが、シャッフルしたくない場合、shuffle**=**Falseという引数を指定することでシャッフルをせず分割します。

  2. 分割したデータのインデックスを振り直します。

    reset_index

    reset_indexはpandas.DataFrameやpandas.Seriesのインデックスを振り直します。drop=Trueとすると元のindexを削除します。

Datasetを生成するクラスを定義する

class DatasetGenerator(Dataset): #--1
    def __init__(self, data, tokenizer, label_weight):
        self.data = data
        self.tokenizer = tokenizer
        self.class_weights = label_weight
        self.sample_weights = [0] * len(data)
        self.max_length = 256 #--2

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        data_row = self.data.iloc[index]
        text = data_row['text']
        labels = data_row['label']
        
				#--3
        encoding = self.tokenizer.encode_plus(
            text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return dict(
            input_ids=encoding["input_ids"].flatten(),
            attention_mask=encoding["attention_mask"].flatten(),
            token_type_ids=encoding["token_type_ids"].flatten(),
            labels=torch.tensor(labels)
        )
    
    def get_sampler(self): #--4
        for idx, row in self.data.iterrows():
            label = row['label']
            class_weight = self.class_weights[label]
            self.sample_weights[idx] = class_weight
        sampler = WeightedRandomSampler(self.sample_weights, num_samples=len(self.sample_weights), replacement=True)
        return sampler
  1. 今回はDatasetを自作しています。

    torch.utils.data.Dataset

    Datasetを自作する場合、このクラスを継承する必要があります。このクラスを継承したサブクラスは__getitem__()を上書きして、指定したキーのデータサンプルを取得できるようにする必要があります(つまり、辞書形式で値を返す必要がある)。

    https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset

  2. 入力テキストをエンコードする時、トークン数を最大で何個にするか?を指定します。BERTの場合、最大512まで指定できますが、メモリに乗らなくなってしまったのでここでは256にしています。

  3. 入力テキストをエンコードします。

    self.tokenizer.encode_plus

    self.tokenizer.encode_plusを使うことで、入力値をトークナイズし、トークナイズした入力値をidに変換しinput_idsとして出力されます)、BERTの入力値として必要なtoken_type_idsattention_maskと一緒に出力してくれます。

    padding="max_length"を指定するとmax_lengthに指定したトークン数を最大値としてパディングをしてくれます。パディングとは、トークン数が文章によってバラバラになるので、空白部分を0で埋めることです。padding="longest"とするとバッチ内で最も長い文章に合わせてパディングします。

    return_tensors='pt'としておくことで、Pytorchのテンソルオブジェクトを返してくれます。

    https://huggingface.co/docs/transformers/v4.25.1/en/internal/tokenization_utils#transformers.PreTrainedTokenizerBase.encode_plus

  4. サンプラーを設定しています。サンプルが偏らないように、ラベルごとに重みを設定しておき、それに応じてDataloaderを作成してくれます。class_weightsにはあらかじめラベルの比によって決めておいた重みが設定されています。 それを参照しながら各行のラベルの重みのリストをself.sample_weightsに格納します。

    WeightedRandomSampler

    WeightedRandomSamplerはweightを確率とみなし、0~len(weights)-1の間の数字からサンプリングをします。これをDataloaderに渡します。

    第一引数にweightを渡し、第二引数に取得したいサンプル数を指定します。

    https://pytorch.org/docs/stable/data.html#torch.utils.data.WeightedRandomSampler

DataLoaderを生成するクラスを定義する

class DataModuleGenerator(pl.LightningDataModule): #--1
    def __init__(self, train_df, val_df, test_df, tokenizer, batch_size, label_weight):
        super().__init__()
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.label_weight = label_weight
        
    def setup(self): #--2
        self.train_dataset = DatasetGenerator(self.train_df, self.tokenizer, self.label_weight)
        self.valid_dataset = DatasetGenerator(self.val_df, self.tokenizer, self.label_weight)
        self.test_dataset = DatasetGenerator(self.test_df, self.tokenizer, self.label_weight)
        self.train_sampler = self.train_dataset.get_sampler()
        self.valid_sampler = self.valid_dataset.get_sampler()

    def train_dataloader(self): #--3
        return DataLoader(self.train_dataset, batch_size=self.batch_size["train"], num_workers=os.cpu_count(), sampler=self.train_sampler)

    def val_dataloader(self): #--4
        return DataLoader(self.valid_dataset, batch_size=self.batch_size["val"], num_workers=os.cpu_count(), sampler=self.valid_sampler)

    def test_dataloader(self): #--5
        return DataLoader(self.test_dataset, batch_size=self.batch_size["test"], num_workers=os.cpu_count())

DataLoaderとはDatasetからサンプルを取得し、ミニバッチを作成してくれるクラスです。

https://pytorch.org/docs/stable/data.html#module-torch.utils.data

  1. LightningDataModuleを継承したクラスを定義します。このクラスがDataLoaderを作成します。

    LightningDataModule

    通常のPytorchのコードでは、データの前処理がさまざまなファイルに散らばりがちで、正確な分割や変換を共有、再利用することが困難であるため、クラス化しようという発想のもと用意されたクラスです。

    https://pytorch-lightning.readthedocs.io/en/stable/data/datamodule.html#lightningdatamodule

  2. このメソッドを実行することで、学習、検証、テスト用のDatasetを用意することができます。先ほど定義したDatasetGeneratorクラスを呼び出しています。また、samplerもこの時に一緒にセットしてしまいます。

  3. 学習用のDataLoaderを用意します。これはTrainer.fit()メソッドで使用するDataLoaderになります。num_workersはワーカープロセス数を設定します。DataLoaderはデフォルトでシングルプロセスのデータロードを行います。Datasetが小さくてメモリに完全に乗り切る場合にはこれでいいですが、そうでない場合は効率が悪いです。num_workersに正の整数を設定すると指定された数のワーカープロセスによるデータローディングをしてくれます(マルチプロセスで実行してくれるということ)。ここでは使用しているPCのCPUの数を指定しています(CPUというか、コア数?)。

    https://pytorch.org/docs/stable/data.html#single-and-multi-process-data-loading

  4. 検証用のDataLoaderを用意します。Trainer.fit()Trainer.validate()で使用します。

  5. テスト用のDataLoaderを用意します。Trainer.test()で使用します。

ちなみにpredict_dataloaderもある。

モデル生成を行うクラスを定義する

# Pytorch Lightningを使用するためのClass
class Model(pl.LightningModule): #--1
        
    def __init__(self, model_name, num_labels, lr):
        super().__init__()
        self.save_hyperparameters() #--2
        self.bert_sc = BertForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels
        ) #--3

        
    def training_step(self, batch, batch_idx):
        output = self.bert_sc(**batch) #--4
        weights = torch.tensor(label_weight).cuda() #--5
        cross_entropy_loss = nn.CrossEntropyLoss(weight=weights) #--5
        loss = cross_entropy_loss(output.logits, batch['labels'])
        self.log('train_loss', loss)
        return loss
        
    def validation_step(self, batch, batch_idx):
        output = self.bert_sc(**batch)
        val_loss = output.loss
        self.log('val_loss', val_loss) #--7

    def test_step(self, batch, batch_idx):
        labels = batch.pop('labels')
        output = self.bert_sc(**batch)
        labels_predicted = output.logits.argmax(-1)
        num_correct = ( labels_predicted == labels ).sum().item()
        accuracy = num_correct/labels.size(0)
        self.log('accuracy', accuracy)

    def configure_optimizers(self): #--7
        return torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
  1. pl.LightningModuleクラスを継承したクラスを定義します。

    LightningModule

    LightningModuleを使用することでPytorchのコードを以下の6つのセクションに整理でき、すっきりと書くことができます。

    Computations (init).
    Train Loop (training_stepメソッド)
    Validation Loop (validation_stepメソッド)
    Test Loop (test_stepメソッド)
    Prediction Loop (predict_stepメソッド)
    Optimizers and LR Schedulers (configure_optimizersメソッド)
    

    LightningModule - PyTorch Lightning 1.8.6 documentation

  2. 引数を保存します。

    save_hyperparameters

    __init__メソッド内でsave_hyperparameters()を使用すると、self.hparamsに全ての引数を保存することができます。self.hparams.引数名という形で各メソッド内で引数を参照することができるようになります。

  3. BertForSequenceClassificationの事前学習モデルの読み込みを行います。

  4. BERTによる予測結果を取得します。この時の**batchの中身はid化された入力値(input_ids)だけでなく、token_type_idsattention_maskも渡します。

  5. 各ラベルの重みをテンソル化&cuda対応させて、CrossEntropyLoss()に渡します。CrossEntropyLoss()は交差エントロピーですが、Pytorchの場合、この損失関数内でソフトマックス関数の処理まで行われている点に注意です。

  6. 損失をval_lossの名前でログを取ります。これは後ほどモデルのチェックポイントを生成するために使用します。

  7. オプティマイザーの設定です。ここではAdamを使用しています。

チェックポイントの設定

checkpoint = pl.callbacks.ModelCheckpoint(
    monitor='val_loss', #--1
    mode='min', #--2
    save_top_k=1, #--3
    save_weights_only=True, #--4
    dirpath='models', #--5
)

ModelCheckpoint

checkpointはトレーニングの過程でモデルの状態を保存します(=チェックポイントを生成する)。トレーニングが終了すると、トレーニング中に発見した最高パフォーマンスに対応するチェックポイントを使用することができます。
https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.callbacks.ModelCheckpoint.html

  1. 監視する指標を設定します。デフォルトはNoneで最後のエポックのみについてチェックポイントを保存します。
  2. 監視する指標が最大の時か最小の時か、どちらに基づいて保存ファイルを上書きするべきか?を設定します。val_lossの場合は損失なので、小さい方が良いため、minを指定します。
  3. 上位何件のモデルを保存するか?を指定します。今回はベストモデルさえ取得できればよいので、1でいいです。
  4. Trueの場合、モデルの重みだけが保存されます。Falseの場合、オプティマイザの状態やlr-schedulerの状態などもチェックポイントに追加されます。
  5. dirpath=にはモデルファイルを保存するディレクトリのパスを指定します。
    filename=でファイル名を指定することもできます。

EarlyStoppingの設定

early_stopping = EarlyStopping(monitor='val_loss',verbose=True, mode="min")

EarlyStopping
lossがいい感じに下がり切ったところで学習を止めてくれるやつです。
EarlyStoppingを有効化するには以下の設定を行います。

  • from pytorch_lightning.callbacks import EarlyStoppingでインポートします。
  • log()メソッドを使用して監視する指標をログに記録します。Modelクラスのvalidation_stepメソッド内で定義しているself.log('val_loss', val_loss)などがそれです。この場合はval_lossval_lossという名前で監視する指標に設定しています。
  • コールバックを初期化しmonitor=に指標を設定します。
  • 監視対象の指標について、mode=を設定する。この場合はlossが最小になったところで止まってほしいのでmode="min"とします。
  • Trainer callbacksフラグにEarlyStoppingコールバックを渡します。

https://pytorch-lightning.readthedocs.io/en/stable/common/early_stopping.html

Trainerの設定

trainer = pl.Trainer(
    gpus=1, #--1
    max_epochs=20, #--2
    callbacks = [checkpoint, early_stopping] #--3
)

Trainer
PytorchのコードをLightningModuleに整理すれば、あとばTrainerが全て自動化してくれる、という謳い文句のクラスです。
https://pytorch-lightning.readthedocs.io/en/stable/common/trainer.html#

  1. 使用するGPUの数を設定します。
  2. エポック数の最大値を設定します。ここでは20としましたが、どうせEarlyStoppingを使用するので、もっと多めに設定しておいてもいいかもしれません。
  3. コールバックを設定します。

学習を実行する

model = Model(MODEL_NAME, num_labels=label_n, lr=1e-5)#--1
trainer.fit(model, train_dataloaders=data_module.train_dataloader(), val_dataloaders=data_module.val_dataloader())
  1. BERTのモデルをロードします。

  2. trainer.fit()で学習を実行できます。

    train_dataloaders=とするかもしくは第一引数にで学習対象のDataLoaderを、val_dataloaders=とするかもしくは第二引数に検証用のDataLoaderを設定します。

    https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html#pytorch_lightning.trainer.trainer.Trainer.fit

結果

今回のモデルでは、precision、recall、f1-score共に0.92の高いスコアを出すことができています。

参考

https://qiita.com/tchih11/items/7e97db29b95cf08fdda0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7