TensorFlow の複数プロセスで queue を共有

  • 11
    いいね
  • 0
    コメント

はじめに

タイトル通りの内容です。
TensorFlowで分散処理をする場合queueだとかVariableだとか状態を持つような特殊なoperationを複数のプロセス間で共有する必要があります。
そのあたりの情報が少ないのでメモを残しておきます。
まずはqueueの場合です。

多分最小に近いサンプルコード

2つプロセスを立ち上げるので、2つコードを用意します。
enqueue.pyはqueueにひたすらデータを詰めていくコード、dequeue.pyはqueueからひたすらデータを取り出していくコードです。
重要なのはRandomShuffleQueueを作る時に両方のコードで同じshared_nameを指定しているところです。

enqueue.py
import tensorflow as tf

server = tf.train.Server(
    tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
    job_name="local",
    task_index=0
)

q = tf.RandomShuffleQueue(
    capacity=10,
    dtypes=[tf.float32, tf.string],
    name="q",
    shared_name="shared_queue",
    min_after_dequeue=0
)
enqueue_op = q.enqueue_many(vals=[[0, 1, 2], ["a", "b", "c"]])

with tf.Session(server.target) as sess:
    for _ in range(10):
        sess.run(enqueue_op)
dequeue.py
import tensorflow as tf

server = tf.train.Server(
    tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
    job_name="local",
    task_index=1
)

q = tf.RandomShuffleQueue(
    capacity=10,
    dtypes=[tf.float32, tf.string],
    name="q",
    shared_name="shared_queue",
    min_after_dequeue=0
)
dequeue_op = q.dequeue()

with tf.Session(server.target) as sess:
    for _ in range(10):
        print(sess.run([dequeue_op]))

それぞれ以下のように動かしてみましょう。

python enqueue.py
python dequeue.py

dequeue.pyを実行すると以下のようにenqueue.pyでqueueに詰めたデータを取り出すことができました。

[[2.0, 'c']]
[[0.0, 'a']]
[[2.0, 'c']]
[[2.0, 'c']]
[[0.0, 'a']]
[[1.0, 'b']]
[[2.0, 'c']]
[[1.0, 'b']]
[[1.0, 'b']]
[[0.0, 'a']]

やったね。

参考