48
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

pytorch DistributedDataParallel 事始め

Last updated at Posted at 2021-01-22

はじめに

DistributedDataParallel(以下、DDP)に関する、イントロの日本語記事がなかったので、自分の経験をまとめておきます。

pytorchでGPUの並列化、特に、DataParallelを行う場合、チュートリアルでは、DataParallel Module(以下、DP)が使用されています。

更新: DDPも公式のチュートリアルが作成されていました。

DDPを使う利点

しかし、公式ドキュメントをよく読むと、DistributedDataPararell(以下、DDP)の方が速いと述べられています。(ソース) (実験した方がいるようで確かに少し速いです。)
また、transformersを使う場合など、tokenizeがdictで返されるため、DPを使う場合は、tokenizeの中身を入力する必要がありますが、transformersのモデルごとにkeyが異なっているので、モデルのインタフェースごとに実装を変える必要があり、大変です。 (kwargを使えば問題なかったです)

DDPを使うデメリット

マルチプロセスになるので、メモリ消費が多いと思います。なお、公式ではmp.spawnを使っており、メモリ消費量を抑えるためと思われます。

詳細情報

英語には、こちらこちらが実装例としてありますのでご参考ください。

また、詳しい違いはこちらこちらを、DistributedDataPararellの肝であるall-reduceはこちらを参照してください。

コードでの違い

DataParallel(DP)

DPの場合は、DataLoader。例えば、以下のような書き方になります。(チュートリアルのコードを一部改変)

以下のような、モデルとデータセットで例を記載します。

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

class RandomDataset(Dataset):

    def __init__(self, input_size, output_size, length):
        self.len = length
        self.data = torch.randn(length, input_size)
        self.label = torch.randn(length, output_size)

    def __getitem__(self, index):
        return self.data[index], self.label[index]

    def __len__(self):
        return self.len

class Model(nn.Module):
    # Our model

    def __init__(self, input_size, output_size):
        super(Model, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("\tIn Model: input size", input.size(),
              "output size", output.size())

        return output

この時、DataParallelを使うと以下のような書き方になります。

def main():
    input_size=5
    output_size=2
    data_size = 100
    batch_size = 30
    device = torch.device("cuda:0")
    rand_loader = DataLoader(dataset=RandomDataset(input_size, output_size, data_size),
                             batch_size=batch_size, shuffle=True)
    model = Model(input_size, output_size)
    model = nn.DataParallel(model)
    model.to(device)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)

    for data, label in rand_loader:
        input = data.to(device)
        output = model(input)
        loss_fn(output, label)
        optimizer.step()

DistributedDataParallel(DDP)

一方、DistributedDataParallel(DDP)を使うと以下のようになります。(基本的な書き方はこちらを参考)

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP


def train(rank, n_gpu, input_size, output_size, batch_size, train_dataset):
    dist.init_process_group("gloo", rank=rank, world_size=n_gpu)
    # create local model
    model = Model(input_size, output_size)
    # construct DDP model
    model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    sampler = DistributedSampler(train_dataset, num_replicas=n_gpu, rank=rank, shuffle=True)
    rand_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)
    

    for data, label in rand_loader:
        input = data.to(rank)
        output = model(input)
        label = label.to(rank)
        # backward pass
        loss_fn(outputs, label).backward()
        # update parameters
        optimizer.step()

def main():
    n_gpu = 2
    input_size = 5
    output_size = 2
    batch_size = 30
    data_size = 100
    dataset = RandomDataset(input_size, data_size)
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    mp.spawn(train,
        args=(n_gpu, input_size, output_size, batch_size, dataset),
        nprocs=n_gpu,
        join=True)

ポイント

  1. DDPは、1プロセスにつき一つgpuを割り当てています。そのため、rank がdeviceを表します。
  2. 必ず、 dist.init_process_group() で初期化する必要があります。バックエンド(例ではgloo)についてはこちらを参照してください。gpuで実行する場合はncclが速いとのことです。
  3. torch.multiprocessing.spawn は、第一引数に実行するの関数を指定し、argで関数に値を代入します。そして、nproc分のプロセスを並列実行します。この時、関数は f(i, *args) の形で呼び出されます。そのため、trainの最初の変数をrankとする必要があります。
  4. 環境変数として MASTER_PORTMASTER_ADDRを指定する必要があります。(おそらく、DDPは、マルチノードの並列化にも対応しているからだと思われます。その他関連する環境変数についてはこちら)
  5. DPでは、メインのcpuからデータが供給され、モデルに入力された時点で各gpuに batch_size//n_gpu 分のバッチが割り当てられます。一方、DDPでは、gradientのみを通信します。この時、プロセス毎に異なるデータを入力するために、DistributedSamplerを用いて実現しています。

tips

  • 公式によると、saveは計算負荷軽減のため1プロセスだけで実行するようです。
  • データの順序をランダムに変えるためには、epoch毎にsampler.set_epoch(epoch)とするようです。
  • learning rate schedulerを使うときは、最大step数 len(DataLoader) * epoch / n_gpu にする。おそらく、schedulerが各プロセスで独立に設定されるため。

DDPの拡張

DDPのfp16対応

トレーニング関数部分を以下のように変更します。

import apex
from apex import amp

def train(rank, n_gpu, input_size, output_size, batch_size, train_dataset):
    torch.cuda.set_device(rank)
    dist.init_process_group("gloo", rank=rank, world_size=n_gpu)
    # create local model
    model = Model(input_size, output_size)
    # construct DDP model
    model = apex.parallel.DistributedDataParallel(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    model, optimizer = amp.initialize(model, optimizer, opt_level='O2')
    sampler = DistributedSampler(train_dataset, num_replicas=n_gpu, rank=rank, shuffle=True)
    rand_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=sampler)

    for data, label in rand_loader:
        input = data.to(rank)
        output = model(input)
        label = label.to(rank)
        loss = loss_fn(outputs, label)
        # backward pass
        with amp.scale_loss(loss, optimizer) as scaled_loss:
            scaled_loss.backward()
        # update parameters
        optimizer.step()

変更点

  • torch.cuda.set_device(rank) をあらかじめ実行します。
  • torch.nn.parallel.DistributedDataParallelから apex.parallel.DistributedDataParallelに変更します。
  • fp16化するために、 amp.initializeを実行します。
  • 勾配計算の時にスケールの調整をするため、backwardを以下のようにします。
    with amp.scale_loss(loss, optimizer) as scaled_loss:
        scaled_loss.backward()

マルチノード対応

torch.distributed.launch を使います。公式の通り、それぞれのノードで以下のように実施します。(すみません。自分では実行していません。)

  • node1
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=0 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
  • node2
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node_rank=1 --master_addr="192.168.1.1"
           --master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

ポイント

  • master_addrmaster_port は全ノードで同じ値にする必要がありませす。

  • node_rank はノード毎に変える必要があります。

  • 公式, 参考1

個人的所感

  • DPの方が最初に使うには楽。(dist.init_process_groupでエラーが起きるなどがある。)
  • DDPの方が、自分で作ったロス関数を使う場合に、batchsizeが合わないなどの問題が起こりづらい.
  • 推論で、embedingだけ残したい場合にDDPだと保存番号を考える必要があり少し手間
48
21
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
48
21

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?