なんの記事?
pytorchのDistributedDataParallel
についての日本語記事があまりにもなかったため,素人がまとめました.
並列処理がわからない人による,わからない人のための,とりあえず使えればいいや的なDDPの解説です.
基本的にABCIでの実行を前提に書かれていますが,それ以外の環境の人たちにも参考になれば幸いです.
はじめに
おなじみの機械学習フレームワークであるpytorch.気軽にDataParallel
で並列処理の学習もできます.
ですがfacebookなどの一流の機械学習エンジニアたちはDistributedDataParallel
なるものを使った実装がちらほらみられます.
そこでpytorchの解説記事を読むわけですが,これがびっくりするほどわからない.
というわけで,ABCI上でのDistributedDataParallel
(以下DDP)の使い方を自分なりに調べてまとめてみました.
並列処理がどのように動いているかとかの詳しい解説は正直筆者も理解していないので,コードベースでどのように書けば動くかについての解説になっています.
なぜDDP?
DDPは複数ノードでの並列学習のために実装されています.**つまり複数マシンあるいは複数ノードで一つのモデルを学習したい人以外は基本的に使う必要がないです.**ただ,本記事にたどりつく人は複数ノードを使いたいと言う特殊な需要を持っている方々であるかと思います.ちなみに自分は産総研のABCIを使用していてDDPをするようになりました.この解説もABCIを前提としているので所々他の環境では動かない・そもそも前提が違うなどの点もあるかと思いますがご了承ください.
最低限の知識
DDPを行う際のアーキテクチャです.マスターノードとそうでないノードがあり,それぞれに同じモデルを載せて学習します.ノードは別々のコンピューターと読み替えることもでき,DataParallel
では同一PCの異なるGPUの間で並列処理を行っていましたが,DistributedDataParallel
ではPCもGPUもまたいだ並列処理を行います.
二つのPCで一つのモデルの学習を行うので,ノード間の通信にTCPやその他通信を用いたりします.また,ノードごとのプロセスランク(図でいうところのGPUID)とノードを超えたプロセスランク(図でいうところのglobal rank)が考えられます.
コード
コードです.
シェルスクリプト上でmain.py
を実行する形式です.
cat $SGE_JOB_HOSTLIST > ./hostfile
HOST=${HOSTNAME:0:5}
mpirun --hostfile ./hostfile -np $NHOSTS python main.py
import torch
import torch.nn as nn
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
# DDPのセットアップ用関数
def setup(rank, world_size, port_num):
current_dir = os.getcwd()
with open(current_dir + "/hostfile") as f:
host = f.readlines()
host[0] = host[0].rstrip("\n")
dist_url = "tcp://" + host[0] + ":" + str(port_num)
print(dist_url)
# initialize the process group
dist.init_process_group(
"nccl", init_method=dist_url, rank=rank, world_size=world_size
)
print("tcp connected")
# main関数
def main(gpu_rank, world_size, node_rank, gpu, port_num):
""" Main """
# Setup Distributed Training
# gpu_rank: 0~4 in ABCI, i.e. intra gpu rank
# world size: total process num
print(node_rank)
rank = gpu * node_rank + gpu_rank # global gpu rank
if rank == 0:
print("num_gpu:{}".format(gpu))
print("global rank:{}".format(rank))
print("intra rank:{}".format(gpu_rank))
# set up communication setting between nodes
setup(rank, world_size, port_num)
##########ここに学習の処理を書く##############
##########################################
if __name__ == '__main__':
# ddp setup
node_rank = int(os.environ["OMPI_COMM_WORLD_RANK"]) # Process number in MPI
size = int(os.environ["OMPI_COMM_WORLD_SIZE"]) # The all size of process
print("node rank:{}".format(node_rank))
print("size of process:{}".format(size))
gpu = torch.cuda.device_count() # gpu num per node
world_size = gpu * size # total gpu num
print(world_size)
port_num = 50000
mp.spawn(main, nprocs=gpu, args=(world_size, node_rank, gpu, port_num))
main.py
の中に学習処理を書けば動きます.デバッグ用にあえてprint文は残してありますがなくてもいいです.
解説
multi.sh
cat $SGE_JOB_HOSTLIST > ./hostfile
HOST=${HOSTNAME:0:5}
早速ABCI限定の話です.
まずSGE_JOB_HOSTLIST
なる環境変数を./hostfile
というファイルに記録します.使用しているノード一覧を取得しているようです.
次にHOST
という環境変数をHOSTNAME
の一部からとってきています.ABCIではノードの名前になっていると思います.
mpirun --hostfile ./hostfile -np $NHOSTS python main.py
並列処理のためにopenmpiを使い,その中でmain.py
を実行しています.したがってopenmpiが必要です.ABCIではmodule load openmpi
で入れられます.
$NHOSTS
はホストの数,つまりノードの数です.
main.py
if __name__ == '__main__':
# ddp setup
node_rank = int(os.environ["OMPI_COMM_WORLD_RANK"]) # Process number in MPI
size = int(os.environ["OMPI_COMM_WORLD_SIZE"]) # The all size of process
print("node rank:{}".format(node_rank))
print("size of process:{}".format(size))
gpu = torch.cuda.device_count() # gpu num per node
world_size = gpu * size # total gpu num
print(world_size)
# ポート番号の指定
port_num = 50000
# プロセスのスポーン
mp.spawn(main, nprocs=gpu, args=(world_size, node_rank, gpu, port_num))
環境変数の取得→ポート番号の指定→プロセスのスポーン
という流れです.
ノードの数とノード毎のGPUの数を把握するために環境変数の取得を行います.
次にポート番号を指定します.ポート番号といきなり言われても困惑すると思いますが,通信のために必要になります.
最後にプロセスの実行です.mp.spawn()
によりmain
関数をnprocs
分だけ実行しています.今回はgpuの数だけプロセスを動かしていますね.
def main(gpu_rank, world_size, node_rank, gpu, port_num):
""" Main """
# Setup Distributed Training
# gpu_rank: 0~4 in ABCI, i.e. intra gpu rank
# world size: total process num
print(node_rank)
rank = gpu * node_rank + gpu_rank # global gpu rank
if rank == 0:
print("num_gpu:{}".format(gpu))
print("global rank:{}".format(rank))
print("intra rank:{}".format(gpu_rank))
# set up communication setting between nodes
setup(rank, world_size, port_num)
##########ここに学習の処理を書く##############
##########################################
mp.spawn()
により実行されるmain関数の中身です.args=()
で指定した引数が渡されています.
変数の中身の説明はコメントアウトの通りで,このうちrank
がDDPの肝になります.rank
はプロセスのIDになっていて,0がマスタープロセスです.
このmain関数はmp.spawn()によって指定したプロセスの数だけ実行されていて,それぞれのプロセスにIDがついているわけですね.
main関数でsetupを実行し,あとは通常通り学習を回します(model.to
の指定が変わったりデータロードにdistributedsampler
が必要だったりしますがそこら辺はすでにあるコードをみてもらえばいいかなと思うので割愛します.)
# DDPのセットアップ用関数
def setup(rank, world_size, port_num):
current_dir = os.getcwd()
with open(current_dir + "/hostfile") as f:
host = f.readlines()
host[0] = host[0].rstrip("\n")
dist_url = "tcp://" + host[0] + ":" + str(port_num)
print(dist_url)
# initialize the process group
dist.init_process_group(
"nccl", init_method=dist_url, rank=rank, world_size=world_size
)
print("tcp connected")
DDPのセットアップ部分です.シェルスクリプトで作成したhostfileからhostIDを読み込み,その一番初めのhostをマスターノードとします.
マスターノードとは並列処理を統括するノードのことで,dist.init_process_group
にそのアドレスを渡すことによりマスターノードを指定できます.このとき,tcpによる通信を使用しているのでポート番号を指定しています.(なぜtcpが出てくるのかと言うと,他のノード達の計算結果を通信によりマスターノードへ送る必要があるため.ここらへんの詳しい仕様は自分もよくわかっていないので詳しい方が解説記事を書いてくれたらなーとか思ってます)
解説は以上です.セットアップができればgithubの実装とかは割とDDP対応しているものが多いので実行できちゃうのではと思います.
個人的につまづいた点
モデルの保存について
モデルの保存は,マスタープロセスにおいてのみ行いましょう.
また,マスタープロセス以外のサブプロセスの処理が全て終了してから保存してあげる必要があります.したがって下の様に書きます.
###学習ループの中で####
##
学習処理etc...
##
dist.barrier()
if int(rank)==0:
torch.save(model.state_dict(), checkpoint_dir)
####################
dist.barrier()
により,全てのプロセスがここまで到達するのを待ち,rank
が0(マスタープロセス)においてのみ保存をします.
モデルの保存のみならず,チェックポイントや精度の記録などもマスタープロセスの時のみ行う様にしないと重複が起こるので気をつけましょう.
tcpのポートについて
dist_url = "tcp://" + host[0] + ":" + str(port_num)
ポート番号を乱数で生成することにより重複を避けるテクニックが英語の記事で紹介されていますが.全てのプロセスにおいてdist_url
は同じである必要があります.乱数を生成させるタイミングを間違えるとマスタープロセスとなっているノードの別々のポートにそれぞれのプロセスがアクセスすることになってしまったりするので気をつけましょう.(私はこれでTCP connection error
がでまくってました)
まとめ
実装ベースのDDP解説記事でした.ABCIを前提としているので汎用性は低いですが,これを機に少しでもDDPに関する日本語の解説記事が増えてくれればと思います.
ABCIのDDP必勝テンプレ
#!/bin/bash
#$ -l h_rt=72:00:00
#$ -l rt_F=8
#$ -j y
#$ -cwd
cat $SGE_JOB_HOSTLIST > ./hostfile
HOST=${HOSTNAME:0:5}
export PATH="path/to/anaconda"
conda activate envname
source /etc/profile.d/modules.sh
module load cuda/10.0/10.0.130.1
module load cudnn/7.6/7.6.2
module load openmpi
module load nccl/2.5/2.5.6-1
mpirun --hostfile ./hostfile -np $NHOSTS python main.py