horovod
horovodはuberが開発しているDeep Learningの分散学習向けのフレームワークです.
現在,Tensorflow, pytorch, kerasに対応しています.(現在MXNetのPRが出ているのでもうじき対応するかもしれません.)
詳しくは紹介記事があるので,horovodに関する説明はそちらを見てほしいです.
現状のhorovodはgraphモードのみに対応していて、sessionを使う前提になっています。eagerには対応していません.
eagerモードはtensorflow2.0でdefaultになる予定など今後重要度が増してくるので対応した方が良いなと思ったのが動機です.
今回の記事はhorovodに対してeager向けのPRで自分がした変更について軽く書いていこうと思います.
(あまり大した変更してないのと,まだPRの段階なのでmergeされるとは限りません.....)
実際に出したPRです
基本方針
horovodはTensorflowや各Deep Learning向けフレームワークとうまく分離できていて,データ取得は各フレームワークごとに実装されていて,核となるncclやmpiを使った分散処理はフレームワークに依存しない形で実装されています.実際の集合演算処理はTensorflowとは別thread,別streamで動作します。なのでeagerモードの対応もinterfaceのみの変更で対応しています.
対応内容
以下簡単に対応内容を書きます.
最近Contributed.mdが追加されて,GPU,CPUのテストのunittestを追加する必要があることが明記されました.
broadcast(初期値の共有)
horovodでは各GPUごとに初期化した初期値を統一するために,rootが初期化したvariablesを各GPUにばらまきます.(MPIのbroadcastを利用しています.)
従来の方法はsessionにhookさせる形でsessionが作成された後に1回だけ実行されます.eagerだとsessionは使用しないので新たに関数を定義してあげました.
allreduce(gradientsの平均化)
allreduceはtf.train.Optimizerをwrapする形で実装されていて,compute_gradientsで計算を行ったgradientsを受け取ってhorovodでallreduceを実行しています.
eager modeでbackwardを実行するためにはtf.GradientTape
のgradient関数を利用します.eager対応も従来同様,tf.GradientTapeをwrapするclassを実装する形で実現しました.
with tf.GradientTape() as tape:
logits = mnist_model(images, training=True)
loss_value = tf.losses.sparse_softmax_cross_entropy(labels, logits)
# Horovod: broadcast initial variable states
# from rank 0 to all other processes. This is necessary to ensure consistent
# initialization of all workers when training is started with random weights
# or restored from a checkpoint.
hvd.bcast(0, mnist_model.variables) if batch == 0 else None
# Horovod: add Horovod Distributed GradientTape.
tape = hvd.DistributedGradientTape(tape)
grads = tape.gradient(loss_value, mnist_model.variables)
unit test
horovodはTensorflowのtf.test.TestCaseを利用してunit testを書いています.
tf.test.TestCaseはpythonのunittestを継承して作られているので使い方はほとんど同じですが,
tf.test.TestCaseを利用するとsessionの管理が簡単になったり
eagerのテストもgraphモードのテストと同一のTest関数内に定義することができました.
def test_horovod_allreduce_cpu_fused(self):
...
with self.eager_mode:
tests = graph_and_eager()
with self.subTest(msg='eager mode'):
self.assertTrue(tf.reduce_all(tests),
"hvd.allreduce produces incorrect results")
with self.test_session(config=self.config) as session:
tests = graph_and_eager()
with self.subTest(msg='graph mode'):
self.assertTrue(session.run(tf.reduce_all(tests)),
"hvd.allreduce produces incorrect results")
まとめ
今回初めてOSSに対してPRを出したので、その内容に関して簡単に書きました.
アドバイス、ご感想などありましたらお気軽にコメントお願いします.