23
12

More than 3 years have passed since last update.

ABCI上でpytorch distributed data parallelによるマルチノード学習

Last updated at Posted at 2021-06-23

なんの記事?

pytorchのDistributedDataParallelについての日本語記事があまりにもなかったため,素人がまとめました.
並列処理がわからない人による,わからない人のための,とりあえず使えればいいや的なDDPの解説です.
基本的にABCIでの実行を前提に書かれていますが,それ以外の環境の人たちにも参考になれば幸いです.

はじめに

おなじみの機械学習フレームワークであるpytorch.気軽にDataParallelで並列処理の学習もできます.
ですがfacebookなどの一流の機械学習エンジニアたちはDistributedDataParallelなるものを使った実装がちらほらみられます.
そこでpytorchの解説記事を読むわけですが,これがびっくりするほどわからない.
というわけで,ABCI上でのDistributedDataParallel(以下DDP)の使い方を自分なりに調べてまとめてみました.
並列処理がどのように動いているかとかの詳しい解説は正直筆者も理解していないので,コードベースでどのように書けば動くかについての解説になっています.

なぜDDP?

DDPは複数ノードでの並列学習のために実装されています.つまり複数マシンあるいは複数ノードで一つのモデルを学習したい人以外は基本的に使う必要がないです.ただ,本記事にたどりつく人は複数ノードを使いたいと言う特殊な需要を持っている方々であるかと思います.ちなみに自分は産総研のABCIを使用していてDDPをするようになりました.この解説もABCIを前提としているので所々他の環境では動かない・そもそも前提が違うなどの点もあるかと思いますがご了承ください.

最低限の知識

DDPを行う際のアーキテクチャです.マスターノードとそうでないノードがあり,それぞれに同じモデルを載せて学習します.ノードは別々のコンピューターと読み替えることもでき,DataParallelでは同一PCの異なるGPUの間で並列処理を行っていましたが,DistributedDataParallelではPCもGPUもまたいだ並列処理を行います.
Untitled Diagram (2).jpg

二つのPCで一つのモデルの学習を行うので,ノード間の通信にTCPやその他通信を用いたりします.また,ノードごとのプロセスランク(図でいうところのGPUID)とノードを超えたプロセスランク(図でいうところのglobal rank)が考えられます.

コード

コードです.
シェルスクリプト上でmain.pyを実行する形式です.

multi.sh
cat $SGE_JOB_HOSTLIST > ./hostfile
HOST=${HOSTNAME:0:5}
mpirun --hostfile ./hostfile -np $NHOSTS python main.py 

main.py
import torch
import torch.nn as nn
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel

# DDPのセットアップ用関数
def setup(rank, world_size, port_num):
    current_dir = os.getcwd()
    with open(current_dir + "/hostfile") as f:
        host = f.readlines()
    host[0] = host[0].rstrip("\n")
    dist_url = "tcp://" + host[0] + ":" + str(port_num)
    print(dist_url)
    # initialize the process group
    dist.init_process_group(
        "nccl", init_method=dist_url, rank=rank, world_size=world_size
    )
    print("tcp connected")

# main関数
def main(gpu_rank, world_size, node_rank, gpu, port_num):
    """ Main """
    # Setup Distributed Training
    # gpu_rank: 0~4 in ABCI, i.e. intra gpu rank
    # world size: total process num
    print(node_rank)
    rank = gpu * node_rank + gpu_rank  # global gpu rank
    if rank == 0:
        print("num_gpu:{}".format(gpu))
    print("global rank:{}".format(rank))
    print("intra rank:{}".format(gpu_rank))
    # set up communication setting between nodes
    setup(rank, world_size, port_num)
    ##########ここに学習の処理を書く##############



    ##########################################


if __name__ == '__main__':
    # ddp setup
    node_rank = int(os.environ["OMPI_COMM_WORLD_RANK"])  # Process number in MPI
    size = int(os.environ["OMPI_COMM_WORLD_SIZE"])  # The all size of process
    print("node rank:{}".format(node_rank))
    print("size of process:{}".format(size))
    gpu = torch.cuda.device_count()  # gpu num per node
    world_size = gpu * size  # total gpu num
    print(world_size)

    port_num = 50000
    mp.spawn(main, nprocs=gpu, args=(world_size, node_rank, gpu, port_num))


main.pyの中に学習処理を書けば動きます.デバッグ用にあえてprint文は残してありますがなくてもいいです.

解説

multi.sh

cat $SGE_JOB_HOSTLIST > ./hostfile
HOST=${HOSTNAME:0:5}

早速ABCI限定の話です.
まずSGE_JOB_HOSTLISTなる環境変数を./hostfileというファイルに記録します.使用しているノード一覧を取得しているようです.
次にHOSTという環境変数をHOSTNAMEの一部からとってきています.ABCIではノードの名前になっていると思います.

mpirun --hostfile ./hostfile -np $NHOSTS python main.py 

並列処理のためにopenmpiを使い,その中でmain.pyを実行しています.したがってopenmpiが必要です.ABCIではmodule load openmpiで入れられます.
$NHOSTSはホストの数,つまりノードの数です.

main.py

if __name__ == '__main__':
    # ddp setup
    node_rank = int(os.environ["OMPI_COMM_WORLD_RANK"])  # Process number in MPI
    size = int(os.environ["OMPI_COMM_WORLD_SIZE"])  # The all size of process
    print("node rank:{}".format(node_rank))
    print("size of process:{}".format(size))
    gpu = torch.cuda.device_count()  # gpu num per node
    world_size = gpu * size  # total gpu num
    print(world_size)

    # ポート番号の指定
    port_num = 50000

    # プロセスのスポーン
    mp.spawn(main, nprocs=gpu, args=(world_size, node_rank, gpu, port_num))

環境変数の取得→ポート番号の指定→プロセスのスポーン
という流れです.
ノードの数とノード毎のGPUの数を把握するために環境変数の取得を行います.
次にポート番号を指定します.ポート番号といきなり言われても困惑すると思いますが,通信のために必要になります.
最後にプロセスの実行です.mp.spawn()によりmain関数をnprocs分だけ実行しています.今回はgpuの数だけプロセスを動かしていますね.

def main(gpu_rank, world_size, node_rank, gpu, port_num):
    """ Main """
    # Setup Distributed Training
    # gpu_rank: 0~4 in ABCI, i.e. intra gpu rank
    # world size: total process num
    print(node_rank)
    rank = gpu * node_rank + gpu_rank  # global gpu rank
    if rank == 0:
        print("num_gpu:{}".format(gpu))
    print("global rank:{}".format(rank))
    print("intra rank:{}".format(gpu_rank))
    # set up communication setting between nodes
    setup(rank, world_size, port_num)
    ##########ここに学習の処理を書く##############



    ##########################################

mp.spawn()により実行されるmain関数の中身です.args=()で指定した引数が渡されています.
変数の中身の説明はコメントアウトの通りで,このうちrankがDDPの肝になります.rankはプロセスのIDになっていて,0がマスタープロセスです.
このmain関数はmp.spawn()によって指定したプロセスの数だけ実行されていて,それぞれのプロセスにIDがついているわけですね.
main関数でsetupを実行し,あとは通常通り学習を回します(model.toの指定が変わったりデータロードにdistributedsamplerが必要だったりしますがそこら辺はすでにあるコードをみてもらえばいいかなと思うので割愛します.)

# DDPのセットアップ用関数
def setup(rank, world_size, port_num):
    current_dir = os.getcwd()
    with open(current_dir + "/hostfile") as f:
        host = f.readlines()
    host[0] = host[0].rstrip("\n")
    dist_url = "tcp://" + host[0] + ":" + str(port_num)
    print(dist_url)
    # initialize the process group
    dist.init_process_group(
        "nccl", init_method=dist_url, rank=rank, world_size=world_size
    )
    print("tcp connected")

DDPのセットアップ部分です.シェルスクリプトで作成したhostfileからhostIDを読み込み,その一番初めのhostをマスターノードとします.
マスターノードとは並列処理を統括するノードのことで,dist.init_process_groupにそのアドレスを渡すことによりマスターノードを指定できます.このとき,tcpによる通信を使用しているのでポート番号を指定しています.(なぜtcpが出てくるのかと言うと,他のノード達の計算結果を通信によりマスターノードへ送る必要があるため.ここらへんの詳しい仕様は自分もよくわかっていないので詳しい方が解説記事を書いてくれたらなーとか思ってます)
解説は以上です.セットアップができればgithubの実装とかは割とDDP対応しているものが多いので実行できちゃうのではと思います.

個人的につまづいた点

モデルの保存について

モデルの保存は,マスタープロセスにおいてのみ行いましょう.
また,マスタープロセス以外のサブプロセスの処理が全て終了してから保存してあげる必要があります.したがって下の様に書きます.

###学習ループの中で####
##
学習処理etc...
##
dist.barrier()
if int(rank)==0:
    torch.save(model.state_dict(), checkpoint_dir)
####################

dist.barrier()により,全てのプロセスがここまで到達するのを待ち,rankが0(マスタープロセス)においてのみ保存をします.
モデルの保存のみならず,チェックポイントや精度の記録などもマスタープロセスの時のみ行う様にしないと重複が起こるので気をつけましょう.

tcpのポートについて

dist_url = "tcp://" + host[0] + ":" + str(port_num)

ポート番号を乱数で生成することにより重複を避けるテクニックが英語の記事で紹介されていますが.全てのプロセスにおいてdist_urlは同じである必要があります.乱数を生成させるタイミングを間違えるとマスタープロセスとなっているノードの別々のポートにそれぞれのプロセスがアクセスすることになってしまったりするので気をつけましょう.(私はこれでTCP connection errorがでまくってました)

まとめ

実装ベースのDDP解説記事でした.ABCIを前提としているので汎用性は低いですが,これを機に少しでもDDPに関する日本語の解説記事が増えてくれればと思います.

ABCIのDDP必勝テンプレ

#!/bin/bash
#$ -l h_rt=72:00:00
#$ -l rt_F=8
#$ -j y
#$ -cwd
cat $SGE_JOB_HOSTLIST > ./hostfile
HOST=${HOSTNAME:0:5}
export PATH="path/to/anaconda"
conda activate envname

source /etc/profile.d/modules.sh
module load cuda/10.0/10.0.130.1
module load cudnn/7.6/7.6.2
module load openmpi
module load nccl/2.5/2.5.6-1
mpirun --hostfile ./hostfile -np $NHOSTS python main.py
23
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
12