1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksにおけるHugging Faceモデルの分散ファインチューニング

Posted at

こちらのサンプルノートブックをウォークスルーします。

このノートブックでは、Transformers APITorchDistributor APIを用いて、Hugging Faceのモデルをどのようにファインチューニングするのかの例を示します。本書のファインチューニングのガイドはこちらのHugging Face blog postをベースにしています。

要件

  • Databricks Runtime ML 13.0以降
  • (推奨) GPUインスタンス

使用するGPUの数の定義

この例では、4つのワーカーノードを持つクラスターを使います。異なる設定のクラスターを使う場合には、それに合わせてNUM_WORKERSを更新してください。

私は2つのワーカーノードにしています。

Screenshot 2024-07-26 at 14.26.56.png

import torch

#NUM_WORKERS = 4
NUM_WORKERS = 2

def get_gpus_per_worker(_):
  import torch
  return torch.cuda.device_count()

NUM_GPUS_PER_WORKER = sc.parallelize(range(4), 4).map(get_gpus_per_worker).collect()[0]
USE_GPU = NUM_GPUS_PER_WORKER > 0

データの前処理

データの前処理のためのtokenizerとcollatorを初期化します。

import torch
from transformers import TrainingArguments, Trainer
from transformers import AutoTokenizer, DataCollatorWithPadding
from sklearn.model_selection import train_test_split

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

IMDBデータセットのインポートと前処理

Hugging Faceのブログとこのノートブックの大きな違いの一つは、この例では3000データポイントではなく、すべてのIMDBデータを使うということです。

from datasets import load_dataset # huggingface Datasetsオプションのインポート
import pandas as pd

imdb = load_dataset("imdb")
train = pd.DataFrame(imdb["train"])
test = pd.DataFrame(imdb["test"])

texts = train["text"].tolist()
labels = train["label"].tolist()

train_texts, val_texts, train_labels, val_labels = train_test_split(
    texts, labels, test_size=0.2)

train_encodings = tokenizer(train_texts, truncation=True)
val_encodings = tokenizer(val_texts, truncation=True)

class ImdbDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

tokenized_train = ImdbDataset(train_encodings, train_labels)
tokenized_test = ImdbDataset(val_encodings, val_labels)

トレーニング関数のセットアップ

TorchDistributor APIには、シングルノードでのマルチGPUトレーニングやマルチノードのトレーニングのサポートがあります。

train()関数にシングルノードのコードをラッピングする際、Databricksではライブラリのpickle処理の問題を回避するために、train()関数にすべてのimport文を含めることをお勧めします。train_model()で任意のpickle可能なオブジェクトを返却することができますが、処理グループなしにはpickle処理を行うことができないため、これはTrainerを返却できないことを意味します。代わりに、ベストなチェックポイントのパスを返却し、外部でそれを使うことができます。

import numpy as np
from datasets import load_metric

from transformers import AutoModelForSequenceClassification
model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased", num_labels=2)

def compute_metrics(eval_pred):
   load_accuracy = load_metric("accuracy")
   load_f1 = load_metric("f1")

   logits, labels = eval_pred
   predictions = np.argmax(logits, axis=-1)
   accuracy = load_accuracy.compute(predictions=predictions, references=labels)["accuracy"]
   f1 = load_f1.compute(predictions=predictions, references=labels)["f1"]
   return {"accuracy": accuracy, "f1": f1}
 
#output_dir = "/dbfs/rithwik-db/imdb/finetuning-sentiment-model-v1" # Save to DBFS (required)
output_dir = "/Volumes/users/takaaki_yayoi/imdb/finetuning-sentiment-model-v1" # Volumeに保存 (必須)

def train_model():
    from transformers import TrainingArguments, Trainer

    training_args = TrainingArguments(
      output_dir=output_dir,
      learning_rate=2e-5,
      per_device_train_batch_size=16,
      per_device_eval_batch_size=16,
      num_train_epochs=2,
      weight_decay=0.01,
      save_strategy="epoch",
      report_to=[], # ここではMLFLOW連携は削除しています
      push_to_hub=False,  # ここではHUBにモデルをプッシュしません
      load_best_model_at_end=True, # 推奨
      metric_for_best_model="eval_loss", # 推奨
      evaluation_strategy="epoch" # 推奨
    )

    trainer = Trainer(
      model=model,
      args=training_args,
      train_dataset=tokenized_train,
      eval_dataset=tokenized_test,
      tokenizer=tokenizer,
      data_collator=data_collator,
      compute_metrics=compute_metrics,
    )
    trainer.train()
    return trainer.state.best_model_checkpoint

# 分散トレーニングで使用したトレーナーを使うのではなく、事前トレーニング済みモデルから個別のローカルトレーナーを作成することをお勧めします
def test_model(ckpt_path):
  model = AutoModelForSequenceClassification.from_pretrained(ckpt_path, num_labels=2)
  local_trainer = Trainer(model=model,eval_dataset=tokenized_test,tokenizer=tokenizer,data_collator=data_collator,compute_metrics=compute_metrics)
  return local_trainer.evaluate()

def test_example(ckpt_path, inputs):
  from transformers import pipeline
  model = AutoModelForSequenceClassification.from_pretrained(ckpt_path, num_labels=2)
  p = pipeline(task="sentiment-analysis", model=model, tokenizer=tokenizer)
  outputs = p(inputs)
  return ["Positive" if item["label"] == "LABEL_0" else "Negative" for item in outputs]

元のノートブックではチェックポイントをDBFSに出力していますが、私はUnity CatalogのVolumeにしています。

ローカルトレーニングの実行

single_node_ckpt_path = train_model()

約30分を要しました。
Screenshot 2024-07-26 at 12.44.55.png

test_model(single_node_ckpt_path)
{'eval_loss': 0.21723754703998566,
 'eval_accuracy': 0.9142,
 'eval_f1': 0.9113819458789506,
 'eval_runtime': 96.8592,
 'eval_samples_per_second': 51.621,
 'eval_steps_per_second': 6.453}

複数GPUを持つシングルノードでの分散トレーニングの実行

Distributorにlocal_mode=Trueを指定すると、Sparkクラスターのドライバーノードで直接train()関数が実行されます。

この実行で合計いくつのGPUを使うのかを設定するには、Distributorでnum_processes=Nを指定します。Nは、ドライバーノードで使いたいGPUの数となります。お使いのトレーニングコードには実際には何の変更も加える必要が無いことに注意してください。

from pyspark.ml.torch.distributor import TorchDistributor

NUM_PROCESSES = torch.cuda.device_count()
print(f"We're using {NUM_PROCESSES} GPUs")
single_node_multi_gpu_ckpt_path = TorchDistributor(num_processes=NUM_PROCESSES, local_mode=True, use_gpu=USE_GPU).run(train_model)
We're using 4 GPUs
Started local training with 4 processes
:

今度は9分程度で完了しました。
Screenshot 2024-07-26 at 14.29.48.png

test_model(single_node_multi_gpu_ckpt_path)
{'eval_loss': 0.2109755426645279,
 'eval_accuracy': 0.9188,
 'eval_f1': 0.9190590111642745,
 'eval_runtime': 100.246,
 'eval_samples_per_second': 49.877,
 'eval_steps_per_second': 6.235}

このノートブックでは、異なるアプローチでファインチューニングを複数実行しています。また、チェックポイントは同じ場所を指定しています。ですので、

  • OOMに遭遇する際には、クラスターを再起動し必要な部分のみ(ローカルトレーニングのみ、マルチGPUシングルノードの処理のみ、等)実行してください。
  • 都度、チェックポイントをクリーンアップしてください。

マルチノードでの分散トレーニングの実行

TorchDistributorにlocal_mode=False (デフォルト)を指定すると、Sparkクラスターのワーカーノードでtrain()関数を実行します。

この実行で、合計いくつのGPUを使うのかを設定するには、TorchDistributorにnum_processes=TOTAL_NUM_GPUSを指定します。使いのトレーニングコードには実際には何の変更も加える必要が無いことに注意してください。

from pyspark.ml.torch.distributor import TorchDistributor

NUM_PROCESSES = NUM_GPUS_PER_WORKER * NUM_WORKERS
print(f"We're using {NUM_PROCESSES} GPUs")
multi_node_ckpt_path = TorchDistributor(num_processes=NUM_PROCESSES, local_mode=False, use_gpu=USE_GPU).run(train_model)
We're using 2 GPUs
Started distributed training with 2 executor processes
:

こちらは、18分程度です。上にあるように合計2つのGPUしか使っていないことが影響していそうです。もう少し調べます。
Screenshot 2024-07-26 at 14.32.33.png

Transformersのpipeline APIでモデルをテスト

def test_example(ckpt_path, inputs):
  from transformers import pipeline
  model = AutoModelForSequenceClassification.from_pretrained(ckpt_path, num_labels=2)
  p = pipeline(task="sentiment-analysis", model=model, tokenizer=tokenizer)
  outputs = p(inputs)
  return [{i:"Positive"} if item["label"] == "LABEL_1" else {i:"Negative"} for i, item in zip(inputs, outputs)]

test_example(multi_node_ckpt_path, ["i love this movie", "this movie sucks!"])
[{'i love this movie': 'Positive'}, {'this movie sucks!': 'Negative'}]

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?