LoginSignup
0
0

More than 3 years have passed since last update.

単純な並列処理③生産消費ともに時間がかかる時 (Python; multiprocessing.Queue)

Last updated at Posted at 2021-03-02

概要

生産も消費も重い処理を並列するための簡素な実装をする。

multicoreを活用したいので、multiprocessingを用いる。

内容

注意点

  • 終了検出に、カスタムクラスを使うことにした。(cf. 以前書いた記事)
  • マネジメントのためにクラスを使うことにした。

環境

macOS Catalina
Python 3.7.0

準備

SingleQueueManagerと終了用クラス(DoneSignal)の定義

queue.py
from multiprocessing import Process, Queue

class SingleQueueManager:
    class DoneSignal: pass

    def __init__(self, verbose=False):
        self.verbose = verbose
        self.queue = Queue()
        self.system_queue = Queue()
        self.readers = []
        self.writers = []
        self.add_finisher()
  • self.verbose: 途中経過を表示するか
  • self.queue:データを扱うためのqueue
  • self.system_queue:プロセス終了管理のためのqueue
  • self.readers:self.queueに溜まったものを消費するProcessのリスト
  • self.writers:self.queueに溜めるProcessのリスト
  • self.add_finisher():終了判定をするProcess(finisher)を定義する

self.add_readerの定義

queue.py
class SingleQueueManager:
    ...
    def add_reader(self, reader_function):
        target = self._wrap_reader_proc(reader_function, self.queue)
        args = (self.queue,)
        self.readers.append(Process(target=target, args=args))

    def _wrap_reader_proc(self, reader_function, queue):
        def tmp(queue):
            reader_function(queue)
        return tmp

reader_function(queue)を定義すれば、
self.add_reader(reader_function)と渡せば大丈夫なようにした。

self.add_writerの定義

queue.py
class SingleQueueManager:
    ...
    def add_writer(self, writer_function):
        name = 'write' + str(len(self.writers) + 1)
        target = self._wrap_writer_proc(writer_function, name, self.queue, self.system_queue)
        args = (self.queue,)
        self.writers.append(Process(target=target, args=args))

    def _wrap_writer_proc(self, writer_function, name, queue, system_queue):
        def tmp(queue):
            writer_function(queue)
            self.print(name, 'finished')
            system_queue.put('DONE')
        return tmp

writer_function(queue)を定義すれば、
self.add_writer(writer_function)と渡せば大丈夫なようにした。

wrapすることで、system_queue.put('DONE')を自然と実装できている。

self.add_element_readerの定義

実際には、readerは、要素に対して定義される方が自然だと思うので、
要素を受け取り処理する関数でadd_readerできるようにした。

queue.py
class SingleQueueManager:
    ...
    def add_element_reader(self, element_reader):
        def reader_proc(queue):
            while True:
                msg = queue.get()
                if (msg is self.__class__.DoneSignal):
                    break
                element_reader(msg)
        self.add_reader(reader_proc)

element_reader(msg)を定義すれば、
self.add_element_writer(element_reader)と渡せば大丈夫なようにした。

終了処理の定義

queue.py
class SingleQueueManager:
    ...
    def add_finisher(self):
        self.finisher = Process(
                target=self.finisher_proc,
                args=(self.queue, self.system_queue))

    def finisher_proc(self, queue, system_queue):
        done = 0
        while True:
            msg = system_queue.get()
            done += 1
            n_writers, n_readers = len(self.writers), len(self.readers)

            if done == n_writers:
                for i in range(n_readers):
                    queue.put(self.__class__.DoneSignal)
                break

system_queueself.writersの数だけ入力があったら終了なので、
self.queueself.readersの数だけ終了シグナルを入れる。

その他利便性のための実装

queue.py
class SingleQueueManager:
    ...
    def print(self, *args, **kwargs):
        if self.verbose:
            print(*args, **kwargs)

    def start(self):
        for reader in self.readers:
            reader.start()
        for writer in self.writers:
            writer.start()
        self.finisher.start()

    def join(self):
        for reader in self.readers:
            reader.join()
        for writer in self.writers:
            writer.join()
        self.finisher.join()

実行

queue.py
def my_element_reader(msg):
    print(msg)

def my_writer_proc(queue):
    import time
    import random
    for i in range(3):
        queue.put(i)
        time.sleep(random.randint(0, 5))


def main(verbose=False):
    m = SingleQueueManager(verbose=verbose)
    m.add_element_reader(my_element_reader)
    m.add_element_reader(my_element_reader)
    m.add_element_reader(my_element_reader)
    m.add_writer(my_writer_proc)
    m.add_writer(my_writer_proc)

    m.start()
    m.join()

if __name__=='__main__':
    main(verbose=True)
output
0
0
1
1
2
2
write2 finished
write1 finished

最終的なクラス定義

queue.py
class SingleQueueManager:
    class DoneSignal: pass

    def __init__(self, verbose=False):
        self.verbose = verbose
        self.queue = Queue()
        self.system_queue = Queue()
        self.readers = []
        self.writers = []
        self.add_finisher()

    def print(self, *args, **kwargs):
        if self.verbose:
            print(*args, **kwargs)

    def add_finisher(self):
        self.finisher = Process(
                target=self.finisher_proc,
                args=(self.queue, self.system_queue))

    def finisher_proc(self, queue, system_queue):
        done = 0
        while True:
            msg = system_queue.get()
            done += 1
            n_writers, n_readers = len(self.writers), len(self.readers)

            if done == n_writers:
                for i in range(n_readers):
                    queue.put(self.__class__.DoneSignal)
                break

    def add_reader(self, reader_function):
        target = self._wrap_reader_proc(reader_function, self.queue)
        args = (self.queue,)
        self.readers.append(Process(target=target, args=args))

    def _wrap_reader_proc(self, reader_function, queue):
        def tmp(queue):
            reader_function(queue)
        return tmp

    def add_writer(self, writer_function):
        name = 'write' + str(len(self.writers) + 1)
        target = self._wrap_writer_proc(writer_function, name, self.queue, self.system_queue)
        args = (self.queue,)
        self.writers.append(Process(target=target, args=args))

    def _wrap_writer_proc(self, writer_function, name, queue, system_queue):
        def tmp(queue):
            writer_function(queue)
            self.print(name, 'finished')
            system_queue.put('DONE')
        return tmp

    def add_element_reader(self, element_reader):
        def reader_proc(queue):
            while True:
                msg = queue.get()
                if (msg is self.__class__.DoneSignal):
                    break
                element_reader(msg)
        self.add_reader(reader_proc)

    def start(self):
        for reader in self.readers:
            reader.start()
        for writer in self.writers:
            writer.start()
        self.finisher.start()

    def join(self):
        for reader in self.readers:
            reader.join()
        for writer in self.writers:
            writer.join()
        self.finisher.join()

蛇足

書き終わった頃にmultiprocessing.Managerというものを見つけてしまったが、
これがあれば解決するのだろうか。

参考にさせていただいた本・頁

特になし
(cf. 前回記事)

感想

とりあえず動くものが作れて良かった。

今後

queueから取り出したものをまとめるものを後で作る。

追記:作りました。(https://qiita.com/yo314159265/items/c65fa68d55ea11ad4e6c)

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0