こちらのサンプルノートブックをウォークスルーします。
このノートブックでは、Transformers APIとTorchDistributor APIを用いて、Hugging Faceのモデルをどのようにファインチューニングするのかの例を示します。本書のファインチューニングのガイドはこちらのHugging Face blog postをベースにしています。
要件
- Databricks Runtime ML 13.0以降
- (推奨) GPUインスタンス
使用するGPUの数の定義
この例では、4つのワーカーノードを持つクラスターを使います。異なる設定のクラスターを使う場合には、それに合わせてNUM_WORKERS
を更新してください。
私は2つのワーカーノードにしています。
import torch
#NUM_WORKERS = 4
NUM_WORKERS = 2
def get_gpus_per_worker(_):
import torch
return torch.cuda.device_count()
NUM_GPUS_PER_WORKER = sc.parallelize(range(4), 4).map(get_gpus_per_worker).collect()[0]
USE_GPU = NUM_GPUS_PER_WORKER > 0
データの前処理
データの前処理のためのtokenizerとcollatorを初期化します。
import torch
from transformers import TrainingArguments, Trainer
from transformers import AutoTokenizer, DataCollatorWithPadding
from sklearn.model_selection import train_test_split
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
IMDBデータセットのインポートと前処理
Hugging Faceのブログとこのノートブックの大きな違いの一つは、この例では3000データポイントではなく、すべてのIMDBデータを使うということです。
from datasets import load_dataset # huggingface Datasetsオプションのインポート
import pandas as pd
imdb = load_dataset("imdb")
train = pd.DataFrame(imdb["train"])
test = pd.DataFrame(imdb["test"])
texts = train["text"].tolist()
labels = train["label"].tolist()
train_texts, val_texts, train_labels, val_labels = train_test_split(
texts, labels, test_size=0.2)
train_encodings = tokenizer(train_texts, truncation=True)
val_encodings = tokenizer(val_texts, truncation=True)
class ImdbDataset(torch.utils.data.Dataset):
def __init__(self, encodings, labels):
self.encodings = encodings
self.labels = labels
def __getitem__(self, idx):
item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
item['labels'] = torch.tensor(self.labels[idx])
return item
def __len__(self):
return len(self.labels)
tokenized_train = ImdbDataset(train_encodings, train_labels)
tokenized_test = ImdbDataset(val_encodings, val_labels)
トレーニング関数のセットアップ
TorchDistributor APIには、シングルノードでのマルチGPUトレーニングやマルチノードのトレーニングのサポートがあります。
train()
関数にシングルノードのコードをラッピングする際、Databricksではライブラリのpickle処理の問題を回避するために、train()関数にすべてのimport文を含めることをお勧めします。train_model()
で任意のpickle可能なオブジェクトを返却することができますが、処理グループなしにはpickle処理を行うことができないため、これはTrainer
を返却できないことを意味します。代わりに、ベストなチェックポイントのパスを返却し、外部でそれを使うことができます。
import numpy as np
from datasets import load_metric
from transformers import AutoModelForSequenceClassification
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)
def compute_metrics(eval_pred):
load_accuracy = load_metric("accuracy")
load_f1 = load_metric("f1")
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
accuracy = load_accuracy.compute(predictions=predictions, references=labels)["accuracy"]
f1 = load_f1.compute(predictions=predictions, references=labels)["f1"]
return {"accuracy": accuracy, "f1": f1}
#output_dir = "/dbfs/rithwik-db/imdb/finetuning-sentiment-model-v1" # Save to DBFS (required)
output_dir = "/Volumes/users/takaaki_yayoi/imdb/finetuning-sentiment-model-v1" # Volumeに保存 (必須)
def train_model():
from transformers import TrainingArguments, Trainer
training_args = TrainingArguments(
output_dir=output_dir,
learning_rate=2e-5,
per_device_train_batch_size=16,
per_device_eval_batch_size=16,
num_train_epochs=2,
weight_decay=0.01,
save_strategy="epoch",
report_to=[], # ここではMLFLOW連携は削除しています
push_to_hub=False, # ここではHUBにモデルをプッシュしません
load_best_model_at_end=True, # 推奨
metric_for_best_model="eval_loss", # 推奨
evaluation_strategy="epoch" # 推奨
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized_train,
eval_dataset=tokenized_test,
tokenizer=tokenizer,
data_collator=data_collator,
compute_metrics=compute_metrics,
)
trainer.train()
return trainer.state.best_model_checkpoint
# 分散トレーニングで使用したトレーナーを使うのではなく、事前トレーニング済みモデルから個別のローカルトレーナーを作成することをお勧めします
def test_model(ckpt_path):
model = AutoModelForSequenceClassification.from_pretrained(ckpt_path, num_labels=2)
local_trainer = Trainer(model=model,eval_dataset=tokenized_test,tokenizer=tokenizer,data_collator=data_collator,compute_metrics=compute_metrics)
return local_trainer.evaluate()
def test_example(ckpt_path, inputs):
from transformers import pipeline
model = AutoModelForSequenceClassification.from_pretrained(ckpt_path, num_labels=2)
p = pipeline(task="sentiment-analysis", model=model, tokenizer=tokenizer)
outputs = p(inputs)
return ["Positive" if item["label"] == "LABEL_0" else "Negative" for item in outputs]
元のノートブックではチェックポイントをDBFSに出力していますが、私はUnity CatalogのVolumeにしています。
ローカルトレーニングの実行
single_node_ckpt_path = train_model()
test_model(single_node_ckpt_path)
{'eval_loss': 0.21723754703998566,
'eval_accuracy': 0.9142,
'eval_f1': 0.9113819458789506,
'eval_runtime': 96.8592,
'eval_samples_per_second': 51.621,
'eval_steps_per_second': 6.453}
複数GPUを持つシングルノードでの分散トレーニングの実行
Distributorにlocal_mode=True
を指定すると、Sparkクラスターのドライバーノードで直接train()
関数が実行されます。
この実行で合計いくつのGPUを使うのかを設定するには、Distributorでnum_processes=N
を指定します。N
は、ドライバーノードで使いたいGPUの数となります。お使いのトレーニングコードには実際には何の変更も加える必要が無いことに注意してください。
from pyspark.ml.torch.distributor import TorchDistributor
NUM_PROCESSES = torch.cuda.device_count()
print(f"We're using {NUM_PROCESSES} GPUs")
single_node_multi_gpu_ckpt_path = TorchDistributor(num_processes=NUM_PROCESSES, local_mode=True, use_gpu=USE_GPU).run(train_model)
We're using 4 GPUs
Started local training with 4 processes
:
test_model(single_node_multi_gpu_ckpt_path)
{'eval_loss': 0.2109755426645279,
'eval_accuracy': 0.9188,
'eval_f1': 0.9190590111642745,
'eval_runtime': 100.246,
'eval_samples_per_second': 49.877,
'eval_steps_per_second': 6.235}
このノートブックでは、異なるアプローチでファインチューニングを複数実行しています。また、チェックポイントは同じ場所を指定しています。ですので、
- OOMに遭遇する際には、クラスターを再起動し必要な部分のみ(ローカルトレーニングのみ、マルチGPUシングルノードの処理のみ、等)実行してください。
- 都度、チェックポイントをクリーンアップしてください。
マルチノードでの分散トレーニングの実行
TorchDistributorにlocal_mode=False
(デフォルト)を指定すると、Sparkクラスターのワーカーノードでtrain()
関数を実行します。
この実行で、合計いくつのGPUを使うのかを設定するには、TorchDistributorにnum_processes=TOTAL_NUM_GPUS
を指定します。使いのトレーニングコードには実際には何の変更も加える必要が無いことに注意してください。
from pyspark.ml.torch.distributor import TorchDistributor
NUM_PROCESSES = NUM_GPUS_PER_WORKER * NUM_WORKERS
print(f"We're using {NUM_PROCESSES} GPUs")
multi_node_ckpt_path = TorchDistributor(num_processes=NUM_PROCESSES, local_mode=False, use_gpu=USE_GPU).run(train_model)
We're using 2 GPUs
Started distributed training with 2 executor processes
:
こちらは、18分程度です。上にあるように合計2つのGPUしか使っていないことが影響していそうです。もう少し調べます。
Transformersのpipeline
APIでモデルをテスト
def test_example(ckpt_path, inputs):
from transformers import pipeline
model = AutoModelForSequenceClassification.from_pretrained(ckpt_path, num_labels=2)
p = pipeline(task="sentiment-analysis", model=model, tokenizer=tokenizer)
outputs = p(inputs)
return [{i:"Positive"} if item["label"] == "LABEL_1" else {i:"Negative"} for i, item in zip(inputs, outputs)]
test_example(multi_node_ckpt_path, ["i love this movie", "this movie sucks!"])
[{'i love this movie': 'Positive'}, {'this movie sucks!': 'Negative'}]