どうもこんにちは。
今回は、以前投稿したBERTモデルのトレーニングをチェックポイントありの状態でやってみました。
とりあえずBERTモデルのトレーニングをやってみたいという方は以下の記事の通りにやっていただくことをお勧めします。
チェックポイントって何?
めちゃんこ大量のデータを使用してトレーニングを行いたい時が人によってはあると思います。その時になんらかの不具合が発生して、トレーニングが途中で止まってしまうということがあります。
チェックポイントを使用してトレーニングを行うと、止まってしまった部分からトレーニングを再開することができます。
他にも、定期的にシステムからデータを取得して追加でモデルをトレーニングさせたい という時にも、チェックポイントを使用することで実現できます。
BERTってなに?
そもそもBERTモデルの話をしていなかったので、ここでしておきます。
BERTモデルとは、Googleが開発した自然言語処理モデルのことです。
現在のGoogleの検索エンジンには、このBERTモデルが使用されており、検索ボックスに入力された内容を自然言語的に解釈することが可能になっています。
BERTモデルは簡単に使用できる
BERTモデルは、HuggingFaceという開発者向けプラットフォームから簡単に使用することができます。
以下の記事を見ながら、Google Colabで試すことができます。(トレーニングには、結構時間かかりました。)
BERTモデルのトレーニング(チェックポイントあり)
SageMakerノートブックインスタンスの立ち上げは過去の記事であげているので省略します。
手順0. S3にバケットに必要になるディレクトリを作成しておきましょう
バケット名: sagemaker-bert-test
|- dataset # ここに用意したデータを保存しておく
|- comment_data.csv
|- sample_train_data
|- train # トレーニング用データを格納するパス
|- test # 評価データを格納するパス
手順1. 必要なライブラリのインポート
!pip install sagemaker --upgrade --quiet
!pip install datasets transformers
import boto3
import sagemaker
from sagemaker.huggingface import HuggingFace
import torch
手順2. 必要情報の取得
トレーニング用データをS3に保管していたので、データを引っ張ってくるためにboto3
を使用します。
また、SageMakerノートブックインスタンスからS3にアクセスできるように、IAM
をいじっておきましょう。(AWSコンソール>SageMaker>NoteBooks>任意のノートブック>IAMロールARN>IAMいじる
)
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
aws_region = boto3.Session().region_name
bucket_name = '<任意の文字列バケット名>'
手順3. トレーニング実行コードを記述
# トレーニング用データと評価データを格納するパスを宣言しておく
training_input_path = f's3://{bucket_name}/sample_train_data/train'
test_input_path = f's3://{bucket_name}/sample_train_data/test'
# ハイパーパラメータを設定
hyperparameters = {
'model_name_or_path': 'bert-base-uncased', # たくさんのBERTモデルから好きなものを選択
'task_name': 'text-classification', # たくさんのタスクの中からテキスト分類を選択
'do_train': True, # 学習しまっせ
'do_eval': True, # 評価しまっせ
'train_batch_size': 20, # 学習時の並列処理の数(大きくすると処理は早くなるがメモリを多く消費します。)
'eval_batch_size': 20, # 評価時の並列処理の数(大きくすると処理は早くなるがメモリを多く消費します。)
'num_train_epochs': 3, # 学習時のエピソード数を設定します
'learning_rate': 5e-5, # 学習率を設定します
'output_dir': '/opt/ml/checkpoints' # チェックポイントを使用するには、このように設定します。
}
huggingface_estimator = HuggingFace(
entry_point='train.py', # トレーニングスクリプトを指定します
source_dir='./scripts', # トレーニングスクリプトの保存されているディレクトリを指定します
instance_type='ml.g4dn.xlarge', # トレーニングを行う時のインスタンスタイプを指定します。(そこそこのGPUを持ったインスタンスでないと動作しません。)
instance_count=1, # インスタンスの数を指定します。
checkpoint_s3_uri=f's3://{bucket_name}/checkpoints', # チェックポイントを保存する場所を指定します。
role=role,
transformers_version='4.26',
pytorch_version='1.13',
py_version='py39',
hyperparameters=hyperparameters # 上で設定したハイパーパラメータを渡します。
)
# いざトレーニング実行!!
huggingface_estimator.fit({'train': training_input_path, 'test': test_input_path})
このまま実行しても、/scripts/train.py
を作ってないのでエラーが起きるはずです。作っていきましょー
手順4. トレーニングスクリプトの作成
4-1. 必要なライブラリをインポート
from transformers import AutoModelForSequenceClassification, Trainer, TrainingArguments, AutoTokenizer
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from datasets import load_from_disk, load_dataset, DatasetDict
import random
import logging
import sys
import argparse
import os
import boto3
import pandas as pd
from io import StringIO
from urllib.parse import urlparse
from transformers.trainer_utils import get_last_checkpoint
4-2. 必要な関数を作成する
必要な関数は、以下です。
- データにラベルづけするための関数
- テキストをトークン化する関数
- メトリクス操作を行う関数
- ファイルをS3にアップロードする関数
データにラベルづけするための関数
def determine_label
comment = row['comment']
good_phrase = ['嬉しい', '楽しい', '好き']
normal_phrase = ['普通', 'まあまあ']
bad_phrase = ['うれしくない', '楽しくない', '嫌い']
# Goodコメント
if any(phrase in comment for phrase in good_phrase):
return 2
# ノーマルコメント
if any(phrase in comment for phrase in normal_phrase):
return 1
# Badコメント
if any(phrase in comment for phrase in bad_phrase):
return 0
# 分類なしコメント
retuen 3
トークン化関数
def tokenize_comment(batch):
return tokenizer(batch['comment'], padding='max_length', truncation=True)
メトリクス操作関数
def compute_metrics(pred):
labels = pred.label_ids
preds = pred.predictions.argmax(-1)
precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average="weighted")
acc = accuracy_score(labels, preds)
return {"accuracy": acc, "f1": f1, "precision": precision, "recall": recall}
S3アップロード関数
def upload_to_s3(file_path, bucket, object_name):
with open(file_path, 'rb') as f:
s3_client.put_object(Bucket=bucket, Key=object_name, Body=f.read())
4-3. メインコード記述
if __name__ == "__main__":
# ハイパーパラメータのパース
parser = argparse.ArgumentParser()
parser.add_argument("--model_name_or_path", type=str, default='bert-base-uncased')
parser.add_argument("--task_name", type=str, default='text-classification')
parser.add_argument("--do_train", type=bool, default=True)
parser.add_argument("--do_eval", type=bool, default=True)
parser.add_argument("--train_batch_size", type=int, default=20)
parser.add_argument("--eval_batch_size", type=int, default=20)
parser.add_argument("--num_train_epochs", type=int, default=3)
parser.add_argument("--learning_rate", type=float, default=5e-5)
parser.add_argument("--output_dir", type=str, default='/opt/ml/model')
parser.add_argument("--checkpoint_path", type=str, default=None)
parser.add_argument("--output_data_dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
parser.add_argument("--model_dir", type=str, default=os.environ["SM_MODEL_DIR"])
parser.add_argument("--n_gpus", type=int, default=os.environ.get("SM_NUM_GPUS", 0))
parser.add_argument("--training_dir", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
parser.add_argument("--test_dir", type=str, default=os.environ["SM_CHANNEL_TEST"])
args, _ = parser.parse_known_args()
# ロガー定義
logger = logging.getLogger(__name__)
# ログ設定定義
logging.basicConfig(
level=logging.getLevelName("INFO"),
handlers=[logging.StreamHandler(sys.stdout)],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# boto3を使用してS3のコメントデータCSVを取得
s3_client = boto3.client('s3')
bucket_name = '<任意の文字列>'
file_name = 'dataset/comment_data.csv'
# S3からファイルを読み込む
response = s3_client.get_object(Bucket=bucket_name, Key=file_name)
file_content = response['Body'].read()
file_str = file_content.decode('utf-8')
# CSVデータからpandasデータフレームを作成
df = pd.read_csv(StringIO(file_str), header=0)
# CSVデータにlabelカラムを追加してラベル付け
df.insert(0, 'label', df.apply(determine_label, axis=1))
# ノートブックインスタンスにcsv_datasフォルダがない場合に作成
os.makedirs('/opt/ml/input/data/csv_datas', exist_ok=True)
# 学習用データ
## ノートブックインスタンスにデータを保存
training_data_path = '/opt/ml/input/data/csv_datas/training_data.csv'
df.to_csv(training_data_path, index=False)
## S3へアップロード
upload_to_s3(training_data_path, bucket_name, 'csv_data/training_data.csv')
# 評価用データ
## ノートブックインスタンスにデータを保存
test_data_path = '/opt/ml/input/data/csv_datas/test_data.csv'
df.sample(1000).to_csv(test_data_head10_path, index=False)
## S3へアップロード
upload_to_s3(test_data_path, bucket_name, 'csv_data/test_data.csv')
# データをデータセットとしてロード
train_dataset = load_dataset('csv', data_files=train_csv_path, split='train')
test_dataset = load_dataset('csv', data_files=test_data_path, split='train')
# DatasetDictに変換
dataset_dict = DatasetDict({
'train': train_dataset,
'test': test_dataset
})
# トークナイザーを定義
tokenizer = AutoTokenizer.from_pretrained(arg.model_name_or_path)
# トレーニングできる形に変換
dataset_dict = dataset_dict.map(tokenize_comment, batched=True)
dataset_dict = dataset_dict.rename_column("label", "labels")
dataset_dict.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
# トレーニング用データをディスク上に保存
training_input_path = f's3://{bucket_name}/sample_train_data/train'
dataset_dict['train'].save_to_disk(training_input_path)
# 評価用データをディスク上に保存
test_input_path = f's3://{bucket_name}/sample_train_data/test'
dataset_dict['test'].save_to_disk(test_input_path)
# テキスト分類のために使用するモデルとラベルの数を指定する(今回は0,1,2,3の4つです。)
model = AutoModelForSequenceClassification.from_pretrained(args.model_name_or_path, num_labels=4)
# ハイパーパラメータの定義
training_args = TrainingArguments(
output_dir=args.output_dir,
num_train_epochs=args.num_train_epochs,
per_device_train_batch_size=args.train_batch_size,
per_device_eval_batch_size=args.eval_batch_size,
evaluation_strategy="epoch",
logging_dir=f"{args.output_dir}/logs",
learning_rate=float(args.learning_rate),
do_train=args.do_train,
do_eval=args.do_eval,
save_strategy='steps', # チェックポイントを保存するタイミングを定義します
save_steps=100, # チェックポイントを保存する細かいタイミングを定義します。(100ステップのトレーニングが終わるたびにチェックポイントを保存します。)
save_total_limit=5 # チェックポイントを保存できる数を指定します。(チェックポイントが最新の5個以外保存できません。そのため、どんどん上書きされていきます。
)
# トレーナークラスの定義
trainer = Trainer(
model=model,
args=training_args,
compute_metrics=compute_metrics,
train_dataset=dataset_dict["train"],
eval_dataset=dataset_dict["test"],
tokenizer=tokenizer
)
# チェックポイントがある場合は途中から、ない場合は最初からトレーニングを開始します。
if get_last_checkpoint(args.output_dir) is not None:
logger.info("***** continue training *****")
last_checkpoint = get_last_checkpoint(args.output_dir)
trainer.train(resume_from_checkpoint=last_checkpoint)
else:
logger.info("***** new training *****")
trainer.train()
# トレーニング後のモデルを評価します。
eval_result = trainer.evaluate(eval_dataset=dataset_dict["test"])
# 評価結果をテキストで保存しています。
with open(os.path.join(args.output_data_dir, "eval_results.txt"), "w") as writer:
print(f"***** Eval results *****")
for key, value in sorted(eval_result.items()):
writer.write(f"{key} = {value}\n")
# モデルを保存します。
trainer.save_model(args.output_dir)
ヒェ〜長い
手順5. モデルのデプロイ
RailsアプリケーションやLambdaなどのコードからエンドポイントを通じて推論ができるようにデプロイします。
今回は、推論リクエストを送った時だけ動作するサーバレスエンドポイントを使用します。
from sagemaker.serverless import ServerlessInferenceConfig
serverless_config = ServerlessInferenceConfig(
memory_size_in_mb=6144,
max_concurrency=2
)
# モデルデプロイ
predictor = huggingface_estimator.deploy(
initial_instance_count=1,
instance_type='ml.g4dn.xlarge',
endpoint_name='serverless-endpoint-01', # ここはお好きにどうぞ
serverless_inference_config=serverless_config
)
手順6. 動作確認
# 評価コード
input_comment= {"inputs":"嬉しいです"}
predictor.predict(input_comment)
# => [{'label': 'LABEL_2', 'score': 0.7034105062484741}]みたいな結果が返ってくればOK
手順7. 使用したらちゃんとお片付け
# モデルとエンドポイントの削除
predictor.delete_model()
predictor.delete_endpoint()
まとめ
トレーニングスクリプトを書くのが大変だと思いますが、