概要
生産も消費も重い処理を並列するための簡素な実装をする。
multicoreを活用したいので、multiprocessingを用いる。
内容
注意点
- 終了検出に、カスタムクラスを使うことにした。(cf. 以前書いた記事)
- マネジメントのためにクラスを使うことにした。
環境
macOS Catalina
Python 3.7.0
準備
SingleQueueManager
と終了用クラス(DoneSignal
)の定義
from multiprocessing import Process, Queue
class SingleQueueManager:
class DoneSignal: pass
def __init__(self, verbose=False):
self.verbose = verbose
self.queue = Queue()
self.system_queue = Queue()
self.readers = []
self.writers = []
self.add_finisher()
-
self.verbose
: 途中経過を表示するか -
self.queue
:データを扱うためのqueue -
self.system_queue
:プロセス終了管理のためのqueue -
self.readers
:self.queueに溜まったものを消費するProcessのリスト -
self.writers
:self.queueに溜めるProcessのリスト -
self.add_finisher()
:終了判定をするProcess(finisher
)を定義する
self.add_reader
の定義
class SingleQueueManager:
...
def add_reader(self, reader_function):
target = self._wrap_reader_proc(reader_function, self.queue)
args = (self.queue,)
self.readers.append(Process(target=target, args=args))
def _wrap_reader_proc(self, reader_function, queue):
def tmp(queue):
reader_function(queue)
return tmp
reader_function(queue)
を定義すれば、
self.add_reader(reader_function)
と渡せば大丈夫なようにした。
self.add_writer
の定義
class SingleQueueManager:
...
def add_writer(self, writer_function):
name = 'write' + str(len(self.writers) + 1)
target = self._wrap_writer_proc(writer_function, name, self.queue, self.system_queue)
args = (self.queue,)
self.writers.append(Process(target=target, args=args))
def _wrap_writer_proc(self, writer_function, name, queue, system_queue):
def tmp(queue):
writer_function(queue)
self.print(name, 'finished')
system_queue.put('DONE')
return tmp
writer_function(queue)
を定義すれば、
self.add_writer(writer_function)
と渡せば大丈夫なようにした。
wrapすることで、system_queue.put('DONE')
を自然と実装できている。
self.add_element_reader
の定義
実際には、readerは、要素に対して定義される方が自然だと思うので、
要素を受け取り処理する関数でadd_readerできるようにした。
class SingleQueueManager:
...
def add_element_reader(self, element_reader):
def reader_proc(queue):
while True:
msg = queue.get()
if (msg is self.__class__.DoneSignal):
break
element_reader(msg)
self.add_reader(reader_proc)
element_reader(msg)
を定義すれば、
self.add_element_writer(element_reader)
と渡せば大丈夫なようにした。
終了処理の定義
class SingleQueueManager:
...
def add_finisher(self):
self.finisher = Process(
target=self.finisher_proc,
args=(self.queue, self.system_queue))
def finisher_proc(self, queue, system_queue):
done = 0
while True:
msg = system_queue.get()
done += 1
n_writers, n_readers = len(self.writers), len(self.readers)
if done == n_writers:
for i in range(n_readers):
queue.put(self.__class__.DoneSignal)
break
system_queue
にself.writers
の数だけ入力があったら終了なので、
self.queue
にself.readers
の数だけ終了シグナルを入れる。
その他利便性のための実装
class SingleQueueManager:
...
def print(self, *args, **kwargs):
if self.verbose:
print(*args, **kwargs)
def start(self):
for reader in self.readers:
reader.start()
for writer in self.writers:
writer.start()
self.finisher.start()
def join(self):
for reader in self.readers:
reader.join()
for writer in self.writers:
writer.join()
self.finisher.join()
実行
def my_element_reader(msg):
print(msg)
def my_writer_proc(queue):
import time
import random
for i in range(3):
queue.put(i)
time.sleep(random.randint(0, 5))
def main(verbose=False):
m = SingleQueueManager(verbose=verbose)
m.add_element_reader(my_element_reader)
m.add_element_reader(my_element_reader)
m.add_element_reader(my_element_reader)
m.add_writer(my_writer_proc)
m.add_writer(my_writer_proc)
m.start()
m.join()
if __name__=='__main__':
main(verbose=True)
0
0
1
1
2
2
write2 finished
write1 finished
最終的なクラス定義
class SingleQueueManager:
class DoneSignal: pass
def __init__(self, verbose=False):
self.verbose = verbose
self.queue = Queue()
self.system_queue = Queue()
self.readers = []
self.writers = []
self.add_finisher()
def print(self, *args, **kwargs):
if self.verbose:
print(*args, **kwargs)
def add_finisher(self):
self.finisher = Process(
target=self.finisher_proc,
args=(self.queue, self.system_queue))
def finisher_proc(self, queue, system_queue):
done = 0
while True:
msg = system_queue.get()
done += 1
n_writers, n_readers = len(self.writers), len(self.readers)
if done == n_writers:
for i in range(n_readers):
queue.put(self.__class__.DoneSignal)
break
def add_reader(self, reader_function):
target = self._wrap_reader_proc(reader_function, self.queue)
args = (self.queue,)
self.readers.append(Process(target=target, args=args))
def _wrap_reader_proc(self, reader_function, queue):
def tmp(queue):
reader_function(queue)
return tmp
def add_writer(self, writer_function):
name = 'write' + str(len(self.writers) + 1)
target = self._wrap_writer_proc(writer_function, name, self.queue, self.system_queue)
args = (self.queue,)
self.writers.append(Process(target=target, args=args))
def _wrap_writer_proc(self, writer_function, name, queue, system_queue):
def tmp(queue):
writer_function(queue)
self.print(name, 'finished')
system_queue.put('DONE')
return tmp
def add_element_reader(self, element_reader):
def reader_proc(queue):
while True:
msg = queue.get()
if (msg is self.__class__.DoneSignal):
break
element_reader(msg)
self.add_reader(reader_proc)
def start(self):
for reader in self.readers:
reader.start()
for writer in self.writers:
writer.start()
self.finisher.start()
def join(self):
for reader in self.readers:
reader.join()
for writer in self.writers:
writer.join()
self.finisher.join()
蛇足
書き終わった頃にmultiprocessing.Managerというものを見つけてしまったが、
これがあれば解決するのだろうか。
参考にさせていただいた本・頁
特になし
(cf. 前回記事)
感想
とりあえず動くものが作れて良かった。
今後
queueから取り出したものをまとめるものを後で作る。
追記:作りました。(https://qiita.com/yo314159265/items/c65fa68d55ea11ad4e6c)