LoginSignup
26
13

More than 3 years have passed since last update.

ChainerMNでデータパラレル、モデルパラレル分散学習と分散学習のあれこれ

Last updated at Posted at 2019-12-23

はじめに

本記事はChainerMNを用いて分散学習を行ってみようという記事です。内容としては分散学習について、データパラレル分散学習、モデルパラレル分散学習の順番になっています。
以下のスライドの拡張版です。
分散学習のあれこれ~データパラレルからモデルパラレルまで~

Chainerは素晴らしいフレームワークだったのですが、ついにメンテナンスフェーズへと移行してしまい、その役割を終えてPyTorchへと吸収されました。
分散学習をする際にChainerはとても明解であり、スムーズに研究を進められました。最後に恩返しの気持ちを込めて分散学習についてのあれこれを書いていきます。
最後にポエミーなことも書きますが、もしよかったら最後まで見ていってください。Chainerへの最後のクリスマスプレゼントとなれば幸いです。それでは見ていきましょう。
コードは後程Githubで公開します。

分散学習とは?

ここでは分散学習についての概念的なものから説明していきます。分散学習は知ってるよ!という人はデータパラレル実装のセクションから見ていってください。
Parallel_Computation_1.png
上図のように1GPUで訓練したのでは訓練の収束までに10日掛かってしまうモデルがあるとします。しかし、10日も1試行で待っていては研究は進められません。そこで、GPUを沢山用意して計算時間を短縮しようという試みが出てきます。これが分散学習です。

データパラレル分散学習

data_para.png

分散学習には大別して2種類の方式があります。1種類目はデータパラレル分散学習といいます。上図のようにGPUに載せるモデルは全て同一のモデルを載せ、データはGPUごとに異なるものを載せることで扱うバッチサイズを増やし、訓練時間を短縮することができます。オプティマイザでの勾配更新は全てのGPUの推論結果を1つのGPUに集約し、1つのGPUでバックワードを行って勾配更新します。ただし、ここで分散学習の高速化について認識の注意点があります。

  • 1GPUに載せるバッチサイズは変わらず、GPUの数を増やすことでトータルバッチサイズを増やしてトータルイテレーション数を減らす1
  • トータルバッチサイズをGPU数で分割することで1GPU当たりのバッチサイズを減らして高速化する

この2つは混乱しやすいですが、問題解決の過程が全く異なります。
前者はウィークスケーリングと言い、トータルバッチサイズを増加させてトータルイテレーション数は減らすことで訓練時間を減らすのが目的です。しかし、ウィークスケーリングは学習モデルの性能が下がったり2、既存研究の実験設定と変わってしまうといった問題が発生します。

後者はストロングスケーリング言い、トータルバッチサイズは変えずにGPU数で分割したバッチを用いることで1GPUあたりの計算量を減らして訓練時間を減らすのが目的です。しかし、ストロングスケーリングは学習モデルの性能や既存研究の実験設定は変えませんが、ある程度1GPUで効率的に計算できるバッチサイズ以下になってしまうとそれ以上高速化はしません。
まとめると以下のような関係になります。

ウィークスケーリング(トータルバッチサイズ可変) ストロングスケーリング(トータルバッチサイズ不変)
Pros GPU数が多けければ多いほど訓練時間減少 既存研究の実験設定を変えずに訓練が可能
Cons モデルの性能が低下したり、既存研究と実験設定が変わる 1GPUで効率的に計算できるバッチサイズ以下では高速化不可

余談ですが、Pros and Consとは賛否両論の意であり、英語でProsと書いたらメリット、Consとか書いたらデメリットとして表現されます。
この後、モデルパラレルの説明の後にデータパラレルとモデルパラレルのPros and Consを記述します。

モデルパラレル分散学習

model_para.png

2種類目の分散学習がモデルパラレル分散学習と言います。上図のようにデータは全てのGPUで同じものを使い、モデルを分割することで1GPUあたりのモデルサイズを小さくしてモデルをGPUに載せられるようにします。モデルパラレルの目的は1GPUで載らないモデルを載せれるようにすることなので、高速化は基本的には3行えません。
また、モデルパラレルの仕方によって実装の仕方が異なり(CUDAでゴリゴリ書く必要がある場合も)、超簡単に使用できるラッパーも非常に少ないです。
以下にいくつかモデルパラレルの種類を記載します。

ノードごとの分割

model_para_node.png

上図はノードごとに分割することで1GPUあたりのモデルサイズを減らします。ここで通信ボトルネックとなるのは、各層でのつなぎ目部分の出力通信です。アーキテクチャ依存なので、恐らく一番これが実装が厄介です。

層ごとの分割

model_para_layer.png

上図は層ごとに分割することで1GPUあたりのモデルサイズを減らします。ここで、通信ボトルネックとなるのは、分割した層の出力通信です。

チャンネル方向の分割

model_para_channel.png

上図はチャンネルごとに分割することで1GPUあたりのモデルサイズを減らします。ここで通信ボトルネックとなるのは、次の層に進む際に一度特徴マップを集約する部分です。これについては後程モデルパラレルの実装の部分で詳しく述べます。
また、チャンネル間のみが関係しているので、畳み込みの処理に手を加えればよく、今回紹介するモデルパラレルの中でこれが最もシンプルな実装になると思います(とは言っても今回はChainer公式のチャンネルパラレル畳み込みのコードを利用するので、私が実装したわけではありません、、、)。

空間方向の分割

model_para_spacial.png

上図は画素を分割することで1GPUあたりのデータサイズを減らします。今回紹介するモデルパラレルにおいてこの手法のみ特殊であり、モデルサイズではなくデータサイズを減らします。空間方向の分割になるので、パラメータ数は変わらずデータが分割されて載せられるような形になります。訓練データは同一ですが、画像の空間方向でパラレルなので、ある意味データパラレルの亜種のような形になっています。
一見、データパラレルのメリットもモデルパラレルのメリットも享受できていないように見えますが、キモはデータパラレルのストロングスケーリングのデメリットにあります。
ストロングスケーリングのデメリットにおいて1GPUで効率的に計算できるバッチサイズ以下では高速化不可とありました。このデメリットを克服できる可能性を示しています。このままではよくわからないので、空間方向分割の処理内容を見ていきましょう。

model_para_spacial_2.png

一番大きな外側の緑枠は画像の端っこと考えてください。畳み込みのカーネルは重みを共有しているので、4分割の場合は中心のオレンジの分割線から±2画素分だけ各GPUで共有していれば問題ないです。
例)
画像または特徴マップのサイズが1024x1024、畳み込みのカーネルを3x3とした場合、4分割すると512x512になります。左上の分割された画像または特徴マップを考えた時、共有すべきなのは右側の2画素の列、左下の2画素の行、右下の角4画素であり、最終的に1GPUで処理するのは514x514の画像または特徴マップということになります。

以上が処理内容ですが、入力サイズが大きければ大きいほど、この手法の威力は発揮されます。いくら入力サイズが大きくても共有するのは±2画素だからです。ストロングスケーリングは1GPUあたりの効率的な処理数以下のバッチサイズでは威力を発揮しませんが、ストロングスケーリングに加えて空間方向分割を取り入れることで、ストロングスケーリングの限界以上の高速化が可能になるかもしれません。ただし、GPU間通信はかなり遅いので、ある程度入力サイズが小さい場合には威力を発揮しないどころか低速化になってしまう可能性があります(CIFAR-10の32x32など)。
今回の記事では実装はしていませんが、いずれCUDAでゴリゴリとPyTorchで実装したいなという気持ちなので、CUDA詳しい人はもしよかったら一緒に実装しませんか?

データパラレルとモデルパラレル分散学習のまとめ

それでは長くなってしまいましたが、ここでデータパラレルとモデルパラレルをまとめていきましょう。

データパラレル(WS) データパラレル(SS) モデルパラレル
Pros 高速化 高速化 モデルサイズ縮小
Cons 性能劣化、実験設定変更 効率的なバッチサイズ以下は効果無し 訓練の低速化

WSはウィークスケーリング、SSはストロングスケーリングを表しています。こうしてみるとなんとなく分散学習の全貌が見えてきたのではないでしょうか?
より詳しい基礎講座資料は以下の山崎さんの資料を見てください。
分散学習基礎講座

それではここからは実装に入っていきます。

分散学習実装

このセクションではChainerMNを用いて分散学習の実装を行っていきます。先に実行するにあたって必要なものを列挙していきます。

用意するもの

  1. ChainerMN(Chainer5.0以降はChainerMNは同梱)
  2. NCCL(マルチGPUの集団通信ライブラリ)
  3. OpenMPI(並列コンピューティング用ライブラリ)
  4. Multiple GPU(2枚以上)4

ChainerMNはver5.0以降ではChainerに同梱されているので、pip installでChainerを入れましょう。
NCCLやOpenMPIについては様々な記事があるので、そちらを参考にしてみてください。
Multiple GPUは諭吉君に頑張ってもらいましょう。
ちなみにGPUの型であったり、ドライバーのバージョン、Chainerのバージョンなど左右される要素が多きので、そこは各自確認をお願いします。
今回実験にはABCI(AI Bridging Cloud Infrastructure、AI橋渡しクラウド)を利用します。ABCIは国内最大規模のスーパーコンピューターであり、研究目的であれば申請することで使うことができます。GPUはTesla V100が4352基搭載されているお化けマシンです。
いずれABCIの使い方まとめみたいのも需要がありそうなら書いてみます。
ABCIのホームページ

以下に今回の私の環境を載せます。

Chainer NCCL OpenMPI GPU CUDA CUDNN
6.5 2.4.8-1 2.1.6 Tesla V100 10.0.130.1 7.6.3

データパラレル分散学習実装

ChainerでNN(ニューラルネットワーク)を訓練する際、基本的には実行に必要なファイルは3つ用意します。

  • net.py
  • updater.py
  • train.py

この3つです。
今回データパラレルにおいてはtrain.pyに5ステップ、net.pyに1ステップの計6ステップ変更を加えるだけです。
画像認識モデルの分散学習はちょいちょい見かけるので、今回はストロングスケーリング(もし、ここから読んでいる人はストロングスケーリングについては前項のデータパラレル分散学習の項を参照してください)でのGAN(Generative Adversarial Networks、GAN)の訓練を行ってみようと思います。
それではやっていきましょう。まずはnet.pyから見ていきます。あとから細かく説明するので、最初はサラッと流し見してください。

net.pyの変更

net.py
import numpy as np

import chainer
import chainer.links as L
import chainer.distributions as D
from chainer import functions as F
from chainer import backend

import chainermn.links as mnL

# from https://github.com/pfnet-research/sngan_projection/blob/master/source/miscs/random_samples.py
# Make latent vector
def sample_continuous(dim, batchsize, distribution='normal', xp=np):
    if distribution == "normal":
        return xp.random.randn(batchsize, dim).astype(xp.float32)
    elif distribution == "uniform":
        return xp.random.uniform(-1, 1, (batchsize, dim)).astype(xp.float32)
    else:
        raise NotImplementedError


class Generator(chainer.Chain):
    def __init__(self, n_hidden=128, bottom_width=4, ch=512, wscale=0.02, comm=None):
        super(Generator, self).__init__()
        self.n_hidden = n_hidden
        self.ch = ch
        self.bottom_width = bottom_width
        self.comm = comm

        with self.init_scope():
            w = chainer.initializers.Normal(wscale)
            self.l0 = L.Linear(self.n_hidden,
             bottom_width * bottom_width * ch, initialW=w)
            self.c1 = L.Convolution2D(ch, ch // 2, 3, 1, 1, initialW=w)
            self.c2 = L.Convolution2D(ch // 2, ch // 4, 3, 1, 1, initialW=w)
            self.c3 = L.Convolution2D(ch // 4, ch // 8, 3, 1, 1, initialW=w)
            self.c4 = L.Convolution2D(ch // 8, 3, 3, 1, 1, initialW=w)
            self.bn0 = mnL.MultiNodeBatchNormalization(bottom_width * bottom_width * ch, self.comm)
            self.bn1 = mnL.MultiNodeBatchNormalization(ch // 2, self.comm)
            self.bn2 = mnL.MultiNodeBatchNormalization(ch // 4, self.comm)
            self.bn3 = mnL.MultiNodeBatchNormalization(ch // 8, self.comm)

    def forward(self, z):
        h = F.reshape(F.relu(self.bn0(self.l0(z))),
                      (len(z), self.ch, self.bottom_width, self.bottom_width))
        h = F.unpooling_2d(h, ksize=2, outsize=(8,8)) # 4x4 -> 8x8
        h = F.relu(self.bn1(self.c1(h)))
        h = F.unpooling_2d(h, ksize=2, outsize=(16,16)) # 8x8 -> 16x16
        h = F.relu(self.bn2(self.c2(h)))
        h = F.unpooling_2d(h, ksize=2, outsize=(32,32)) # 16x16 -> 32x32
        h = F.relu(self.bn3(self.c3(h)))
        x = F.tanh(self.c4(h))
        return x


class Discriminator(chainer.Chain):
    def __init__(self, n_latent=256, ch=256, bottom_width=4, wscale=0.02, comm=None):
        super(Discriminator, self).__init__()
        self.comm = comm

        with self.init_scope():
            w = chainer.initializers.Normal(wscale)
            self.c0_0 = L.Convolution2D(3, ch // 8, 3, 1, 1, initialW=w)
            self.c0_1 = L.Convolution2D(ch // 8, ch // 4, 3, 1, 1, initialW=w)
            self.c1_0 = L.Convolution2D(ch // 4, ch // 4, 3, 1, 1, initialW=w)
            self.c1_1 = L.Convolution2D(ch // 4, ch // 2, 3, 1, 1, initialW=w)
            self.c2_0 = L.Convolution2D(ch // 2, ch // 2, 3, 1, 1, initialW=w)
            self.c2_1 = L.Convolution2D(ch // 2, ch // 1, 3, 1, 1, initialW=w)
            self.c3_0 = L.Convolution2D(ch // 1, ch // 1, 3, 1, 1, initialW=w)
            self.l4 = L.Linear(ch * bottom_width**2, 1, initialW=w)
            self.bn0_1 = mnL.MultiNodeBatchNormalization(ch // 4, self.comm, use_gamma=False)
            self.bn1_0 = mnL.MultiNodeBatchNormalization(ch // 4, self.comm, use_gamma=False)
            self.bn1_1 = mnL.MultiNodeBatchNormalization(ch // 2, self.comm, use_gamma=False)
            self.bn2_0 = mnL.MultiNodeBatchNormalization(ch // 2, self.comm, use_gamma=False)
            self.bn2_1 = mnL.MultiNodeBatchNormalization(ch // 1, self.comm, use_gamma=False)
            self.bn3_0 = mnL.MultiNodeBatchNormalization(ch // 1, self.comm, use_gamma=False)

    def forward(self, x):
        h = F.leaky_relu(self.c0_0(x))
        h = F.leaky_relu(self.bn0_1(self.c0_1(h)))
        h = F.average_pooling_2d(h, 2) # 32x32 -> 16x16
        h = F.leaky_relu(self.bn1_0(self.c1_0(h)))
        h = F.leaky_relu(self.bn1_1(self.c1_1(h)))
        h = F.average_poaoling_2d(h, 2) # 16x16 -> 8x8
        h = F.leaky_relu(self.bn2_0(self.c2_0(h)))
        h = F.leaky_relu(self.bn2_1(self.c2_1(h)))
        h = F.average_pooling_2d(h, 2) # 8x8 -> 4x4
        h = F.leaky_relu(self.bn3_0(self.c3_0(h)))
        h = self.l4(h)
        return h

今回のnet.pyの構成要素は以下の3点です。

  • sample_continuous
  • Generator
  • Discriminator

sample_continuousはGeneratorの入力となる潜在変数$z$を生成する関数です。
GeneratorはGANの生成を担うモデル部分です。
DiscriminatorはGANの真偽判別を担うモデル部分です。
データパラレル分散学習を行うにあたって変更する部分はGeneratorDiscriminatorのBN(Batch Normalization)の部分です。
なぜBNに変更が必要かというと、データパラレル分散学習を行う際にBNは各GPUで計算されてしまうので、正しいBNとはならないからです。
例)
バッチサイズ64、GPU数16だとした場合に1GPUあたりに乗るデータサイズは8であり、このときBNは64データ分ではされずに4データずつでのバラバラのBNを掛けた結果となる。

このように1GPUあたりのデータが少ない場合にはBNの威力は発揮されず、性能向上には繋がらない場合がほとんどです。また、1GPUあたりのデータが十分にある場合でも既存研究の再現性が取れないことが発生しかねません。
なので、model.py中の冒頭のimport chainermn.links as mnLでマルチノード用のlinksを呼べるようにしておき、GeneratorとDiscriminatorの__init__のBN部分(L.BatchNormalization)をmnL.MultiNodeBatchNormalizationに書き変えます。引数については第1引数はチャンネルサイズを指定し、第2引数はコミュニケーターを指定します。ちなみにノードとはマシンのことを指しています。NNのノードとは異なります。
mnL.MultiNodeBatchNormalizationはテスト推論時にもそのままで問題なく通常のBNとして機能します。素晴らしい!

train.pyの変更

続いて、train.pyを見ていきましょう。先に変更点だけをさらっていくゆえコードの中身は見なくてもいいので、下図の緑枠緑文字を見てください。
data_para_code1.png
data_para_code2.png

コミュニケーター作成

まず一番最初にやることは上図1枚目のGPU情報を格納するオブジェクトを作成というものです。
メイン関数の冒頭に以下のコードを入れてください。

make_communicator.py
comm = chainermn.create_communicator(communicator_name="pure_nccl")

このcommunicatorというものがChainerMNを用いた分散学習においてコアになっています。
引数であるcommunicator_nameですが、これは基本的に"pure_nccl"のままで問題ありません。NCCLのバージョンが2以前だったりするとダメですが、基本的にレイテストバージョンのNCCLであれば全く問題ありません。

オプティマイザをマルチGPU対応に

続いて、オプティマイザをマルチGPUに対応させます。
上図1枚目の既存のオプティマイザーをマルチノード用モジュールでラップするというところです。
これは非常に簡単な操作であり、ChainerMNのある関数でラップするだけです。以下にラップ後のコードを示します。

make_optimizer.py
def make_optimizer(comm, model, alpha=0.0002, beta1=0., beta2=0.9):
    optimizer = chainermn.create_multi_node_optimizer(
        chainer.optimizers.Adam(alpha=alpha, beta1=beta1, beta2=beta2), comm)
    optimizer.setup(model)
    return optimizer

今回オプティマイザーはAdamを使用する体で話を進めていきます。既存のAdamを関数chainermn.create_multi_node_optimizer()でラップしてやるだけです。
第1引数には既存のオプティマイザー、第2引数にはコミュニケーターを入れます。このようにラップしてやることで、各GPUで計算後にGPU0番(ホストとなるGPU)にロス計算後のデータを集約し、バックワードを行って勾配更新が行えるようになります。

各GPUにデータ分配

次は各GPUに学習データを分配します。
上図1枚目の最初のGPU0番にのみデータセットを載せた後に各GPUに分配するというところです。
ChainerMNはGPU0番(ホストGPU)に一度全てのデータを載せ、それを各GPUに分配するスタイルを取ります。
ここではCIFAR-10を使った実験を行うので、Tesla V100ならば1台に全ての画像データが載りますが、もっとデータが大きくなるとそうはいきません。なので、その場合にはデータのパスをGPU0番にまとめた後に各GPUにパスを分配して読み込みを行います。以下にデータの載せ方を箇条書きします。

  • GPU0番にデータを全て載せて、データを各GPUに載せる
  • GPU0番にデータのパスを全て載せて、パスを各GPUに載せる

後者はパスを分配させた後に各GPUで全てのデータを載せるもよし、学習中に都度読み込みの形式にしてもよしですが、私は今回は学習の時間ロスを少しでも減らしたいので、パスを分配した後に全てのデータを各GPUに載せます。
少し話が脱線してしまいましたが、データセット読み込み部分を以下のように書き変えます。

dataset_loader.py
if comm.rank==0:
    train, test = chainer.datasets.get_cifar10(withlabel=False, scale=255.)
else:
    train, test = None, None
train = chainermn.scatter_dataset(train, comm, shuffle=True)

comm.rankとは全GPUのランク(GPUのIDのようなもの)を取得して戻り値を返します。GPU0番のみはデータを読み込みますが、その他のGPUは全てデータはNoneにしておきます。その後に、chainermn.scatter_datasetを利用して各GPUにデータを分配します。第1引数は分配するデータセット、第2引数はコミュニケーターです。これだけで各GPUへのデータの分配は完了です。

各GPUにモデルを載せる

次は上図2枚目の各GPUにモデルを載せるという部分です。
1GPUの場合はGPU0番のみを指定してhoge.to_gpu()でモデルをGPUに載せていたと思います。
分散学習をする際はGPU番号をGPUごとに取得してモデルを載せます。
モデルを載せる部分を以下のように書き換えます。

deploy_model.py
devices = comm.intra_rank
chainer.cuda.get_device(devices).use()
gen.to_gpu()
dis.to_gpu()

comm.intra_rankでGPU番号を取得します。おや、待てよと。なぜ、comm.rankを使わないのかと疑問に思われたかもしれません。comm.rankを用いてしまうと全GPUの割り振られた番号が取得されてしまいます。chainer.cuda.get_device()では1ノード内での番号を割り当てなくてはいけないので、comm.intra_rankで1ノード内のGPU番号を取得します。2ノード8GPU(1ノードあたり4GPU)などの場合ではエラーが発生します。

記録用のextensions達をGPU0番のみで動かす

最後に上図2枚目のログの保存などはGPU0番のみ行うように変更の部分です。
ログの保存は全てのGPUでやっては完全に無駄なので、GPU0番にのみこれをやらせます。これは以下のような非常に単純な処理で問題ありません。

save_log.py
if comm.rank==0:
    snapshot_interval = (snapshot_interval, "iteration")
    evaluation_interval = (evaluation_interval, "iteration")
    display_interval = (display_interval, "iteration")
    trainer.extend(extensions.snapshot_object(s_gen, 'gen_iter_{.updater.iteration}.npz'), trigger=snapshot_interval)

hoge_intervalは事前に決めておいた保存用の間隔なので、ここでは無視してください。このようにif文で分岐させてやるだけです。

以上が変更の6ステップです。
多く見えますが、少し書き換えるだけなので5分ほどで終わります。それでは実行していきましょう。

実際に動かしてみよう!

では、実際に動かしてみようと思います。
以下のコードをターミナルに打ち込むだけです。

$ mpiexec -np 4 python train.py

$マークはターミナルの入力開始位置を表しているので省いてください。windowsでは>です。
mpiexecは並列で計算する際のOpenMPIのコマンドです。mpirunもありますが、どちらでも大丈夫です(正直違いがよくわかってないです)-npの後に続くのはGPU数です。
たったこれだけで分散学習がスタートできます。便利ですね。
結果は以下の通りです。

cifar10_data_para.png

車らしきものや馬らしきものが生成されていますね(CIFAR-10のカエルクラスとか人間にもわからなくないですか?)

複数ノードにまたがった訓練を行ってみる

続いて、単一ノードではなく、複数ノードにまたがった訓練を行ってみようと思います。用意すべきファイルが1つだけ増えます。ホストファイルというノードの名前が記載されたファイルが必要になります。拡張子は必要なく、自分で作ることになります。例えば、hostfileという名前で作ったとして、ノード名がnode1, node2の2つのノードがあるとした場合のファイルの中身を下に記述します。

hostfile.
node1 slots=N
node2 slots=N

Qiitaの仕様上.を打たないとファイル名として表示されないので、.を打っていますが、実際に使うときに.は打たないでください。
上記のようにノード名の後に半角スペースでslots=Nを記載します。Nは1ノードあたり使うGPU数を指定しています。ちなみにABCIでは毎回割り当てられるノードが異なるので動的に処理できるようにしなくてはいけません。以下にABCI上で動的にホストファイルを作成するプログラムを記載します。

make_hostfile.sh
cat $SGE_JOB_HOSTLIST > ./hostfile
HOST=${HOSTNAME:0:5}
sed -i "s/\$/ slots=N/" ./hostfile

1行目の$SGE_JOB_HOSTLISTはABCIの現在割り当てられているホスト名を取得する環境変数になっており、デフォルトで使えます。
1行目の意味は$SGE_JOB_HOSTLISTの中身をカレントディレクトリのhostfileに書き込むという処理です。カレントディレクトリにhostfileが無ければ自動で作成されます。
2行目なのですが、、、以前自分で作ったのにも関わらず意味を忘れてしまいました、、、(とりあえずこれで動くようにしたのは覚えているので、入れておいてください)
3行目の意味はsedコマンドを用いてslots=Nを各ノードの名前の後に記載するという処理です。
これを訓練を実行するシェルスクリプトの冒頭に入れることで自動的にホストファイルが作成されます。
では、以下に実行するコマンドを記述します。

$ mpiexec --hostfile hostfile -np 8 python train.py

ホストファイルさえ準備してしまえば上記のように簡単にマルチノードに拡張できます。

では実際に実行してみようと思います。今回はLSUN bedroomというGANでは広くベンチマークとして使われるデータセットを用いてやってみようと思います。画像枚数は300万枚とちょっとで、画像サイズは256x256にしてあります。デカいですね。ちなみに縦横で同一サイズにするとアスペクト比が崩れてしまうので、やや変形したベッドルームになります。
折角なので、バッチサイズ2048、16ノード64GPUで実行します(この量でないと全データを載せてバッチサイズ2048にできない)。
諸事情により実験諸元とコードは公開できないのですが、実行結果を載せます。訓練時間は18時間ほどです。

lsun_test.png

一部とろけてるヤバい画像が散見されますが、ある程度はいい感じにベッドルームが生成できていることが確認できます(チェリーピックはしてません。完全にランダムです)。

モデルパラレル分散学習実装

さて、やっとモデルパラレルのセクションまで来ました。
ここではセクション分散学習とは?で前述の通り、チャンネルパラレルを実装してみます。
チャンネルパラレルは通信ボトルネックが各チャンネルを集約する部分にあると言いましたが、理由は以下の図です。

channel_para.png

このように各チャンネルをパラレルで畳み込んだ後に次の畳み込みに進む際、一度全てのチャンネルを集約する必要があります。このゆえ、実行速度は非常に遅いですが、1GPUあたりのモデルサイズは縮小することが可能になります。今回のDCGAN(CIFAR-10生成)では1GPUのDCGANの約2倍、今回は実装していませんがVGG(CIFAR-10)認識では1GPUの約3倍かかります。
それでは実装に進むのですが、先に変更点をさらっておきましょう。

data_para_code1.png
channel_para_code1.png
channel_para_code2.png
channel_para_code3.png

少々ややこしくなっていますが、データパラレルの場合とよく似ています。相違点は以下の通りです。

  • データを各GPUに分配しない
  • 訓練用のイテレータをマルチノード用のモジュールでラップ
  • 通常のL.Convolution2Dをラップするチャンネルパラレル用の畳み込みを定義
  • 通常の畳み込みをチャンネルパラレル用の畳み込みに入れ替え

2項目目から説明していきます。

train.pyの変更

ここはデータパラレルの変更と似ているので、train.pyから変更していきます。
訓練用イテレータとなるtrain_iterを以下のようにラップしてあげます。

train_iter.py
train_iter = chainermn.iterators.create_multi_node_iterator(
    chainer.iterators.SerialIterator(train, batchsize), comm)

chainermn.iterators.create_multi_node_iterator()でラップしてあげるだけです。第1引数は1GPU訓練用イテレータ、第2引数はコミュニケーターです。

net.pyの変更

まず、チャンネルパラレル用の畳み込みを定義します。

channel_para_conv.py
class ParallelConvolution2D(chainer.links.Convolution2D):
    def __init__(self, comm, in_channels, out_channels, *args, **kwargs):
        self.comm = comm
        self.in_channels_ = in_channels
        self.out_channels_ = out_channels
        super(ParallelConvolution2D, self).__init__(
            self._in_channel_size, self._out_channel_size, *args, **kwargs)

    def _channel_size(self, n_channel):
        # Return the size of the corresponding channels.
        n_proc = self.comm.size
        i_proc = self.comm.rank
        return n_channel // n_proc + (1 if i_proc < n_channel % n_proc else 0)

    @property
    def _in_channel_size(self):
        return self._channel_size(self.in_channels_)

    @property
    def _out_channel_size(self):
        return self._channel_size(self.out_channels_)

    @property
    def _channel_indices(self):
        # Return the indices of the corresponding channel.
        indices = np.arange(self.in_channels_)
        indices = indices[indices % self.comm.size == 0] + self.comm.rank
        return [i for i in indices if i < self.in_channels_]

    def __call__(self, x):
        x = x[:, self._channel_indices, :, :]
        y = super(ParallelConvolution2D, self).__call__(x)
        ys = chainermn.functions.allgather(self.comm, y)
        return F.concat(ys, axis=1)

これをコピペするだけです。元ソースは以下のページにあります。
Channel-wise Parallel Convolution
※しかし、ここで注意点があります!※
上記のページの公式コードをそのままコピペした場合エラーが発生します。原因は初期化部分にあります。

bad_init.py
self.in_channels = in_channels
self.out_channels = out_channels

初期化の部分で上記のようになっていますが、このままだとラップされているchainer.links.Convolution2Dの中のself.in_channelsself.out_channelsが呼ばれてしまいます。同一の名前だとエラーが発生してしまうので、以下のように適当に書き変えましょう。

fixed_init.py
self.in_channels_ = in_channels
self.out_channels_ = out_channels

末尾に_を入れてあげました。これで正常に動くようになります。
では、このチャンネルパラレル用の畳み込みを通常の畳み込みと置換します。

net.py
import numpy as np

import chainer
import chainer.links as L
import chainer.distributions as D
from chainer import functions as F
from chainer import backend

import chainermn.links as mnL

# from https://github.com/pfnet-research/sngan_projection/blob/master/source/miscs/random_samples.py
# Make latent vector
def sample_continuous(dim, batchsize, distribution='normal', xp=np):
    if distribution == "normal":
        return xp.random.randn(batchsize, dim).astype(xp.float32)
    elif distribution == "uniform":
        return xp.random.uniform(-1, 1, (batchsize, dim)).astype(xp.float32)
    else:
        raise NotImplementedError


class Generator(chainer.Chain):
    def __init__(self, n_hidden=128, bottom_width=4, ch=512, wscale=0.02, comm=None):
        super(Generator, self).__init__()
        self.n_hidden = n_hidden
        self.ch = ch
        self.bottom_width = bottom_width
        self.comm = comm

        with self.init_scope():
            w = chainer.initializers.Normal(wscale)
            self.l0 = L.Linear(self.n_hidden,
             bottom_width * bottom_width * ch, initialW=w)
            self.c1 = ParallelConvolution2D(self.comm, ch, ch // 2, 3, 1, 1, initialW=w)
            self.c2 = ParallelConvolution2D(self.comm, ch // 2, ch // 4, 3, 1, 1, initialW=w)
            self.c3 = ParallelConvolution2D(self.comm, ch // 4, ch // 8, 3, 1, 1, initialW=w)
            self.c4 = ParallelConvolution2D(self.comm, ch // 8, 3, 3, 1, 1, initialW=w)
            self.bn0 = L.BatchNormalization(bottom_width * bottom_width * ch)
            self.bn1 = L.BatchNormalization(ch // 2)
            self.bn2 = L.BatchNormalization(ch // 4)
            self.bn3 = L.BatchNormalization(ch // 8)

    def forward(self, z):
        h = F.reshape(F.relu(self.bn0(self.l0(z))),
                      (len(z), self.ch, self.bottom_width, self.bottom_width))
        h = F.unpooling_2d(h, ksize=2, outsize=(8,8)) # 4x4 -> 8x8
        h = F.relu(self.bn1(self.c1(h)))
        h = F.unpooling_2d(h, ksize=2, outsize=(16,16)) # 8x8 -> 16x16
        h = F.relu(self.bn2(self.c2(h)))
        h = F.unpooling_2d(h, ksize=2, outsize=(32,32)) # 16x16 -> 32x32
        h = F.relu(self.bn3(self.c3(h)))
        x = F.tanh(self.c4(h))
        return x


class Discriminator(chainer.Chain):
    def __init__(self, n_latent=256, ch=256, bottom_width=4, wscale=0.02, comm=None):
        super(Discriminator, self).__init__()
        self.comm = comm

        with self.init_scope():
            w = chainer.initializers.Normal(wscale)
            self.c0_0 = ParallelConvolution2D(self.comm, 3, ch // 8, 3, 1, 1, initialW=w)
            self.c0_1 = ParallelConvolution2D(self.comm, ch // 8, ch // 4, 3, 1, 1, initialW=w)
            self.c1_0 = ParallelConvolution2D(self.comm, ch // 4, ch // 4, 3, 1, 1, initialW=w)
            self.c1_1 = ParallelConvolution2D(self.comm, ch // 4, ch // 2, 3, 1, 1, initialW=w)
            self.c2_0 = ParallelConvolution2D(self.comm, ch // 2, ch // 2, 3, 1, 1, initialW=w)
            self.c2_1 = ParallelConvolution2D(self.comm, ch // 2, ch // 1, 3, 1, 1, initialW=w)
            self.c3_0 = ParallelConvolution2D(self.comm, ch // 1, ch // 1, 3, 1, 1, initialW=w)
            self.l4 = L.Linear(ch * bottom_width**2, 1, initialW=w)
            self.bn0_1 = L.BatchNormalization(ch // 4, use_gamma=False)
            self.bn1_0 = L.BatchNormalization(ch // 4, use_gamma=False)
            self.bn1_1 = L.BatchNormalization(ch // 2, use_gamma=False)
            self.bn2_0 = L.BatchNormalization(ch // 2, use_gamma=False)
            self.bn2_1 = L.BatchNormalization(ch // 1, use_gamma=False)
            self.bn3_0 = L.BatchNormalization(ch // 1, use_gamma=False)

    def forward(self, x):
        h = F.leaky_relu(self.c0_0(x))
        h = F.leaky_relu(self.bn0_1(self.c0_1(h)))
        h = F.average_pooling_2d(h, 2) # 32x32 -> 16x16
        h = F.leaky_relu(self.bn1_0(self.c1_0(h)))
        h = F.leaky_relu(self.bn1_1(self.c1_1(h)))
        h = F.average_poaoling_2d(h, 2) # 16x16 -> 8x8
        h = F.leaky_relu(self.bn2_0(self.c2_0(h)))
        h = F.leaky_relu(self.bn2_1(self.c2_1(h)))
        h = F.average_pooling_2d(h, 2) # 8x8 -> 4x4
        h = F.leaky_relu(self.bn3_0(self.c3_0(h)))
        h = self.l4(h)
        return h

先ほどのデータパラレルのBNを通常のBNに戻し、畳み込みを全てParallelConvolution2Dに置換するだけです。第1引数にコミュニケーターを入れるのをお忘れなく。
以下が実行コードです。

$ mpiexec -np 4 python train.py

データパラレルの時と変わりませんね。
実行なのですが、ABCIのポイントの関係でこれだけ実行できませんでした、、、
動くのは確認してあるので、もしよかったら使ってみてください。
実装はここまでです。この後は踏んだバグについてやポエムになります。

踏んできた探すのがとてもしんどかったバグ

ここでは折角なので、マルチGPU実験で踏んできたバグを書き残しておこうと思います。

ParallelConvolution2Dの初期化の罠

これは先ほどモデルパラレル分散学習のセクションでも述べましたが、初期化においてself.in_channelsself.out_channelsがラップされているL.Convolution2Dのほうのself.in_channelsself.out_channelsを呼んでしまっていたということです。
これはほんと理由がわからず頭を抱えていたのですが、Chainerエヴァンジェリストの梅澤さんの助けを借りながらなんとか発見してfixしました。さすがChainerエヴァンジェリスト様と頭があがりませんでした。

logの一時ファイルへの書き込みが失敗する in ABCI

これはABCI使用に限ったことかもしれませんが、常に書き出し続けていると途中で稀にlogの一時ファイルへの書き込みが失敗して訓練が終了してしまうバグがありました。NFSでは起きうるバグなのですが、ABCIはABCI内で環境が閉じているので、非常に謎のバグです。
対処法はABCIの計算ノードのローカルディレクトリに一時保管し、プログラムの終了後に自分のディレクトリに書き出すという処理にすると回避できます。
ABCIの計算ノードのローカルディレクトリへのアクセス方法は$SGE_LOCALDIRで直接ディレクトリを参照できます。
なので、実行時に以下のように書きます。

mpiexec -np 4 python train.py --out_dir $SGE_LOCALDIR

--out_dirはプログラムの出力ディレクトリです。argparseで自分で記述します。
加えてプログラムの末尾に訓練終了後書き出す処理を以下のようにshutilglobを使って書きます。

import glob
import shutil

os.makedirs(final_out, exist_ok=True)
local_files = glob.glob(args.out + "/*")
for i in range(len(local_files)):
  shutil.move(local_files[i], final_out)

os.makedirsの第1引数は最終的な出力したい自分のディレクトリです。
これでバグは回避できます(最後にしか画像の結果が見れないので、少々怖いですが)。

Linux側からsignal 6が飛んでくる

これは一番苦労したバグでした。
マルチGPUで動かす際に以下のエラー文と共にLinuxからsignal 6が飛んできます。

--------------------------------------------------------------------------
A process has executed an operation involving a call to the
"fork()" system call to create a child process.  Open MPI is currently
operating in a condition that could result in memory corruption or
other system errors; your job may hang, crash, or produce silent
data corruption.  The use of fork() (or system() or other calls that
create child processes) is strongly discouraged.
The process that invoked fork was:
  Local host:          [[43747,1],0] (PID 53683)
If you are *absolutely sure* that your application will successfully
and correctly survive a call to fork(), you may disable this warning
by setting the mpi_warn_on_fork MCA parameter to 0.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 0 on node g0034 exited on signal 6 (Aborted).
--------------------------------------------------------------------------

signal 9であれば、「あ、メモリが足りないんだな」となりますが、signal 6はもう何もわかりません。これはChainerMNの開発者である福田さんの多大な助力をいただきましたが、なかなかエラーを発見できませんでした。
結論としてはextensionsのある関数が原因でした。それは以下の関数です。

trainer.extend(extensions.DumpGraph("loss_dis", filename="dis.dot"))

trainer.extendのグラフをダンプする関数extensions.DumpGraphが原因でした。loss_disはGANのDiscriminatorのロスです。
これは福田さん曰く

DumpGraphはSubprocessモジュールを使って外部プロセスを呼んでいるので、確かにChainerMNと合わせ使うと危険な可能性もありますね・・・

とのことでした。
これはOpenMPI ver 2で発生するバグでした(ver 3では以下のおまじないをプログラム冒頭に書くことでこのバグは回避できるようです)。

omajinai.py
multiprocessing.set_start_method('forkserver')
p = multiprocessing.Process()
p.start()
p.join()

しかし、ABCIでは他のモジュールとの兼ね合いでOpenMPI ver 3はロードできないようにされています。なので、このDumpGraphはマルチGPUの際は使うことをあきらめるしかないようです、、、
少ないデータ、小バッチサイズで1GPUで試してグラフを出力するなどで対処するのがベストだと思います。
死ぬほどデバッグがつらかったバグだったので、永遠とデバッグ内容を書き連ねたいくらいですが、この辺までにしておきます。

ポエム

ずっとChainerの分散学習については記事化したかったので、やっとできました。
2019/12/5にPFNの、Chainerがメンテナンスフェーズに移行する旨のツイートを投稿30秒後で見つけ、その数時間後にTA業務のある僕は喫煙所で言葉を失ってしまってました。その後のTA業務はしっかりこなしましたが、心ここにあらずという感じでした。
わなわなと唇は震え、いつ涙が零れてもおかしくありませんでした。

初めてディープラーニングをするときTensorFlowに触れてデバッグがしんどいなぁと思っていたところでChainerに出会いました。
それからというものChainerの使いやすさに惚れ込み、がむしゃらに色々なコードを書いていました。
PyTorchが登場後はChainerを絶対一世風靡させると分散学習講習会を受けて分散学習のスライドを書いたり、論文再現実装を行ったりしてきました。ChainerやChainerMNのバグも見つけたら積極的にfixしに行ったつもりです(僕の技術力が足りずそこまでコミットできませんでしたが、、、悲しい、、、)。
ICLR2020ではトップレビューを獲得したPyTorch論文では2階微分楽にできるようにしたよ!という内容も見受けられましたが、Chainerではもっと前から2階微分に対応してました。自分で開発したわけではないのにすごい誇らしかったのを覚えています。
僕自身がコミットできたことはChainerの歴史からしたらほんのわずかなものでしたが、とてもChainerが好きでした。

そんなときにあの一報が舞い込んできたのです。
僕はそのときなぜ自分がそんなに泣きそうなのかが理解できませんでした。
自分と共に戦ってきてくれた相棒がそこでお別れをして手を振っているようでした。

僕とChainerの最後の思い出はCVPR2020投稿のときです。僕の実験試行速度はChainerがあってこそのものだったと思っています。
本当に本当に助かったし、楽しかったです。
これからはPyTorchにいいところは吸収されていき、PyTorchの中で生き続けると信じています。

ありがとう、Chainer。そして、さよならChainer。
これからよろしく、PyTorch。

こんな傍から見たら激痛ポエムですが、僕の精いっぱいの気持ちのつもりです。
空回りだとしても最後に気持ちを残せてよかったです。

僕はChainerのサンタさんになれたでしょうか?

参考文献

[1] S. L. Smith, P. J. Kindermans, C. Ying and Q. V. Le. "Don't Decay the Learning Rate, Increase the Batch Size", In proc of ICLR 2018.
[2] Preferred Networks、深層学習の学習速度において世界最速を実現,” https://www.preferred-networks.jp/ja/news/pr20171110”.
[3] 世界最高速を達成!ディープラーニングの高速化技術を開発,” https://pr.fujitsu.com/jp/news/2019/04/1.html”.


  1. 識別モデルではノイズスケール[1]というバッチサイズと学習率の関係式があったりしますが、GANではそれが成り立つとは証明されておらず、バッチサイズを上げる場合にはイテレーション数は減ることがあってもトータルエポックは増えてしまうなどのことが起きます。 

  2. Don't Decay the Learning Rate[1]という論文があり、学習率を上げたりなど工夫をしなくてはラージバッチの場合に学習モデルの性能は劣化します。少し古い情報になりますが、PFNが32kバッチまで精度低下を防いでいます[2]。ImageNet学習速度世界一争いは実は計算資源投入だけでは成り立たないという厳しい世界なのです[3]。 

  3. モデルが分割されているので、GPU間通信が多く発生し、通信ボトルネックとなるので、速度が基本的に低下します。GPU内通信は900Gbpsほどですが、GPU間通信はInfiniBandでも100Gbpsほどです。 

  4. GPU同士の接続に通信が遅いケーブルが用いられると計算速度の向上度合は低くなります。場合によっては全くスケールしないという場合があります。 

26
13
7

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
26
13