はじめに
またまた自分用メモ。
↓みたいので受信待機するときに、リクエスト元とかメッセージの種類ごとにプロセスを分けたくて書いた。
ソースコード
main.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import procs
workers = {}
source = procs.Source()
try:
    while True:
        # Sourceからpoll
        data = source.poll()
        key = data['key']
        # keyに対応するWorkerが無ければ生成
        if key not in workers:
            workers[key] = procs.Worker()
        # workerに処理を委譲
        workers[key].delegate(data)
finally:
    # KeyboardInterrupt等で終了したらWorkerをterminate
    source.terminate()
    [ w.terminate() for _,w in workers ]
procs.py
# !/usr/bin/env python
# -*- coding: utf-8 -*-
import abc
import multiprocessing
class Proc(metaclass=abc.ABCMeta):
    
    def __init__(self):
        self._stop    = multiprocessing.Event()
        self._queue   = multiprocessing.SimpleQueue()
        self._process = multiprocessing.Process(target=self._run)
        self._process.start()
    def terminate(self):
        self._stop.set()
        self._process.join()
        self._process.terminate()
    def _run(self):
        while not self._stop.is_set():
            self._do()
    @abc.abstractmethod
    def _do(self, **kwargs):
        pass
class Source(Proc):
    
    def _do(self, **kwargs):
        # メッセージの受信とか
        data = { 'key' : 'some-data' }
        self._queue.put(data)
    def poll(self):
        return self._queue.get()
        
class Worker(Proc):
    def _do(self, **kwargs):
        data = self._queue.get()
        # 受け取ったデータを処理
    def delegate(self, data):
        return self._queue.put(data)