58
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

horovodのご紹介 ~Tensorflowの分散処理(Distributed Tensorflow)を簡単に実現!!~

Last updated at Posted at 2018-05-24

Tensorflowの分散処理

現在、ニューラルネットワークのモデルは非常に巨大になっていて、1台のGPUマシンだと計算に多くの時間がかかってしまうことが多くなってきました。そこで、多くのGPUで並列処理を行い計算を高速化させる需要が増えています。Tensorflowにも分散実行させるしくみを利用して並列処理を行うことが可能です。しかし、Tensorflowで分散処理を行うには以下のことを理解して1GPU用のコードを書き換える必要があります

  • Parmeter server、Workerの配置
  • 各サーバーへのグラフ、データの配置

これらを意識してコードを書き換えることは可能ですが、学習コストと改造量が多くDistributedの敷居は高めです。
そこでユーザーの書いたコードの変更量をなるべく少なく高速に、分散実行を実現しようとhorovodが登場しました。(以上が前振りです。)

horovodとは??

horovodとはuberが開発しているDeep Learning用フレームワークを複数GPUで分散処理(バッチ並列)させるアプリケーションで現在(2018/5/24)、Tensorflow、keras、 pytorchに対応しています。128サーバー、各4GPUの計512GPUまでの動作実績があるようです。
まずは使用方法を見て、horovodの便利さを感じてください。

使い方

詳細はREADME見て頂くと書いてありますが、ここでも簡単に紹介します。

import tensorflow as tf
import horovod.tensorflow as hvd


# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)

主な変更は5箇所です。

  • horovodをimport
  • 初期化処理(hvd.init())
  • 学習率にhvd.size()(GPU数)をかける.
  • configの設定を追加
  • tf.train.optimizerをDistributedOptimizerでWrapする

これだけの変更量で分散実行できるようになるのは非常に魅力的だと感じます。
この記事をご覧になったみなさんもGPUを複数お持ちなら試してみてください。
次にhorovodの実装に関して書いていきます。

horovodの実装

どこまで細かく書いていいかわからないので、ある程度ざっくり書いています。
また論文には分散の仕組みとして、Baiduのring-all-reduceの考えを取り入れたと書いてあり,ncclを使うことでring-all-reduceを実現しています.

全体の流れ

初期化

horovodの分散処理は、OpenMPIとncclを利用して実現しています。まずOpenMPIを使って各GPUのランク付けを行います(ユーザーはGPUごとに設定を行う必要がないので楽です)、そしてプロセスとGPUのひもづけをTensorflowのvisible_device_listを利用することで実現しています。

###集約処理(allreduce)
1プロセス1GPUごとにグラフが構築されForward,Backward演算が実行されます.各プロセス(各GPU)でgradientsの計算(tf.train.Optimizer.compute_gradients)が完了したら、各プロセスのgradientをncclのallreduceを利用して更新します。(バッチ並列同期処理)
allreduceを各iterationごとに繰り返していくことで、分散処理を実現しています。

Tensor fusion

Tensor fusionというのがhorovodがアピールしているポイントでこれを利用することによって、大きく性能向上できたと言っています。
小さなTensor(データ)のallreduceを多く実行するようなモデルだと通信回数が多くなって時間がかかってしまう問題がありました。(例えばResNETなど)
そこでhorovodでは複数のTensorをくっつけて一つの塊にしてから、allreduceをすることによって通信回数を減らすような仕組みを導入しました。これがTensor fusionです。このTensor fusionを実現するためにOpenMPIとncclの2段組の通信構造をとっていると思われます。まずOpenMPIを利用して各GPUのTensorの情報をやりとりして、どのTensor同士がFusion(くっつけることが)可能か判別するデータを集めます。Fusion可能なTensorと判断したらallreduce前に各GPUで複数Tensorを一つのデータの塊にします(Tensor fusion)。このようにfusionしたデータをncclでallreduceすることで通信回数を減らし高速化を実現したそうです。
(自分でも複数サーバーを利用した時にTensor fusionによって性能向上していることを確認することができました(ResNet-101))

まとめ

TensorflowなどDeep Learning向けフレームワークで分散実行を簡単に高速に実行できるhorovodの紹介を行いました。ご質問やアドバイス、もっと詳細に知りたいなどありましたら、お気軽にご連絡いただけるとありがたいです。

58
30
6

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
58
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?