概要
生産も消費も重い処理を並列するための簡素な実装をし、最終的な処理結果を得る。
multicoreを活用したいので、multiprocessingを用いる。
内容
注意点
以前定義したクラス定義を流用する。(前回記事)
環境
macOS Catalina
Python 3.7.0
変更
readersの出力を保持するself.outputs
を追加する。
queue.py
class SingleQueueManager:
...
def __init__(self, verbose=False):
self.verbose = verbose
self.queue = Queue()
self.system_queue = Queue()
self.outputs = Queue() # ←ここを追加した。
self.readers = []
self.writers = []
self.add_finisher()
...
def _wrap_reader_proc(self, reader_function, queue):
def tmp(queue):
return reader_function(queue) # ←ここを変更した。
return tmp
...
def add_element_reader(self, element_reader):
def reader_proc(queue, outputs=self.outputs):
while True:
msg = queue.get()
if (msg is self.__class__.DoneSignal):
break
outputs.put(element_reader(msg)) # ←ここを変更した。
self.add_reader(reader_proc)
...
def get_all_outputs(self): # ←この関数を追加した。
while not self.outputs.empty():
yield self.outputs.get()
実行
queue.py
def my_element_reader(msg):
return '*'*msg
def my_writer_proc(queue):
import time
import random
for i in range(3):
queue.put(i)
time.sleep(random.randint(0, 5))
def main():
m = SingleQueueManager()
m.add_element_reader(my_element_reader)
m.add_element_reader(my_element_reader)
m.add_element_reader(my_element_reader)
m.add_writer(my_writer_proc)
m.add_writer(my_writer_proc)
m.start()
m.join()
print(list(m.get_all_outputs()))
if __name__=='__main__':
main()
output
['', '', '*', '*', '**', '**']
参考にした頁・本
感想
- とりあえず動くものが作れて良かった。
- 車輪の再生産をしている気がする。
- 部分的に
multiprocessing
ではなく、threading
を使っても良いかもしれない。
今後
今後積み重ねる機会があれば、この先も実装したいと思う。