LoginSignup
1

More than 5 years have passed since last update.

リクエストを別プロセスで捌く

Last updated at Posted at 2017-02-18

はじめに

またまた自分用メモ。

↓みたいので受信待機するときに、リクエスト元とかメッセージの種類ごとにプロセスを分けたくて書いた。

ソースコード

main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import procs


workers = {}
source = procs.Source()

try:
    while True:

        # Sourceからpoll
        data = source.poll()
        key = data['key']

        # keyに対応するWorkerが無ければ生成
        if key not in workers:
            workers[key] = procs.Worker()

        # workerに処理を委譲
        workers[key].delegate(data)

finally:
    # KeyboardInterrupt等で終了したらWorkerをterminate
    source.terminate()
    [ w.terminate() for _,w in workers ]
procs.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import abc
import multiprocessing


class Proc(metaclass=abc.ABCMeta):

    def __init__(self):
        self._stop    = multiprocessing.Event()
        self._queue   = multiprocessing.SimpleQueue()
        self._process = multiprocessing.Process(target=self._run)
        self._process.start()

    def terminate(self):
        self._stop.set()
        self._process.join()
        self._process.terminate()

    def _run(self):
        while not self._stop.is_set():
            self._do()

    @abc.abstractmethod
    def _do(self, **kwargs):
        pass


class Source(Proc):

    def _do(self, **kwargs):
        # メッセージの受信とか
        data = { 'key' : 'some-data' }
        self._queue.put(data)

    def poll(self):
        return self._queue.get()


class Worker(Proc):

    def _do(self, **kwargs):
        data = self._queue.get()
        # 受け取ったデータを処理

    def delegate(self, data):
        return self._queue.put(data)

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1