1. Qiita
  2. 投稿
  3. 機械学習

Distributed TensorFlow でデータ並列を試してみる

  • 31
    いいね
  • 0
    コメント

データ並列とは

前回はDistributed TensorFlowのビルドからモデル並列までを行いましたが、今回はデータ並列による学習を試してみます。並列化にはモデル並列とデータ並列の2種類がありますが、大雑把に言うと下記のようになります。

  • モデル並列: データ1000個に対する巨大な演算1回を100人で分担する
  • データ並列: 1人あたりデータ10個ずつ小分けにして100人で分担する

モデル並列は当然ながらモデルに依存するので、データ並列で一度に扱うデータを減らすほうが汎用性は高いといえるでしょう。

パラメータの共有

学習におけるデータ並列化では、同じパラメータを持ったモデルのコピーを複数作り、バッチを小分けにしてそれぞれのモデルのコピーに渡し、各々に微分を計算させます。つまり同じパラメータをもったモデルをデバイスごとに持たせなければならないのですが、そのあたりの扱いが少しわかりにくいです。今回はGPUは使いませんが、複数デバイスでの計算や、パラメータの共有については、公式のHow To のGPU周りの記述 (Using GPUs,Sharing Variables) が参考になります。

tf.variable_scope()を使うと変数のスコープの定義ができます。同じスコープで同名の変数を使いたい場合はreuseフラグを立てた状態でget_variable()を呼べばよいようです。get_variable()は、reuseフラグが立っていなければ新規作成、立っていれば既存の同名の変数へのリンクを返すような動作をします。これを使えばパラメータの共有ができることになります。

グラフをあとから参照する際にはコレクションというものを使います。

クラスタ構成

Distrubuted Tensorflowのホワイトペーパでは、パラメータデバイスなるVariableを管理するサーバがパラメータの管理と更新を行って、masterが各workerに仕事を投げる、というあたりの記載があります(すみません、ちゃんと読んでません...)。そのあたりを斜め読みしつつ、masterで一括してVariableを管理することにして、小分けされたバッチをこなすのはworker二人、という構成にします。別途パラメータ用のサーバ(ps)を分けてもいいのですが、今回はmasterに内包させます。

  • master デバイス上で、psスコープの変数を作る
  • 各worker デバイス上で、ps スコープの変数をreuseして微分を計算する
  • masterデバイスで、各workerの演算した微分の平均を使ってパラメータをアップデートする

という分担で記述してみます。図にすると下記のようになります。

data_parallel.png

動かしてみる

grpcサーバを起動します。クラスタ構成は masterひとり、workerふたり、なので、

grpc_tensorflow_server --cluster_spec='master|localhost:2222,worker|localhost:2223,worker_|localhost:2224' --job_name=master --task_index=0 &
grpc_tensorflow_server --cluster_spec='master|localhost:2222,worker|localhost:2223,worker_|localhost:2224' --job_name=worker --task_index=0 &
grpc_tensorflow_server --cluster_spec='master|localhost:2222,worker|localhost:2223,worker_|localhost:2224' --job_name=worker_ --task_index=0 &

ですかね。

前回同様、$y=e^x$ の近似をデータ並列で学習させてみます。コードはこちらを参照ください。ついでにおなじところに、シングルCPU版、モデル並列版、も置いておきました。

master, worker でのパラメータの共有は、デバイスとスコープの指定をして、tf.Variable()で作っていた変数の作成部分をget_variable()に置き換えるだけです。使い回しをしたいので、すべてpsスコープで統一します。変数の初期化にはイニシャライザを渡すようです。

W1 = tf.Variable(tf.random_uniform([1,16], 0.0, 1.0))  # before
W1 = tf.get_variable("W1",shape=[1,16],initializer=tf.random_uniform_initializer(0,1)) # after

同一スコープでreuseフラグを立てて呼べば再利用されます。ここで軽く混乱したのですが、ここでいうスコープはvariableのスコープなので、再利用されるのはvariableだけで、グラフは個別のインスタンスになります。

各workerにバッチを渡すのは後ほどメインループで行うのですが、そのときにどのworkerのplacefolderなのか特定できるようにコレクションを使います。

tf.add_to_collection("x",x)  # xをあとから使えるようにコレクションしておく
...
x0= tf.get_collection("x")[0] # xコレクションの0番目を取り出す

同様に、微分もコレクションしておきます。

最適化は、コスト計算〜微分計算〜パラメータ更新、という手順なわけですが、普段は optimizer.minimize() などで一気に微分の演算からパラメータ更新までやると思います。ところが今回は得られた微分を即使わないので、compute_gradients()ののちに apply_gradiends() を行います。全体処理としては、

  • 各workerで微分演算 compute_gradients()
  • masterでそれらの微分を平均
  • 平均した微分を使って apply_gradients()

という手順で行います。微分を平均するところはTensorFlowのサンプルコードから流用しました。

収束の具合を比較してみます。

compare.png

青がシングルCPU版、赤がデータ並列版です。乱数のシードを揃えたこともあってほとんど重なってます。(まったく同じ値ではない)

サーバマシンを増やしたわけではないですし、諸々のオーバヘッドがでかい、そもそもバッチ演算が重くない、などなどこの例ではいろいろ不利なので、並列版のほうが遅いです。手元だとざっと倍ぐらい遅くなりました。

最後に

ここまでは高速化より仕組みの理解が目的なので、毎度並列化で遅くなる結果ばかり出てしまっていますが、次回はいよいよこれをDockerコンテナにして、Google Clout Platformで動かしてみます。無料枠の\$300を使い切る勢いで山ほどコンテナを作れば、ちゃんと速くなるはず。話題沸騰のAlphaGoは1200CPUとのことですが、\$300でどこまでできるか楽しみです。