はじめに
またまた自分用メモ。
↓みたいので受信待機するときに、リクエスト元とかメッセージの種類ごとにプロセスを分けたくて書いた。
ソースコード
main.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import procs
workers = {}
source = procs.Source()
try:
while True:
# Sourceからpoll
data = source.poll()
key = data['key']
# keyに対応するWorkerが無ければ生成
if key not in workers:
workers[key] = procs.Worker()
# workerに処理を委譲
workers[key].delegate(data)
finally:
# KeyboardInterrupt等で終了したらWorkerをterminate
source.terminate()
[ w.terminate() for _,w in workers ]
procs.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import abc
import multiprocessing
class Proc(metaclass=abc.ABCMeta):
def __init__(self):
self._stop = multiprocessing.Event()
self._queue = multiprocessing.SimpleQueue()
self._process = multiprocessing.Process(target=self._run)
self._process.start()
def terminate(self):
self._stop.set()
self._process.join()
self._process.terminate()
def _run(self):
while not self._stop.is_set():
self._do()
@abc.abstractmethod
def _do(self, **kwargs):
pass
class Source(Proc):
def _do(self, **kwargs):
# メッセージの受信とか
data = { 'key' : 'some-data' }
self._queue.put(data)
def poll(self):
return self._queue.get()
class Worker(Proc):
def _do(self, **kwargs):
data = self._queue.get()
# 受け取ったデータを処理
def delegate(self, data):
return self._queue.put(data)