A job queue example using Tornado coroutines

Posted at 2015-09-05

These are my notes from building a job queue with Tornado coroutines.

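The put method of WorkerQueue adds a job (a function and its arguments) to the queue. As soon as the currently running job finishes, the queued jobs are executed one at a time, oldest first.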

The code uses the Queue class introduced in Tornado 4.2. Tornado's queue resembles the synchronous queue in the Python standard library (queue.Queue), except that put and get return a tornado.concurrent.Future.
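The difference from queue.Queue can be seen in a small sketch. This is an illustration rather than part of the job queue itself; the names `demo` and `main` are made up for this example. Calling get() on an empty queue returns a Future immediately instead of blocking, and that Future resolves once an item is put.

```python
from tornado import gen, ioloop
from tornado.queues import Queue


@gen.coroutine
def demo():
    q = Queue()
    # get() on an empty queue does not block; it returns a pending Future.
    fut = q.get()
    pending_before_put = not fut.done()
    # put() also returns a Future; on an unbounded queue it resolves at once.
    yield q.put("job")
    # Yielding the Future from get() gives back the item that was put.
    item = yield fut
    return pending_before_put, item


def main():
    # run_sync starts the IOLoop, runs the coroutine, and returns its result.
    return ioloop.IOLoop.current().run_sync(demo)


if __name__ == "__main__":
    print(main())
```

Because get() hands back a Future, a coroutine can wait for the next job with `yield` while the IOLoop keeps serving other callbacks.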

from concurrent.futures import ProcessPoolExecutor
import time

from tornado import ioloop, gen, process
from tornado.queues import Queue


class WorkerQueue(object):
    def __init__(self):
        self.queue = Queue()
        self.current_worker_id = None
        self.current_worker = None
        self.queued_ids = []
        # Start the dispatcher coroutine; it waits on the queue in the background.
        self._dispatcher()

    def put(self, id_, func, args):
        worker = Worker(func, args)
        self.queued_ids.append(id_)
        self.queue.put_nowait((id_, worker))
        print("Put: {}".format(id_))

    def status(self, id_):
        if id_ in self.queued_ids:
            return "Queued"
        elif id_ == self.current_worker_id:
            return "Running"
        else:
            # Finished jobs and unknown ids are both reported as "Ready".
            return "Ready"

    @gen.coroutine
    def _dispatcher(self):
        while True:
            # Wait until a job is available; jobs come out in FIFO order.
            id_, worker = yield self.queue.get()
            self.queued_ids.remove(id_)
            self.current_worker_id = id_
            self.current_worker = worker
            print("Start: {}".format(id_))
            # Run one job at a time; the next get() happens only after it finishes.
            res = yield self.current_worker.execute()
            self.current_worker_id = None
            self.current_worker = None
            print("{} is {}.".format(id_, res))


class Worker(object):
    def __init__(self, func, args):
        self.func = func
        self.args = args

    @gen.coroutine
    def execute(self):
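        # A fresh process pool is created for each job and shut down on exit
        # from the with block.
        with ProcessPoolExecutor(process.cpu_count()) as exec_:
            # submit() returns a concurrent.futures.Future, which a Tornado
            # coroutine can yield without blocking the IOLoop.
            res = yield exec_.submit(self.func, *self.args)
        return res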


def job(sec):
    time.sleep(sec)
    return "done"


@gen.coroutine
def run():
    q = WorkerQueue()
    q.put("Job1", job, (2,))
    q.put("Job2", job, (2,))
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(1)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))
    yield gen.sleep(2)
    print("Job1 <{}>".format(q.status("Job1")))
    print("Job2 <{}>".format(q.status("Job2")))


if __name__ == "__main__":
    ioloop.IOLoop.current().run_sync(run)

Output

Put: Job1
Put: Job2
Job1 <Queued>
Job2 <Queued>
Start: Job1
Job1 <Running>
Job2 <Queued>
Job1 is done.
Start: Job2
Job1 <Ready>
Job2 <Running>
Job2 is done.
Job1 <Ready>
Job2 <Ready>

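(Added 2015-11-07)
When handling large amounts of data, memory usage becomes a problem, so some extra care is needed, such as limiting the queue length and making the producer wait.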
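One way to make the producer wait is to give the Tornado Queue a maxsize and yield on put(). The following is a minimal sketch under that assumption, not a change to the WorkerQueue above; `producer`, `consumer`, `demo`, `main`, and the `log` list are hypothetical names used only here, and gen.sleep stands in for real work.

```python
from tornado import gen, ioloop
from tornado.queues import Queue


@gen.coroutine
def producer(q, n_jobs, log):
    for i in range(n_jobs):
        # With maxsize set, put() returns a Future that stays pending while
        # the queue is full, so the producer pauses here.
        yield q.put(i)
        log.append(("put", i))


@gen.coroutine
def consumer(q, n_jobs, log):
    for _ in range(n_jobs):
        item = yield q.get()
        yield gen.sleep(0.01)  # stand-in for real work
        log.append(("done", item))
        q.task_done()


@gen.coroutine
def demo(n_jobs=5, maxsize=2):
    q = Queue(maxsize=maxsize)
    log = []
    # Start the consumer first; it immediately waits on the empty queue.
    consumer_future = consumer(q, n_jobs, log)
    yield producer(q, n_jobs, log)
    yield consumer_future
    return log


def main():
    return ioloop.IOLoop.current().run_sync(demo)


if __name__ == "__main__":
    for entry in main():
        print(entry)
```

With this arrangement, at most maxsize jobs sit in the queue at any time, so job arguments that hold large data are not all kept in memory at once. The WorkerQueue.put method above uses put_nowait, so applying the same idea there would mean turning put into a coroutine that yields on self.queue.put.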
