LoginSignup
12
12

More than 5 years have passed since last update.

TensorFlow の複数プロセスで queue を共有

Posted at

はじめに

タイトル通りの内容です。
TensorFlowで分散処理をする場合queueだとかVariableだとか状態を持つような特殊なoperationを複数のプロセス間で共有する必要があります。
そのあたりの情報が少ないのでメモを残しておきます。
まずはqueueの場合です。

多分最小に近いサンプルコード

2つプロセスを立ち上げるので、2つコードを用意します。
enqueue.pyはqueueにひたすらデータを詰めていくコード、dequeue.pyはqueueからひたすらデータを取り出していくコードです。
重要なのはRandomShuffleQueueを作る時に両方のコードで同じshared_nameを指定しているところです。

enqueue.py
import tensorflow as tf

server = tf.train.Server(
    tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
    job_name="local",
    task_index=0
)

q = tf.RandomShuffleQueue(
    capacity=10,
    dtypes=[tf.float32, tf.string],
    name="q",
    shared_name="shared_queue",
    min_after_dequeue=0
)
enqueue_op = q.enqueue_many(vals=[[0, 1, 2], ["a", "b", "c"]])

with tf.Session(server.target) as sess:
    for _ in range(10):
        sess.run(enqueue_op)
dequeue.py
import tensorflow as tf

server = tf.train.Server(
    tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
    job_name="local",
    task_index=1
)

q = tf.RandomShuffleQueue(
    capacity=10,
    dtypes=[tf.float32, tf.string],
    name="q",
    shared_name="shared_queue",
    min_after_dequeue=0
)
dequeue_op = q.dequeue()

with tf.Session(server.target) as sess:
    for _ in range(10):
        print(sess.run([dequeue_op]))

それぞれ以下のように動かしてみましょう。

python enqueue.py
python dequeue.py

dequeue.pyを実行すると以下のようにenqueue.pyでqueueに詰めたデータを取り出すことができました。

[[2.0, 'c']]
[[0.0, 'a']]
[[2.0, 'c']]
[[2.0, 'c']]
[[0.0, 'a']]
[[1.0, 'b']]
[[2.0, 'c']]
[[1.0, 'b']]
[[1.0, 'b']]
[[0.0, 'a']]

やったね。

参考

12
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
12