はじめに
DistributedDataParallel(以下、DDP)に関する、イントロの日本語記事がなかったので、自分の経験をまとめておきます。
pytorchでGPUの並列化、特に、DataParallelを行う場合、チュートリアルでは、DataParallel Module(以下、DP)が使用されています。
更新: DDPも公式のチュートリアルが作成されていました。
DDPを使う利点
しかし、公式ドキュメントをよく読むと、DistributedDataPararell(以下、DDP)の方が速いと述べられています。(ソース) (実験した方がいるようで確かに少し速いです。)
また、transformersを使う場合など、tokenizeがdictで返されるため、DPを使う場合は、tokenizeの中身を入力する必要がありますが、transformersのモデルごとにkeyが異なっているので、モデルのインタフェースごとに実装を変える必要があり、大変です。 (kwargを使えば問題なかったです)
DDPを使うデメリット
マルチプロセスになるので、メモリ消費が多いと思います。なお、公式ではmp.spawnを使っており、メモリ消費量を抑えるためと思われます。
詳細情報
英語には、こちらやこちらが実装例としてありますのでご参考ください。
また、詳しい違いはこちらやこちらを、DistributedDataPararellの肝であるall-reduceはこちらを参照してください。
コードでの違い
DataParallel(DP)
DPの場合は、DataLoader。例えば、以下のような書き方になります。(チュートリアルのコードを一部改変)
以下のような、モデルとデータセットで例を記載します。
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
class RandomDataset(Dataset):
def __init__(self, input_size, output_size, length):
self.len = length
self.data = torch.randn(length, input_size)
self.label = torch.randn(length, output_size)
def __getitem__(self, index):
return self.data[index], self.label[index]
def __len__(self):
return self.len
class Model(nn.Module):
# Our model
def __init__(self, input_size, output_size):
super(Model, self).__init__()
self.fc = nn.Linear(input_size, output_size)
def forward(self, input):
output = self.fc(input)
print("\tIn Model: input size", input.size(),
"output size", output.size())
return output
この時、DataParallelを使うと以下のような書き方になります。
def main():
input_size=5
output_size=2
data_size = 100
batch_size = 30
device = torch.device("cuda:0")
rand_loader = DataLoader(dataset=RandomDataset(input_size, output_size, data_size),
batch_size=batch_size, shuffle=True)
model = Model(input_size, output_size)
model = nn.DataParallel(model)
model.to(device)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
for data, label in rand_loader:
input = data.to(device)
output = model(input)
loss_fn(output, label)
optimizer.step()
DistributedDataParallel(DDP)
一方、DistributedDataParallel(DDP)を使うと以下のようになります。(基本的な書き方はこちらを参考)
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
def train(rank, n_gpu, input_size, output_size, batch_size, train_dataset):
dist.init_process_group("gloo", rank=rank, world_size=n_gpu)
# create local model
model = Model(input_size, output_size)
# construct DDP model
model = DDP(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
sampler = DistributedSampler(train_dataset, num_replicas=n_gpu, rank=rank, shuffle=True)
rand_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
for data, label in rand_loader:
input = data.to(rank)
output = model(input)
label = label.to(rank)
# backward pass
loss_fn(outputs, label).backward()
# update parameters
optimizer.step()
def main():
n_gpu = 2
input_size = 5
output_size = 2
batch_size = 30
data_size = 100
dataset = RandomDataset(input_size, data_size)
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
mp.spawn(train,
args=(n_gpu, input_size, output_size, batch_size, dataset),
nprocs=n_gpu,
join=True)
ポイント
- DDPは、1プロセスにつき一つgpuを割り当てています。そのため、
rank
がdeviceを表します。 - 必ず、
dist.init_process_group()
で初期化する必要があります。バックエンド(例ではgloo
)についてはこちらを参照してください。gpuで実行する場合はnccl
が速いとのことです。 -
torch.multiprocessing.spawn は、第一引数に実行するの関数を指定し、argで関数に値を代入します。そして、
nproc
分のプロセスを並列実行します。この時、関数はf(i, *args)
の形で呼び出されます。そのため、train
の最初の変数をrank
とする必要があります。 - 環境変数として
MASTER_PORT
とMASTER_ADDR
を指定する必要があります。(おそらく、DDPは、マルチノードの並列化にも対応しているからだと思われます。その他関連する環境変数についてはこちら) - DPでは、メインのcpuからデータが供給され、モデルに入力された時点で各gpuに
batch_size//n_gpu
分のバッチが割り当てられます。一方、DDPでは、gradientのみを通信します。この時、プロセス毎に異なるデータを入力するために、DistributedSamplerを用いて実現しています。
tips
- 公式によると、saveは計算負荷軽減のため1プロセスだけで実行するようです。
- データの順序をランダムに変えるためには、epoch毎に
sampler.set_epoch(epoch)
とするようです。 - learning rate schedulerを使うときは、最大step数
len(DataLoader) * epoch / n_gpu
にする。おそらく、schedulerが各プロセスで独立に設定されるため。
DDPの拡張
DDPのfp16対応
トレーニング関数部分を以下のように変更します。
import apex
from apex import amp
def train(rank, n_gpu, input_size, output_size, batch_size, train_dataset):
torch.cuda.set_device(rank)
dist.init_process_group("gloo", rank=rank, world_size=n_gpu)
# create local model
model = Model(input_size, output_size)
# construct DDP model
model = apex.parallel.DistributedDataParallel(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.001)
model, optimizer = amp.initialize(model, optimizer, opt_level='O2')
sampler = DistributedSampler(train_dataset, num_replicas=n_gpu, rank=rank, shuffle=True)
rand_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
for data, label in rand_loader:
input = data.to(rank)
output = model(input)
label = label.to(rank)
loss = loss_fn(outputs, label)
# backward pass
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
# update parameters
optimizer.step()
変更点
-
torch.cuda.set_device(rank)
をあらかじめ実行します。 -
torch.nn.parallel.DistributedDataParallel
からapex.parallel.DistributedDataParallel
に変更します。 - fp16化するために、
amp.initialize
を実行します。 - 勾配計算の時にスケールの調整をするため、backwardを以下のようにします。
with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
マルチノード対応
torch.distributed.launch
を使います。公式の通り、それぞれのノードで以下のように実施します。(すみません。自分では実行していません。)
- node1
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
- node2
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
ポイント
個人的所感
- DPの方が最初に使うには楽。(dist.init_process_groupでエラーが起きるなどがある。)
- DDPの方が、自分で作ったロス関数を使う場合に、batchsizeが合わないなどの問題が起こりづらい.
- 推論で、embedingだけ残したい場合にDDPだと保存番号を考える必要があり少し手間