33
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Slurmでpytorch distributed trainingをする

Last updated at Posted at 2020-01-11

Introduction

環境

  • slurm 18.08
  • pytorch 1.3

What is Slurm?

Slurmは、スパコンやコンピュータクラスタなどで主に科学技術計算用途で用いられるジョブスケジューラの一種。SGE, Torque, LSFなどを使ったことがあれば同様のものと思ってもらっていい。
私は過去、SGEとLSFは使ったことがあるが、簡単にSlurmのいいところをあげると

  • srunが便利(submit用のscriptを作らなくても、インタラクティブにコマンドを実行できる)
  • GPUのリソース管理ができる(GPUを使用するプログラムで排他的にDeviceを確保できる)
  • 複数ノード・複数プロセスでの並列実行のサポートが充実している。

今回の話は3つ目の特徴について。

What is PyTorch?

Facebookが開発したDeep learningのフレームワーク。

なぜSlurmでPyTorch distributed trainingをするのか

そこにSlurmがたまたまあって、私もたまたまPyTorchを使っているから。
まあ、あえていうなら、PyTorchのdistributed trainingはMPI likeに実装されているが、Slurmに限らずジョブスケジューラはMPIを想定して設計されており、相性が良いから。

DNNのdistributed trainingと言われているものは、私の認識では2種類あって、1つはTensorflowのように計算をするワーカーとパラメータサーバをわけて別々に起動するタイプで、もう一つがPyTorchやChainerMNなどで採用されているMPI型のもの。MPIはいわゆるSPMDモデルといわれるもので、基本的に並列に同じプログラムを走らせる。

Tensorflowでは、ワーカとパラメータサーバでそれぞれ異なる処理をするし、またそれぞれのワーカも異なる計算グラフを処理できるように設計されているので、プロセス毎に最適なリソースをわりあてるのが理想的。しかし、Slurmでそのように細かい条件まで指定するのは難しい。
これに対しMPI型では全てのプロセスが同じ処理をするので、同じスペックのマシンで動作させればよく、単純にノード数と合計プロセス数の指定だけでいい。

基本的に、ジョブスケジューラは不特定多数のユーザが条件の異なる大量のジョブを、別々のタイミングでサブミットした時に、自動的に効率よく割り当てすることを目的としているので、考え方としてMPIの方が相性が良い。
Tensorflowはどちらかというと、長期的に同じプログラムで、リソースを占有するような状況を想定しているのだと思われる。

Slurmの挙動の確認

Srunの並列処理に関するOption

Slurmにはsrunというインタラクティブにコマンド実行をするコマンドがある。srunはmpirunを意識してデザインされており並列に複数ノード・複数プロセスをたてることができる。また適切にbuildすればmpirunの代わりにMPIのプログラムを動作させることもできる。

次のoptionうち、1-2個を指定することでNode数とTask数を制御してコマンドを並列実行できる。

  • -n, --ntasks: Taskというのは実行されるプロセスを指す。MPIでいうとnumber-of-tasks = world_sizeになる。
  • -N, --nodes=<min[-max]>: Node(ホスト)数を指定する。-でつなぐと、min-max数を指定できる。 e.g. 3-5だと最小3nodesで、最大5nodes。
  • --ntasks-per-node: そのまま。
  • --nodelist: Nodeのリストを直接指定する。

以下のようにhostnameコマンドをsrunで投げれば挙動は把握できると思う。

# 3nodes-3tasks
srun -n3 -N3 hostname
# 3nodes-3tasks
srun -n3 --ntasks-per-node 1 hostname
# 2nodes-4tasks
srun --nodelist host1,host2 -n4 hostname

-Nよりも大きな-nを指定した場合、実際に各nodeに割り当てるられるtaskの数はschedulerによって自動的に決められる。リソースが余っている場合に、一つのNodeになるべく多くのtaskを割り当てるか、または均等に割り当てるかは、Slurmの設定ファイルで制御できる。

この他に

  • -c, --cpus-per-task: 各TaskごとにAllocateするCPUの数

も利用することになると思う。1Taskからprocessをforkする場合はこの数をかえなくてはいけない。

--gres gpuの挙動

DNNのprogramは多くの場合、CUDAでGPUを利用するように書かれている。
SlurmでGPUをallocateする場合、--gres gpu:<gpuの数>というoptionをつければよい。

gresは使ってみるとわかるが、どうやら挙動的にはngpu-per-nodeとして動作する。例えば次のように実行すると、

srun -n3 -N2 --gres gpu:2 bash -c 'echo $(hostname), $(nvidia-smi -L)'

同じNodeでallocateされたGPUは1個目と2個目のTaskで同一のUUIDであることが確認できるはず。
(※slurmのcgroupsを有効化させた状態である場合は、nvidia-smiはallocateされたGPUだけを表示する。)

この仕様上、各ノードに異なるGPU数を確保することはできない。

Slurm19.05以降のversionだと、gpuに関するoptionが強化されており、--num-gpus-per-taskといった指定ができるようになったので、Nodeごとに異なるGPUを確保することができるようになっているらしい。今回の説明ではこれは用いない。

Environment variable

Slurmでlaunchされたプロセスはいくつかのenvironment variableが設定される。
man sbatchで確認すればそれらの意味が確認できる。

普通のMPIプログラムをSlurmで実行するときはRANKなどの情報は自動的に認識されるので関係ないが、PyTorchの分散並列をする場合は自分で適切にinitializeする必要があり、このときにSlurmが設定する環境変数から情報を取得する必要がある。

以下の表に参照しそうな環境変数を示す。"または"と説明に書いているのは過去のversionで使われていた変数を示し、backward compatiblitiyのために最新のversionでも設定されているとのこと。Slurmはversion updateで環境変数を変更しているので、実際何が存在するかは自分の環境で確認した方が良い。

Variable 説明
SLURM_NTASKS Task数(=MPI World size)
SLURM_PROCID MPI Rank
SLURM_LOCALID 同じNode内のTaskに割り振られたID。0から始まる。LOCAL_RANKに対応する。
SLURM_NODEID Nodeに割り振られたID。0から始まる
SLURM_NTASKS_PER_NODE --ntasks-per-nodeが与えられた場合のみ、その値が設定される
SLURM_TASKS_PER_NODE 各NodeのそれぞれのTask数。e.g. "3,1(x3)"となっていたら、4nodeにそれぞれ3,1,1,1のTask数になる。
SLURMD_NODENAME Scriptを実行しているNode。hostnameコマンド等で取得するのと同じ値になるはず。
SLURM_JOB_NUM_NODES(または SLURM_NNODES ) Node数
SLURM_JOB_NODELIST(または SLURM_NODELIST) Nodeの名前リスト
SLURM_STEP_NUM_NODES SLURM_JOB_NUM_NODESのstep版。srunのときはこちらを参照する
SLURM_STEP_NODELIST SLURM_JOB_NODELISTのstep版。srunのときはこちらを参照する

例えば

% srun -n5 -N3 bash -c 'echo SLURM_NTASKS=$SLURM_NTASKS SLURM_NODEID=$SLURM_NODEID, SLURM_LOCALID=$SLURM_LOCALID, SLURM_PROCID=$SLURM_PROCID'
SLURM_NTASKS=5, SLURM_NODEID=0, SLURM_LOCALID=0, SLURM_PROCID=0
SLURM_NTASKS=5, SLURM_NODEID=0, SLURM_LOCALID=1, SLURM_PROCID=1
SLURM_NTASKS=5, SLURM_NODEID=1, SLURM_LOCALID=0, SLURM_PROCID=2
SLURM_NTASKS=5, SLURM_NODEID=1, SLURM_LOCALID=1, SLURM_PROCID=3
SLURM_NTASKS=5, SLURM_NODEID=2, SLURM_LOCALID=0, SLURM_PROCID=4

みたいになる。このあたりの変数を主に参照することになると思う。

JOBとSTEPについて

上の環境変数一覧にあった、SLURM_JOB_NODELISTとSLURM_STEP_NODELISTの違いについて説明する。
SLURM_STEP_*という変数はsrunからコマンドをなげた場合にのみ設定される。SLURM_JOB_はsbatch, sallocate, srunいずれの場合も設定される。以下は、細かい話になるので、読み飛ばしてもいい。

まずSlurmにはJobとStepという概念が存在する。

名前 説明
Job sbatch, srun, sallocateによって作られる
Step srunから作られる

(参考: https://slurm.schedmd.com/job_launch.html )

(この説明が正しいのか、よくわからないが)Jobの下にstepが作られ、stepから複数のTaskがlaunchされるという関係にある。単にsrunを使った時は1Job-1Stepになる。Sbatchでコマンドを使った時はstepは作られない。

単純に使っていると2step以上の状況は発生しないと思うが、以下のようにsbatchで投げるスクリプトの中で(またはsallocateした状態で)、複数回srunを呼び出した場合、1jobの中で複数stepがぶらさがることになる、と理解した。

#!/bin/bash
#SBATCH -n2
#SBATCH -N2
srun -n1 -N1 <command> &
srun -n1 -N1 <command> &
wait

この時の挙動は独特だが、sbatchで確保されたリソース内で、srunがさらにstepごとにリソースを確保する。つまり、sbatchでJobがallocateされた時点で、-n2 -N2のリソースが確保されているわけだが、例えばここでhost1,host2というhostが確保された場合、srunはこのうちhost1またはhost2のどちらかをallocateしたstepが生成されることになる。

SlurmでPyTorchのdistributed trainingをする

Slurmに関係する話はここまでで、以下、PyTorchに興味がある人だけ見ていってほしい。

PyTorchのdistributed trainingをする選択肢としては現状2種類ある。

  • PyTorch自体に含まれているオフィシャルのパッケージ。
  • horovodのPyTorchサポート

PyTorchのDistributed trainingのAPIについて

PyTorchのDistributed関係のクラスの使い方はドキュメント等を見ればわかると思うので、ある程度知っているものとして話をする。

ここでは、Data Parallel型の分散並列を想定する。つまりMini-batchをプロセス数で分割して、それぞれのプロセスごとにLossを計算する方法。絵にするとこんな感じ。

- Single processのとき
             Loss
               ^
               |
             Model
               ^
               |
|Sample0,Sample1,....,SampleN| <= Mini-batch

- 2 Processのとき

                               Loss
                                 ^
                                 |
               -------------------------------------
               |                                   |
             Loss                                Loss                              
               ^                                   ^  
               |                                   |
             Model                               Model
               ^                                   ^
               |                                   |
|Sample0,Sample1,....,SampleN/2-1|   |SampleN/2,SampleN/2+1,....,SampleN|
          (Process0)                            (Process1)

本来は、通常のMPIプログラムと同様にall_reduceやbroadcastなどを使って、上の処理を実装する必要があるが、PyTorchではData Parallelに関しては、この処理をwrapした以下のAPIを提供しており、かなり簡単にこれを行うことができる。

DistributedDataParallelは複雑だが、DistributedSamplerは短いコードなので中身を確認した方がいい。

exampleとして最小構成のスクリプトを作った。

import os
import torch
import torch.nn
import torch.utils.data.distributed

class Dataset:
    def __getitem__(self, x):
        return torch.randn(10)

    def __len__(self):
        return 128

local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
torch.distributed.init_process_group(backend="nccl", init_method="env://")

print(
    "hostname={}, LOCAL_RANK={}, RANK={}, WORLD_SIZE={}".format(
        os.uname()[1], 
        local_rank,
        torch.distributed.get_rank(),
        torch.distributed.get_world_size(),
    )
)

model = torch.nn.Linear(10, 10).cuda()
model = torch.nn.parallel.DistributedDataParallel(
    model, device_ids=[local_rank], output_device=local_rank,
)

train_dataset = Dataset()
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset,
    num_replicas=torch.distributed.get_world_size(), 
    rank=torch.distributed.get_rank(),
)

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=16, sampler=train_sampler
)

for data in train_loader:
    data = data.cuda()
    model(data).sum().backward()
print("DONE")
  • 上のプログラムを複数プロセス立ち上げることで使用する(起動の方法については後述)。
    • DistributedSamplerによってMini-batchが自動的にプロセス数で分割されることでData Parallelが実現されている。
  • backward()を行うと、自動的に各プロセスごとのGradinetがreduceされプロセスの数で平均される
    • 平均なのでLearning rateは1Processのときと変更する必要はない。
    • batch sizeが、プロセス数で割り切れないときは、Normalizeしないと結果が変わってしまうことに注意。
  • init_process_group()のbackendは今回はGPUを使うのでncclを選択する。
  • LOCAL_RANKはNode内に存在するProcessにわりあてたIDで、GPU device IDに対応させる。
    • 私は最初、CUDA_VISIBLE_DEVICES=$LOCAL_RANKとすれば、プログラム側でLOCAL_RANKを処理する必要ないので楽だと考えたが、そうすると"nccl runtime error: unhandled system error"がでた。他のGPUと通信するのに、他のGPUが見えないとダメってことか。まあそりゃそうか。
    • 通常のMPIプログラムのWORLD_SIZEとRANKに加えて、LOCAL_RANKが必要になるのが特徴的。

上の例では、1Processあたりに1GPUを使う例として書いているが、DistributedDataParallel()device_idsに複数のdevice idを指定すると、Node間ではSingle ProcessでMulti ThreadsによるMultiGPUsを使うModeに切り替えられる。これは1Nodeでのtorch.nn.parallel.DataParallel()と同等の働きになる。

model = torch.nn.parallel.DistributedDataParallel(
    model, device_ids=[0,1,2],
)

PyTorchのドキュメントに書いてあるが、この方法のDataParallelはProcessごとにGPUを使う場合に比べ高速ではないらしい。

私の理解としては

  • PythonはGILがあり、multi threadingを使う場合でも複数のCPU coreを同時に動作させることができない(CPUでの並列処理はThreadではできない)。
    • 全ての処理がGPU上で計算されるなら、このことは問題にならないはずだが、どこかにCPUボトルネックが存在するのだろう。
  • DataParallel()はforward-backward部分しか並列化されないが、DistributedDataParallel()はData Loader等も並列化できる。

少し面倒になるのは否めないが、たとえSingle nodeでMulti GPUsを使う場合でもDataParallel()よりもDistributedDataParallel()でProcessごとにGPUを使った方がいい。

init_methodについて

MPIはプロセス間通信の機能とプロセスの管理機能(プロセスのグループを作り、RANKの割り当てやWORLD_SIZEの設定などをする)の2つの機能で構成されている。
PyTorch distributedが現状提供しているのは、このうちの通信の部分だけ。RANKやWORLD_SIZEなどを行うためのInitializationについては手動で行う必要がある(後述するHorovodでは、このinitializationにMPIを利用することができるので、これが自動化される)。

PyTorchが用意しているinitializationの方法は、
https://pytorch.org/docs/stable/distributed.html#initialization

  • Specify store, rank, and world_size explicitly.
  • Specify init_method

の2種類あると書かれている。前者のstoreの使い方の説明は見つけられなかった。いやほんとうに見つけられない。init_methodの方法についても、さらに3種類が紹介されている。

  1. 環境変数を使う

    torch.distributed.init_process_group("nccl", init_method="env://")
    

    このときは、MASTER_ADDRとMASTER_PORTとWORLD_SIZEとRANKの環境変数が参照される。
    MASTERというのはRANK0のプロセスを立てたホストのこと。MASTER_PORTは、空いているポート番号ならなんでもいい。

  2. TCP initializtion

    torch.distributed.init_process_group(
        "nccl", init_method="tcp://10.1.1.20:23456",
        rank=0, world_size=4)
    

    この場合、環境変数が参照されず、rankとworld_sizeの引数を渡す必要がある。内部処理としてはやっていることは"env://"と同じだと思われる(ソースコードは確認していない)。

  • Shared file-system initialization

    torch.distributed.init_process_group(
        "nccl", init_method="file:///mnt/nfs/sharedfile",
        rank=0, world_size=4)
    

    この機能を使うためには、NFSなどのShared file-systemを使っていて、共通のfileにアクセスできる状況で、かつそのfile-systemはfcntlをsupportしていなければならない(NFSなど、多くのfile systemでsupportするので、これはあまり気にしなくていいと思う。)。
    また、ここで使用するファイルはプロセス開始時に新規作成されて、終了時に削除しようとするが、もしもなんらかの理由で前のプロセスで使ったfileが残ってしまっていて、かつ、それを再利用しようとすると予期しない挙動になる可能性がある。
    たぶん、ベストプラクティスとしては、毎回ランダム文字列のファイル名を指定することだと思う。

Shared file-systemがもし使えるなら、これが楽だと思う。
現実的には、RANK0のホスト名を取得するのはわりと面倒だし、また、空いているポートを安全に取得するのは、実際にはかなり難しい。

PyTorchのDistributed trainingをするときのプロセスの起動方法について

普通のMPIのプログラムの場合、mpirunでプログラムを起動させるが、PyTorchでは(bakcend=mpiではない場合は)特別な起動スクリプトはなくても動作させることができる。
Primitiveな例として、下のようにsshなどでホストにそれぞれログインして、background jobでプロセスを立ち上げてみる。

% ssh host1
host1% MASTER_ADDR=host1 MASTER_PORT=20000 WORLD_SIZE=4 RANK=0 LOCAL_RANK=0 python train.py &
host1% MASTER_ADDR=host1 MASTER_PORT=20000 WORLD_SIZE=4 RANK=1 LOCAL_RANK=1 python train.py &
% ssh host2
host2% MASTER_ADDR=host1 MASTER_PORT=20000 WORLD_SIZE=4 RANK=2 LOCAL_RANK=0 python train.py &
host2% MASTER_ADDR=host1 MASTER_PORT=20000 WORLD_SIZE=4 RANK=3 LOCAL_RANK=1 python train.py &

上の例では、backgroundにプロセスを立てたが、このlaunchを行う方法として、
https://pytorch.org/docs/stable/distributed.html#launch-utility
 で2種類の方法が紹介されている。

  • torch.distributed.launchを使う
  • torch.multiprocessing.spawn()を使う: Imagenetのexampleの方式

私はmpirunみたいに、ssh経由で各ノードごとにプロセスを立てる手段を提供しているのだと最初思ったが、この2つはどちらも各ノード内でプロセスをたてる手段のみを提供する。sshなどのホスト間でプロセスを立ち上げる手段と組みわせて使用しなければいけない。

launch.py

だいたいこんな感じで使う。

% ssh host1 python -m torch.distributed.launch \
  --nproc_per_node=2 --nnodes=2 --node_rank=0 \
  --master_addr="host1" \
  --master_port=20000 main.py ... &
% ssh host2 python -m torch.distributed.launch \
  --nproc_per_node=2 --nnodes=2 --node_rank=1 \
  --master_addr="host1" \
  --master_port=20000 main.py ... &

launch.pyのソースコードは短いので読めば、なにをやっているかわかると思う。別にたいしたことはなくて、環境変数の設定とsubprocessで引数でもらったコマンドを複数起動させているだけである。

launch.pyは環境変数LOCAL_RANKまたは--local_rankというoptionを与えた状態でスクリプトを起動する。

torch.multiprocessing.spawn()を使う

imagenetのexampleにtorch.multiprocessing.spawn()を使った方法が紹介されている。

これには--multiprocessing-distributedというoptionがある。これはlaunch.pyでプロセスを複数起動する代わりにspawnによってプロセスを起動させる。単にプロセスの立て方の違いで、本質的にはこの2つはほぼ同じ。

また--multiprocessing-distributedを使わない場合はプロセスをたてずにThreadを使ったMulti-GPUsを行う。

Slurmでpytorch distributedを用いるパターン

以上を踏まえると、Slurmで分散並列をするやり方としては以下の3種類が考えられると思う。

  • srunでlaunch.pyのようなスクリプトを各ノードにばらまき、launch.pyからプログラムを複数起動させる
|          Host0        |          Host1        |
|          Task0        |          Task1        |  <= srun -c2 --ntasks-per-node 1 -N2 --gres gpu:2
|        Process0       |        Process1       |
|Process0   |Process1   |Process0   |Process1   |  <= launch.py --nproc_per_node=2
|RANK0      |RANK1      |RANK2      |RANK3      |
|LOCAL_RANK0|LOCAL_RANK1|LOCAL_RANK0|LOCAL_RANK1|
|GPU0       |GPU1       |GPU0       |GPU1       |
  • srunでGPUの数と同じだけ、プロセスをたてる
|         Host0         |         Host1         |
|Task0      |Task1      |Task2      |Task3      |  <= srun -c1 --ntasks-per-node 2 -N2 --gres gpu:2   
|Process0   |Process1   |Process0   |Process1   |   
|RANK0      |RANK1      |RANK2      |RANK3      |
|LOCAL_RANK0|LOCAL_RANK1|LOCAL_RANK0|LOCAL_RANK1|
|GPU0       |GPU1       |GPU0       |GPU1       |
  • srunでノード毎にプロセスを立て、single ProcessでMulti GPUsをつかう。
|      Host0    |      Host1    |
|      Task0    |      Task1    |  <= srun -c2 --ntasks-per-node 1 -N2 --gres gpu:2
|    Process0   |   Process1    |
|     RANK0     |     RANK1     |
|Thread0|Thread1|Thread0|Thread1|  <= Pythonのthreading
|GPU0   |GPU1   |GPU0   |GPU1   |

この場合、1Processで複数のGPUを処理するので、LOCAL_RANKが必要ないことに注意。

srunからlaunch.py経由でプロセスを立てるパターン

さきほどのlaunch.pyの例では--nnodeなどの引数を渡していたが、これをslurmが設定する環境変数を読み取って設定する必要がある。

例えば以下のようにする。

#!/bin/bash
# launch_in_slurm_step.sh
set -euo pipefail
export NCCL_DEBUG=INFO

# Derive the first node
MASTER_ADDR=$(scontrol show nodelist ${SLURM_STEP_NODELIST} | head -n 1)

# Launch the equal number of processes to the number of allocated devices
NGPU=$(nvidia-smi -L | wc -l)

python -m torch.distributed.launch \
  --nproc_per_node="${NGPU}" \
  --nnodes="${SLURM_JOB_NUM_NODES}" \
  --node_rank="${SLURM_NODEID}" \
  --master_addr="${MASTER_ADDR}" $@
srun -c<GPU数> -N<node数> --num_tasks_per_node 1 --gres gpu:<GPU数> launch_in_slurm_step.sh --master_port 20000 main.py ...

NCCL_DEBUG=INFOについてはhttps://docs.nvidia.com/deeplearning/sdk/nccl-developer-guide/docs/env.html を参照。とくに理由がなければ、設定した方がいい。

srunで直接プロセスを立てるパターン

#!/bin/bash
# main.sh
set -euo pipefail
export NCCL_DEBUG=INFO

# Derive the first node
export MASTER_ADDR=$(scontrol show nodelist ${SLURM_STEP_NODELIST} | head -n 1)
export WORLD_SIZE="${SLURM_NTASKS}"
export RANK="${SLURM_PROCID}"
export LOCAL_RANK="${SLURM_LOCALID}"

python main.py $@
# slurmはdefaultではすべての環境変数を伝搬させる
MASTER_PORT=20000 srun -c<GPU数> -N<node数> --num_tasks_per_node 1 --gres gpu:<GPU数> main.sh ...

Horovodを使う場合

Horovodはuberが開発したDNN distributed trainingののMPI実装のbackend。
記事の冒頭でのべたように、TensorFlowの分散並列は構成の自由度が高すぎて敷居が高いため、TensorFlowでもMPI baseの分散並列をできるようにする、というコンセプトで開発しているらしい。
Tensorflow,Keras以外にもPyTorch, MXNetもサポートしている。意識高い。
MPIでinitializeしてNCCLで通信するという構成の模様。ChainerMNと似たような感じ?

Horovodのinstall

  1. Open MPIをinstallする

    1. Download: https://www.open-mpi.org/software/ompi/v4.0/
    2. Install: https://www.open-mpi.org/faq/?category=building#easy-build

    MPIは必須ではないらしく、MPIをInstallせずにhorovodをinstallすると、glooが使われる。ただしREADMEにはglooのサポートはDeveloment中と書いてある。

  2. NCCLをInstallする
    NCCLもoptionだが、GPUを使う場合はこちらのほうが高速なので入れたほうがいい。https://developer.nvidia.com/nccl/nccl2-download-survey からDownloadしてくる。

  3. pip install horovod

    HOROVOD_NCCL_HOME=<ncclのdirectory> HOROVOD_GPU_ALLREDUCE=NCCL HOROVOD_WITHOUT_TENSORFLOW=1 HOROVOD_WITHOUT_MXNET=1 pip install horovod
    

    NCCLを使ってbuildする方法についてはhttps://github.com/horovod/horovod/blob/master/docs/gpus.rst を参照した。
    デフォルトではtensorflowやmxnet用のbuildも行うが、今回はPyTorchのみでいいので環境変数で制御している。

以下のようにするとbuildの状態を確認できる。

% horovodrun --check-build
Horovod v0.18.2:

Available Frameworks:
    [ ] TensorFlow
    [X] PyTorch
    [ ] MXNet

Available Controllers:
    [X] MPI
    [X] Gloo

Available Tensor Operations:
    [x] NCCL
    [ ] DDL
    [ ] MLSL
    [X] MPI
    [X] Gloo  

正直、こんだけ依存関係が複雑だとハマりまくると予想していたが、pipで簡単にinstallできた。

HorovodのPyTorch API

PyTorchのdistributedのドキュメントに比べたらREADMEがわかりやすいので、Horovodの使い方は読めばわかると思う。
一部抜粋してまとめると、まずPyTorch向けのAPIの使い方はここにある。

このままだと動かないので、てきとうに補完した。

import os

import horovod.torch as hvd
import torch
import torch.utils.data

# Initialize Horovod
hvd.init()

class Dataset:
    def __getitem__(self, x):
        return torch.randn(10)

    def __len__(self):
        return 128

print(
    "hostname={}, LOCAL_RANK={}, RANK={}, WORLD_SIZE={}".format(
        os.uname()[1], hvd.local_rank(), hvd.rank(), hvd.size(),
    )
)

# Pin GPU to be used to process local rank (one GPU per process)
torch.cuda.set_device(hvd.local_rank())

train_dataset = Dataset()
# Partition dataset among workers using DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank()
)

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=16, sampler=train_sampler
)

model = torch.nn.Linear(10, 5).cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
# Add Horovod Distributed Optimizer
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters()
)

# Broadcast parameters from rank 0 to all other processes.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
for data in train_loader:
    data = data.cuda()
    optimizer.zero_grad()
    model(data).sum().backward()
    optimizer.step()
print("DONE")

とくにhvd.local_rank()でLOCAL_RANKを取得できるのが重要。これは通常のMPIでは(たぶん)取得することはできない。

Launch

SlurmでHorovodを実行する場合は、"Slurmで直接プロセスを立てる"パターンに対応する。

Horovodでは、プロセスをlaunchするには、horovodrunというコマンドを使う。

  • To run on a machine with 4 GPUs:
$ horovodrun -np 4 -H localhost:4 python train.py
  • To run on 4 machines with 4 GPUs each:
$ horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py

https://github.com/horovod/horovod/blob/master/docs/mpirun.rst によるとhorovodrunmpirunにoptionを付け加えたものと等価らしい。

horovodrun -np 4 python train.py
# Equivalent Open MPI command:
mpirun -np 4 \
    -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py

そして、SlurmはMPIをsupportしているので、sbatchでmpirunをそのまま実行すればよい。

#!/bin/bash
#SBATCH -N 2
#SBATCH --ntasks-per-node 4
#SBATCH --gres=gpu:4
mpirun \
    -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py

という感じ。Slurmを使うときは-npは与えなくてもいい。簡単だ。

ただし状況に応じて、mpirunのnic関係のoptionの追加が必要になると思う。

おわりに

長々とSlurmの説明をしてきたが、Horovodならmpirunを使えるので、Slurm環境下で透過的に使用することができるため、この記事の大半の内容は理解する必要はなかった。

プログラム対応はHorovodの方が楽なのは間違いないが、そもそもMPIを動かせる環境を作ること自体が結構面倒なので、どっちが楽とも言えない気がする。

関係ないが、PyTorch1.5のドキュメントをたまたまのぞいていたら、
https://pytorch.org/docs/master/rpc.html#distributed-rpc-framework
というのができていた。RPCと言っているからDistributed Tensorflowみたいなのかな。

参考

33
25
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
33
25

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?