Posted at

Flaskで多重実行(排他)を制御する方法

More than 1 year has passed since last update.


1. はじめに

前回の記事ではFlaskのアプリケーションで同時アクセスを有効にする方法について説明しました。しかし、処理によっては同時実行数を制限しなけばならない場合があるかと思います。


  • リソースを消費する重たい処理


    • たとえば巨大な画像ファイルを生成する処理など



  • もともと同時実行を考慮していない処理


    • たとえば機械学習におけるモデルの構築処理など



今回は特定の処理に対して多重実行を制御する方法について説明したいと思います。


1.1. ポイント


  • 多重実行の制御にはQueueを利用する

  • 多重制御の関数デコレータ@multiple_controlを作成する

  • 制限を掛ける処理に@multiple_controlデコレータを付与する


利用イメージ

@app.route('/world/<string:value>', methods=['GET'])

@multiple_control(singleQueue)
def world(value):

result = heavy_process(value)
return(make_response(result))



1.2. 検証環境


  • Python 2.7.13

  • Flask 0.12.2


2. ソースコード


multi_flask.py

# -*- coding: utf-8 -*-

from flask import Flask, send_file, make_response, send_from_directory, jsonify
from Queue import Queue
import time
import functools

# flask
app = Flask(__name__, static_folder=None)

# ★ポイント1
# 多重実行用のキュー
multipleQueue = Queue(maxsize=2)
singleQueue = Queue(maxsize=1)

# ★ポイント2
# 多重制御の関数デコレータ
def multiple_control(q):
def _multiple_control(func):
@functools.wraps(func)
def wrapper(*args,**kwargs):
q.put(time.time())
print("/// [start] critial zone")
result = func(*args,**kwargs)
print("/// [end] critial zone")
q.get()
q.task_done()
return result

return wrapper
return _multiple_control

# ★ポイント3
# you should limit execute this function
def heavy_process(data):
print("<business> : " + data)
time.sleep(10)
return "ok : " + data

# rest api
@app.route('/hello', methods=['GET'])
def hello():
print('hello')
return(make_response('hello'))

@app.route('/goodby', methods=['GET'])
def goodby():
print('goodby')
time.sleep(10)
return(make_response('goodby'))

# ★ポイント4
@app.route('/world/<string:value>', methods=['GET'])
@multiple_control(singleQueue)
def world(value):

result = heavy_process(value)
return(make_response(result))

# main
if __name__ == "__main__":
print(app.url_map)
app.run(host='localhost', port=3000, threaded=True)



★ポイント1

多重実行用にキュー(Queue)を定義します。同時実行を許可する数をmaxsizeに指定します。maxsize=1なら同時に1つしか実行されないため排他制御としても利用できます。なお、maxsizeのデフォルト値は0で無制限です。

○○○処理は同時実行をX、●●●処理は同時実行をYとしたい場合、それぞれキューを用意します。


★ポイント2

多重制御の関数デコレータの@multiple_controlを定義します。関数デコレータについては「FlaskでシンプルにContent-Typeをチェックする(@content_type)」を参照してください。

ポイントは目的の関数の呼び出し前後にQueue操作を行っていることです。これにより同時実行を制御しています。Queueについては「おまいらのthreading.Eventの使い方は間違っている(以下の引用)」が分かりやすいので参照してみてください。


Python標準モジュールにはqueueというのがあります。

これはマルチスレッドに対応したキューで、以下のようなことができます。


  • pop()を呼び出した時にキューが空ならスレッドを待機する

  • push()を呼び出した時にキューが満杯なら空きができるまで待機する



★ポイント3

重たい処理のサンプルです。とりあえず10秒スリープするようにしました。


★ポイント4

多重実行を制御したい処理に@multiple_control関数デコレータを付与します。デコレータの引数には多重実行用のQueueを指定します。

○○○処理は同時実行をX、●●●処理は同時実行をYとしたい場合、用意したそれぞれのキューを指定します。

サンプルではsingleQueueを指定していますが、multipleQueueに変えると2多重で動かすことができます。


3. 動作確認

Webブラウザの画面(タブ)を3つ開いてURL①、URL②、URL③にそれぞれアクセスします。URL①、URL②は同時実行ができないためURL②の処理はURL①の処理が完了した後に実行されますが、URL③は対象外のため、アクセスするとすぐに処理されるはずです。


実行結果

C:\tools\py>python multi_flask.py

Map([<Rule '/goodby' (HEAD, OPTIONS, GET) -> goodby>,
<Rule '/hello' (HEAD, OPTIONS, GET) -> hello>,
<Rule '/world/<value>' (HEAD, OPTIONS, GET) -> world>])
* Running on http://localhost:3000/ (Press CTRL+C to quit)
/// [start] critial zone
<business> : 123
hello
127.0.0.1 - - [29/Sep/2018 17:42:52] "GET /hello HTTP/1.1" 200 -
127.0.0.1 - - [29/Sep/2018 17:42:52] "GET /favicon.ico HTTP/1.1" 404 -
/// [end] critial zone
/// [start] critial zone1
27.0.0.1 - - [29/Sep/2018 17:42:59] "GET /world/123 HTTP/1.1" 200 -
<business> : 456
127.0.0.1 - - [29/Sep/2018 17:43:00] "GET /favicon.ico HTTP/1.1" 404 -
/// [end] critial zone
127.0.0.1 - - [29/Sep/2018 17:43:09] "GET /world/456 HTTP/1.1" 200 -
127.0.0.1 - - [29/Sep/2018 17:43:10] "GET /favicon.ico HTTP/1.1" 404 -

確かにURL③の/helloは制限されずにURL①の処理中に実行されていることが分かります。


4. さいごに

今回は特定の処理に対して多重実行を制御する方法について説明しました。

サンプルではpython標準のQueueを利用しているため、プロセスを跨いだ多重実行は制御できません。複数プロセスや別サーバ等でも多重実行を制御したい場合は外部のキューサービスを利用するようにしてください。