
TensorFlow multiprocessing anti-patterns

Posted at 2022-08-25

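Introduction

With flexible use of TensorFlow in mind, I'd like to record the patterns that failed when I combined TensorFlow with multiprocessing.

There is really only one basic rule:

If TensorFlow is used in the parent process, using TensorFlow again inside a child process makes the process hang.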
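
A commonly suggested workaround, sketched here under the assumption that the hang comes from `fork()` (the default start method on Linux at the time of writing) copying TensorFlow runtime state that the parent had already initialized, such as thread pools and locks, is to start the children with the `spawn` method so each one begins as a fresh interpreter. The names `square_worker`, `make_spawn_executor`, and `run_demo` are hypothetical, and squaring a number stands in for the TensorFlow work; I have not checked this against the exact environment used in this article.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def square_worker(i: int) -> int:
    # Hypothetical stand-in: in the article's setting, TensorFlow would be
    # imported and the model built and called here, inside the child.
    return i * i


def make_spawn_executor(max_workers: int = 4) -> ProcessPoolExecutor:
    # "spawn" launches each worker as a brand-new interpreter instead of
    # fork()-ing a copy of the parent, so runtime state initialized in the
    # parent (threads, locks) is not inherited by the children.
    ctx = multiprocessing.get_context("spawn")
    return ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)


def run_demo() -> list:
    # Must be called under `if __name__ == "__main__":` because spawned
    # children re-import the main module.
    with make_spawn_executor() as executor:
        return list(executor.map(square_worker, range(4)))
```

Called from under an `if __name__ == "__main__":` guard, `run_demo()` should return `[0, 1, 4, 9]`, since `Executor.map` preserves input order. Spawn start-up is slower than fork, and module-level code such as a top-level `net = NeuralNetwork()` would run again in every child unless it is also behind the guard.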

Import
import time
import tensorflow as tf
import tensorflow.keras.layers as kl
import numpy as np
import random
from multiprocessing import Process, Pool
import multiprocessing
NeuralNetwork
class NeuralNetwork(tf.keras.Model):
    # Small fully connected network with a single output unit.
    def __init__(self):
        super().__init__()
        self.action_space = 1
        self.dense1 = kl.Dense(128, activation="relu")
        self.dense2 = kl.Dense(128, activation="relu")
        self.out = kl.Dense(self.action_space)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    @tf.function
    def call(self, x):
        x = self.dense1(x)
        x = self.dense2(x)
        return self.out(x)

Functions

OK pattern

OK
def worker(i):
    # The model is created and used only inside the child process.
    net = NeuralNetwork()
    arr = tf.constant([[1, 2, 3]], dtype=tf.float32)
    print(net(arr))

with multiprocessing.Pool(processes=4) as pool:
    pool.map(worker, range(4))
Output
tf.Tensor([[0.645734]], shape=(1, 1), dtype=float32)
tf.Tensor([[-0.4795481]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.18732838]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.29219586]], shape=(1, 1), dtype=float32)

NG pattern

The process hangs.

NG
def worker(i):
    net = NeuralNetwork()
    arr = tf.constant([[1, 2, 3]], dtype=tf.float32)
    print(net(arr))

# TensorFlow is used in the parent process before the pool starts.
net = NeuralNetwork()
with multiprocessing.Pool(processes=4) as pool:
    pool.map(worker, range(4))

Classes

Basic usage (without TensorFlow)

class Worker:
    def __init__(self):
        self.net = 1

    def run(self):
        print(self.net)

def job(instance):
    instance.run()
    return instance

with multiprocessing.Pool(processes=4) as pool:
    instances = [Worker() for _ in range(4)]
    pool.map(job, instances)
Output
1111
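
This works because `Pool.map` pickles each `Worker` instance to send it to a worker process, so the child operates on a copy, and any change it makes reaches the parent only through the (again pickled) return value, which is why `job` returns `instance`. The sketch below reproduces that round trip in a single process with `pickle`; `Counter` and `simulate_pool_round_trip` are hypothetical names, and the pickle sequence is a simplification of what the pool does internally.

```python
import pickle


class Counter:
    # Hypothetical picklable state object, analogous to the Worker above.
    def __init__(self) -> None:
        self.calls = 0

    def run(self) -> None:
        self.calls += 1


def job(instance: Counter) -> Counter:
    instance.run()
    # Returning the instance is the only way the child's change gets back.
    return instance


def simulate_pool_round_trip(instance: Counter) -> Counter:
    # Roughly what happens per item: the argument is pickled to the child,
    # the function runs on the unpickled copy, the result is pickled back.
    child_copy = pickle.loads(pickle.dumps(instance))
    result = job(child_copy)
    return pickle.loads(pickle.dumps(result))
```

In the original snippet the list returned by `pool.map` is discarded, so the parent's `instances` never see the children's changes.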

OK pattern

The key point is that the instance is always created inside the child process.

OK
class Storage:
    def __init__(self):
        self.net = NeuralNetwork()

    def run(self, arr):
        print(self.net(arr))

def worker(i):
    # Storage (and therefore the model) is instantiated inside the child.
    s = Storage()
    arr = tf.constant([[1, 2, 3]], dtype=tf.float32)
    s.run(arr)

with multiprocessing.Pool(processes=4) as pool:
    pool.map(worker, range(4))
Output
tf.Tensor([[-0.24559836]], shape=(1, 1), dtype=float32)
tf.Tensor([[-0.16172571]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.19367361]], shape=(1, 1), dtype=float32)
tf.Tensor([[0.9469359]], shape=(1, 1), dtype=float32)
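
If building a new model for every task is too slow, one refinement of this OK pattern, assuming the parent still never touches TensorFlow, is the pool's `initializer` argument, which runs once in every worker process when the pool starts. The model is then built once per process, reused across tasks, and never pickled. `build_model`, `init_worker`, `predict`, `run_demo`, and the global `_model` are hypothetical; `build_model` returns a plain function in place of `NeuralNetwork()`.

```python
import multiprocessing
import os
from typing import Callable, Optional, Tuple

# Per-process global, filled in by init_worker inside each worker process.
_model: Optional[Callable[[float], float]] = None


def build_model(scale: float) -> Callable[[float], float]:
    # Hypothetical stand-in for NeuralNetwork(): in the article's setting,
    # TensorFlow would be imported and the model constructed here.
    return lambda x: x * scale


def init_worker(scale: float) -> None:
    # Runs once per worker process when the pool starts, so the model is
    # created inside the child and is never sent through pickle.
    global _model
    _model = build_model(scale)


def predict(x: float) -> Tuple[int, float]:
    # Every task in this process reuses the model built by init_worker.
    assert _model is not None, "init_worker has not run in this process"
    return os.getpid(), _model(x)


def run_demo() -> list:
    # Call from under `if __name__ == "__main__":` when not using fork.
    with multiprocessing.Pool(processes=4, initializer=init_worker,
                              initargs=(2.0,)) as pool:
        return [y for _, y in pool.map(predict, range(4))]
```

`run_demo()` should return `[0.0, 2.0, 4.0, 6.0]`; the process IDs returned by `predict` show that each worker reuses its own model across tasks.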

NG patterns

① Raises an error

NG
class Worker:
    def __init__(self):
        # The model is created in the parent, when the instances are built.
        self.net = NeuralNetwork()

    def run(self):
        arr = tf.constant([[1, 2, 3]], dtype=tf.float32)
        print(self.net(arr))

def job(instance):
    instance.run()
    return instance

with multiprocessing.Pool(processes=4) as pool:
    instances = [Worker() for _ in range(4)]
    pool.map(job, instances)
Output
TypeError: can't pickle weakref objects
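
The `TypeError` appears because `Pool.map` must pickle each `Worker`, and the Keras model stored in `self.net` holds weak references, which cannot be pickled. One way to keep a class-based design is to leave the model out of the pickled state with `__getstate__` and build it lazily on first access, which then happens in the child. In the sketch, `Resource` is a hypothetical stand-in whose `weakref` attribute mimics the unpicklable model, and `LazyWorker` is a hypothetical name; with TensorFlow, the parent must still not access `net` itself, or the basic rule above is broken.

```python
import pickle
import weakref


class Resource:
    # Hypothetical stand-in for a Keras model: the weak reference makes
    # instances unpicklable, as in the TypeError above.
    def __init__(self) -> None:
        self._self_ref = weakref.ref(self)


class LazyWorker:
    def __init__(self) -> None:
        self._net = None  # built on first access, in whichever process that is

    @property
    def net(self) -> Resource:
        if self._net is None:
            self._net = Resource()
        return self._net

    def __getstate__(self) -> dict:
        # Leave the unpicklable member out of what gets sent to the child;
        # default unpickling restores the rest of __dict__.
        state = self.__dict__.copy()
        state["_net"] = None
        return state
```

A `LazyWorker` can then be passed to `pool.map` even after `net` has been used, and each child gets an instance whose model is rebuilt on its first `net` access.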

② The process hangs

NG
class Worker:
    def __init__(self):
        self.net = 1

    def run(self):
        print(self.net)
        net = NeuralNetwork()
        arr = tf.constant([[1, 2, 3]], dtype=tf.float32)
        print(net(arr))

def job(instance):
    instance.run()
    return instance

with multiprocessing.Pool(processes=4) as pool:
    instances = [Worker() for _ in range(4)]
    pool.map(job, instances)

ProcessPoolExecutor

Imports
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

You can't pass `self` (here, a method call wrapped in a lambda)

OK pattern

OK
def worker(i):
    print(f"worker No.{i}")

with ProcessPoolExecutor() as executor:
    for i in range(4):
        future = executor.submit(worker, i)
Output
worker No.3
worker No.0
worker No.2
worker No.1

NG pattern

The process hangs.

NG
class Worker:
    def __init__(self, i):
        self.i = i

    def train(self):
        print(f"worker No.{self.i}")

with ProcessPoolExecutor() as executor:
    for i in range(4):
        env = Worker(i)
        # submit() must pickle the callable; this lambda cannot be pickled.
        job = lambda: env.train()
        future = executor.submit(job)
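
`submit` has to pickle the callable it is given, and a lambda cannot be pickled, which is likely what breaks this NG example. On Python 3, a bound method of an instance of a picklable module-level class, a module-level function, and a `functools.partial` of one are all picklable, so `executor.submit(env.train)` or `executor.submit(run_worker, i)` would be expected to work instead. `run_worker` and `can_pickle` are hypothetical helpers, and the checks below run in a single process.

```python
import functools
import pickle


class Worker:
    def __init__(self, i: int) -> None:
        self.i = i

    def train(self) -> str:
        return f"worker No.{self.i}"


def run_worker(i: int) -> str:
    # Module-level function: pickled by reference (module name + qualname).
    return Worker(i).train()


def can_pickle(obj) -> bool:
    # ProcessPoolExecutor.submit pickles the callable and its arguments,
    # so anything this returns False for cannot be sent to a worker process.
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True
```

Checking `can_pickle(job)` before calling `submit` turns a silent failure into an explicit one.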

Exception handling

OK pattern

OK
def worker(i):
    try:
        net = NeuralNetwork()
        raise Exception('Something bad happened!')
    except Exception as e:
        return i, e
    return i, "never gets here"

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(worker, i) for i in range(4)]
    for f in futures:
        i, e = f.result()
        print(f"worker_{i}: {e}")
Output
worker_0: Something bad happened!
worker_1: Something bad happened!
worker_2: Something bad happened!
worker_3: Something bad happened!
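
Returning the exception works, but `ProcessPoolExecutor` can also carry it for you: an exception raised in the worker is pickled back to the parent and re-raised by `Future.result()`. The sketch below relies on that standard `concurrent.futures` behavior; `failing_worker`, `collect_errors`, `run_demo`, and `run_in_threads` are hypothetical, and `run_in_threads` uses a `ThreadPoolExecutor`, which shares the same `Future` API, so the logic can be checked without starting processes.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def failing_worker(i: int):
    # Hypothetical worker that always fails, like the article's example.
    raise ValueError(f"worker_{i}: Something bad happened!")


def collect_errors(executor: Executor, n: int) -> list:
    # Let the exception propagate out of the worker; Future.result()
    # re-raises it in the parent, where it is caught and recorded.
    futures = [executor.submit(failing_worker, i) for i in range(n)]
    messages = []
    for f in futures:
        try:
            f.result()
        except ValueError as e:
            messages.append(str(e))
    return messages


def run_demo() -> list:
    # Call from under `if __name__ == "__main__":` when not using fork.
    with ProcessPoolExecutor() as executor:
        return collect_errors(executor, 4)


def run_in_threads(n: int) -> list:
    # Same logic with threads: handy for testing without child processes.
    with ThreadPoolExecutor(max_workers=2) as executor:
        return collect_errors(executor, n)
```

`run_demo()` should return the four messages for `worker_0` to `worker_3` in submission order, because the futures are read in the order they were created.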

NG pattern

The process hangs.

NG
def worker(i):
    try:
        net = NeuralNetwork()
        raise Exception('Something bad happened!')
    except Exception as e:
        return i, e
    print(e)
    return i, "never gets here"

with ProcessPoolExecutor() as executor:
    futures = [executor.submit(worker, i) for i in range(4)]
    for f in futures:
        i, e = f.result()
        print(f"worker_{i}: {e}")

Queue

With ProcessPoolExecutor, it works if you use multiprocessing.Manager().Queue().

OK pattern

OK
def train(q):
    time.sleep(1)  # give the workers time to put their items
    while not q.empty():
        print(q.get())

def worker(q, i):
    q.put(i)

with Manager() as manager, ProcessPoolExecutor() as executor:
    # A Manager queue is a picklable proxy, so it can be passed to submit().
    q = manager.Queue()
    futures = [executor.submit(worker, q, i) for i in range(4)]
    futures.append(executor.submit(train, q))
Output
0
1
2
3
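
The OK pattern depends on `time.sleep(1)` being long enough for every worker to put its item before `train` checks `q.empty()`, which is a timing assumption. A sketch that avoids it, assuming one end-of-stream marker per producer, has each producer put a sentinel after its data and the consumer block on `get()` until it has seen all of them. `producer`, `consumer`, `run_demo`, `run_single_process`, and `SENTINEL` are hypothetical names.

```python
import queue
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

SENTINEL = None  # hypothetical end-of-stream marker, one per producer


def producer(q, i: int) -> None:
    q.put(i)
    q.put(SENTINEL)  # tell the consumer this producer is finished


def consumer(q, n_producers: int) -> list:
    # Block on get() instead of polling q.empty() after a fixed sleep,
    # and stop once every producer has sent its sentinel.
    items, finished = [], 0
    while finished < n_producers:
        item = q.get()
        if item is SENTINEL:
            finished += 1
        else:
            items.append(item)
    return items


def run_demo(n: int = 4) -> list:
    # Call from under `if __name__ == "__main__":` when not using fork.
    with Manager() as manager, ProcessPoolExecutor() as executor:
        q = manager.Queue()
        for i in range(n):
            executor.submit(producer, q, i)
        return sorted(executor.submit(consumer, q, n).result())


def run_single_process(n: int) -> list:
    # Same protocol with a plain queue.Queue in one process, for testing.
    q = queue.Queue()
    for i in range(n):
        producer(q, i)
    return consumer(q, n)
```

`run_demo()` should return `[0, 1, 2, 3]`; the result is sorted because the producers may finish in any order.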

NG pattern

The process hangs.

NG
from queue import Queue

def train(q):
    time.sleep(1)
    while not q.empty():
        print(q.get())

def worker(q, i):
    q.put(i)

with ProcessPoolExecutor() as executor:
    # queue.Queue holds thread locks, so it cannot be pickled and sent
    # to a worker process.
    q = Queue()
    futures = [executor.submit(worker, q, i) for i in range(4)]
    futures.append(executor.submit(train, q))