はじめに
TensorFlowをさまざまな形で使うことを想定して、TensorFlowでmultiprocessingを行うときに失敗したパターンを記載したいと思います。
基本的なルールは一つだけです。
親プロセスでTensorFlowを使用し、子プロセス内で再度TensorFlowを使用するとプロセスが止まる。
Import
import time
import tensorflow as tf
import tensorflow.keras.layers as kl
import numpy as np
import random
from multiprocessing import Process, Pool
import multiprocessing
NeuralNetwork
class NeuralNetwork(tf.keras.Model):
    """Small fully connected Keras model used by the multiprocessing examples.

    Two 128-unit ReLU layers feed a linear output layer with
    ``action_space`` units. An Adam optimizer is stored on the instance.
    """

    def __init__(self):
        super().__init__()
        self.action_space = 1
        self.dense1 = kl.Dense(128, activation="relu")
        self.dense2 = kl.Dense(128, activation="relu")
        self.out = kl.Dense(self.action_space)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    @tf.function
    def call(self, x):
        """Run the forward pass on ``x`` and return the output layer's result."""
        hidden = self.dense2(self.dense1(x))
        return self.out(hidden)
関数編
OKパターン
OK
def worker(i):
    """Build a NeuralNetwork inside the child process and print its output.

    ``i`` is the item handed over by ``pool.map``; it is not used.
    """
    model = NeuralNetwork()
    sample = tf.constant([[1, 2, 3]], dtype=tf.float32)
    print(model(sample))
if __name__ == "__main__":
    # Guard the entry point: with the "spawn" start method the children
    # re-import this module, and an unguarded Pool would be created again.
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(worker, range(4))
標準出力
tf.Tensor([[0.645734]], shape=(1, 1), dtype=float32)
tf.Tensor([[-0.4795481]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.18732838]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.29219586]], shape=(1, 1), dtype=float32)
NGパターン
プロセスが止まる。
NG
def worker(i):
    """Create a NeuralNetwork in the child process and print one prediction.

    ``i`` is the item handed over by ``pool.map``; it is not used.
    """
    model = NeuralNetwork()
    sample = tf.constant([[1, 2, 3]], dtype=tf.float32)
    print(model(sample))
if __name__ == "__main__":
    # NG pattern, kept on purpose: TensorFlow is used in the parent process
    # before the pool starts, and the pool workers then use it again. The
    # article reports that the process hangs in this case.
    # The __main__ guard only prevents child processes from re-running this
    # section when the module is re-imported under the "spawn" start method.
    net = NeuralNetwork()
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(worker, range(4))
クラス編
オーソドックスな使い方
class Worker:
    """Worker whose ``net`` attribute is a plain integer."""

    def __init__(self):
        self.net = 1

    def run(self):
        """Print the stored ``net`` value."""
        print(self.net)
def job(instance):
    """Call ``instance.run()`` and hand the same instance back to the caller."""
    instance.run()
    return instance
if __name__ == "__main__":
    # Guard the entry point so that child processes which re-import this
    # module (the "spawn" start method) do not create another pool.
    with multiprocessing.Pool(processes=4) as pool:
        instances = [Worker() for _ in range(4)]
        pool.map(job, instances)
標準出力
1111
OKパターン
あくまでも、子プロセス内でインスタンス化をします。
OK
class Strage:
    """Holder that creates its own NeuralNetwork when instantiated."""

    def __init__(self):
        self.net = NeuralNetwork()

    def run(self, arr):
        """Print the network's output for ``arr``."""
        prediction = self.net(arr)
        print(prediction)
def worker(i):
    """Instantiate ``Strage`` inside the child process and print one prediction.

    ``i`` is the item handed over by ``pool.map``; it is not used.
    """
    storage = Strage()
    sample = tf.constant([[1, 2, 3]], dtype=tf.float32)
    # Strage.run prints self.net(arr), i.e. the same value as the inline call.
    storage.run(sample)
if __name__ == "__main__":
    # Guard the entry point: with the "spawn" start method the children
    # re-import this module, and an unguarded Pool would be created again.
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(worker, range(4))
標準出力
tf.Tensor([[-0.24559836]], shape=(1, 1), dtype=float32)
tf.Tensor([[-0.16172571]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.19367361]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.9469359]], shape=(1, 1), dtype=float32)
NGパターン
①エラーを出力
NG
class Worker:
    """Worker that owns a NeuralNetwork created in ``__init__``.

    NG pattern 1: the article reports
    ``TypeError: can't pickle weakref objects`` when instances of this
    class are passed to ``pool.map``.
    """

    def __init__(self):
        self.net = NeuralNetwork()

    def run(self):
        """Print the network's output for a fixed 1x3 input."""
        sample = tf.constant([[1, 2, 3]], dtype=tf.float32)
        print(self.net(sample))
def job(instance):
    """Run the given worker instance, then return that same instance."""
    instance.run()
    return instance
if __name__ == "__main__":
    # NG pattern 1, kept on purpose: the Worker instances are built in the
    # parent and handed to pool.map; the article reports a TypeError here.
    # The __main__ guard only protects against re-execution when child
    # processes re-import this module ("spawn" start method).
    with multiprocessing.Pool(processes=4) as pool:
        instances = [Worker() for _ in range(4)]
        pool.map(job, instances)
標準出力
TypeError: can't pickle weakref objects
②プロセスが止まる
NG
class Worker:
    """Worker that builds its NeuralNetwork lazily inside ``run``.

    NG pattern 2: the article reports that the process hangs when
    instances of this class are run through ``pool.map``.
    """

    def __init__(self):
        self.net = 1

    def run(self):
        """Print ``self.net``, then build a network and print its output."""
        print(self.net)
        model = NeuralNetwork()
        sample = tf.constant([[1, 2, 3]], dtype=tf.float32)
        print(model(sample))
def job(instance):
    """Invoke ``run()`` on ``instance`` and return ``instance`` unchanged."""
    instance.run()
    return instance
if __name__ == "__main__":
    # NG pattern 2, kept on purpose: the article reports that this hangs.
    # The __main__ guard only protects against re-execution when child
    # processes re-import this module ("spawn" start method).
    with multiprocessing.Pool(processes=4) as pool:
        instances = [Worker() for _ in range(4)]
        pool.map(job, instances)
ProcessPoolExecutor編
import
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
selfは渡せない
OKパターン
OK
def worker(i):
    """Print a line identifying worker number ``i``."""
    message = f'worker No.{str(i)}'
    print(message)
if __name__ == "__main__":
    # Guard the entry point so that worker processes which re-import this
    # module (the "spawn" start method) do not start another executor.
    with ProcessPoolExecutor() as executor:
        for i in range(4):
            future = executor.submit(worker, i)
標準出力
worker No.3
worker No.0
worker No.2
worker No.1
NGパターン
プロセスが止まる。
NG
class worker:
    """Object-style worker that remembers its index ``i``."""

    def __init__(self, i):
        self.i = i

    def tarin(self):
        """Print a line identifying this worker by its index."""
        message = f'worker No.{str(self.i)}'
        print(message)
if __name__ == "__main__":
    # NG pattern, kept on purpose: a lambda closing over a worker instance is
    # submitted to the executor; the article reports that the process hangs.
    # NOTE(review): lambdas cannot be pickled by the standard pickle module,
    # which is presumably why this submission fails - confirm.
    # The __main__ guard only protects against re-execution when worker
    # processes re-import this module ("spawn" start method).
    with ProcessPoolExecutor() as executor:
        for i in range(4):
            env = worker(i)
            job = lambda: env.tarin()
            future = executor.submit(job)
例外ハンドリング
OKパターン
OK
def worker(i):
    """Raise inside the child process and return the exception to the parent.

    Returns:
        A tuple ``(i, exception)``; the exception is returned, not raised.
    """
    try:
        model = NeuralNetwork()
        raise Exception('Something bad happened!')
    except Exception as err:
        return i, err
    # Not reached: the try block always raises and the handler returns.
    return i, "never gets here"
if __name__ == "__main__":
    # Guard the entry point so that worker processes which re-import this
    # module (the "spawn" start method) do not start another executor.
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker, i) for i in range(4)]
        # Each result is the (index, exception) pair returned by worker.
        for f in futures:
            i, e = f.result()
            print(f"worker_{i}: {e}")
標準出力
worker_0: Something bad happened!
worker_1: Something bad happened!
worker_2: Something bad happened!
worker_3: Something bad happened!
NGパターン
プロセスが止まる。
NG
def worker(i):
    """NG variant of the exception example, kept as the article shows it.

    Returns:
        A tuple ``(i, exception)``; the exception is returned, not raised.
    """
    try:
        model = NeuralNetwork()
        raise Exception('Something bad happened!')
    except Exception as e:
        return i, e
        # NOTE(review): this print follows the return and is never executed.
        # The article lists this variant as hanging; the cause is not
        # visible in this snippet - confirm against the original article.
        print(e)
    # Not reached: the try block always raises and the handler returns.
    return i, "never gets here"
if __name__ == "__main__":
    # Driver for the NG exception example; the article reports a hang here.
    # The __main__ guard only protects against re-execution when worker
    # processes re-import this module ("spawn" start method).
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker, i) for i in range(4)]
        for f in futures:
            i, e = f.result()
            print(f"worker_{i}: {e}")
Queue
ProcessPoolExecutorでは、multiprocessing.Manager().Queue()を使うと動く。
OKパターン
OK
def train(q, delay=1):
    """Wait briefly, then drain ``q`` and print every item it holds.

    Args:
        q: Queue-like object providing ``empty()`` and ``get()``.
        delay: Seconds to pause before draining. Defaults to the one second
            used originally, presumably to let the workers enqueue first.
    """
    # The file imports the ``time`` module but never the bare name ``sleep``,
    # so the original ``sleep(1)`` raised NameError inside the child process.
    time.sleep(delay)
    while not q.empty():
        print(q.get())
def worker(q, i):
    """Put the value ``i`` onto the shared queue ``q``."""
    q.put(i)
if __name__ == "__main__":
    # Guard the entry point so that worker processes which re-import this
    # module (the "spawn" start method) do not start another executor.
    with ProcessPoolExecutor() as executor:
        # A Manager-backed queue is used here; the article states this is
        # the variant that works with ProcessPoolExecutor.
        q = multiprocessing.Manager().Queue()
        futures = [executor.submit(worker, q, i) for i in range(4)]
        futures.append(executor.submit(train, q))
標準出力
0
1
2
3
NGパターン
プロセスが止まる。
NG
from queue import Queue
def train(q, delay=1):
    """Pause, then print and remove every item currently in ``q``.

    Args:
        q: Queue-like object providing ``empty()`` and ``get()``.
        delay: Seconds to pause before draining. Defaults to the one second
            used originally.
    """
    # ``sleep`` is never imported as a bare name in this file (only the
    # ``time`` module is), so the original call raised NameError.
    time.sleep(delay)
    while not q.empty():
        print(q.get())
def worker(q, i):
    """Enqueue ``i`` on the queue ``q`` that was passed in."""
    q.put(i)
if __name__ == "__main__":
    # NG pattern, kept on purpose: a plain ``queue.Queue`` is handed to the
    # worker processes; the article reports that the process hangs and
    # states that multiprocessing.Manager().Queue() works instead.
    # The __main__ guard only protects against re-execution when worker
    # processes re-import this module ("spawn" start method).
    with ProcessPoolExecutor() as executor:
        q = Queue()
        futures = [executor.submit(worker, q, i) for i in range(4)]
        futures.append(executor.submit(train, q))