はじめに
タイトル通りの内容です。
TensorFlowで分散処理をする場合queueだとかVariableだとか状態を持つような特殊なoperationを複数のプロセス間で共有する必要があります。
そのあたりの情報が少ないのでメモを残しておきます。
まずはqueueの場合です。
多分最小に近いサンプルコード
2つプロセスを立ち上げるので、2つコードを用意します。
enqueue.py
はqueueにひたすらデータを詰めていくコード、dequeue.py
はqueueからひたすらデータを取り出していくコードです。
重要なのはRandomShuffleQueueを作る時に両方のコードで同じshared_name
を指定しているところです。
enqueue.py
import tensorflow as tf
server = tf.train.Server(
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
job_name="local",
task_index=0
)
q = tf.RandomShuffleQueue(
capacity=10,
dtypes=[tf.float32, tf.string],
name="q",
shared_name="shared_queue",
min_after_dequeue=0
)
enqueue_op = q.enqueue_many(vals=[[0, 1, 2], ["a", "b", "c"]])
with tf.Session(server.target) as sess:
for _ in range(10):
sess.run(enqueue_op)
dequeue.py
import tensorflow as tf
server = tf.train.Server(
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]}),
job_name="local",
task_index=1
)
q = tf.RandomShuffleQueue(
capacity=10,
dtypes=[tf.float32, tf.string],
name="q",
shared_name="shared_queue",
min_after_dequeue=0
)
dequeue_op = q.dequeue()
with tf.Session(server.target) as sess:
for _ in range(10):
print(sess.run([dequeue_op]))
それぞれ以下のように動かしてみましょう。
python enqueue.py
python dequeue.py
dequeue.py
を実行すると以下のようにenqueue.py
でqueueに詰めたデータを取り出すことができました。
[[2.0, 'c']]
[[0.0, 'a']]
[[2.0, 'c']]
[[2.0, 'c']]
[[0.0, 'a']]
[[1.0, 'b']]
[[2.0, 'c']]
[[1.0, 'b']]
[[1.0, 'b']]
[[0.0, 'a']]
やったね。