LoginSignup
24
20

More than 5 years have passed since last update.

Flask + SQLAlchemy + multiprocessing

Last updated at Posted at 2017-12-20

はじめに

このエントリーは、 エキサイト Advent Calendar 2017 の 12/20 の記事です。

この記事は Flask + SQLAlchemy + multiprocessing を使用した際にハマった部分をまとめました。

なお、本記事内のコードの全体は GitHub 上に置かれています。
Pull Request 、編集リクエスト等大歓迎です。よろしくお願い致します。

環境

  • Ubuntu 16.04.3 LTS
  • Python 3.6.2
  • MySQL 5.7

multiprocessing

コード

multiprocessing.Pool を使用した並行処理のサンプルです。

def register_user_scores():
    users = User.get_users()
    p = Pool(os.cpu_count() if os.cpu_count() else 1)
    for user in users:
        p.apply_async(_calc_scores, (user['id'],), error_callback=_callback_error)
    p.close()
    p.join()


# private

def _calc_scores(user_id: int):
    with app.app_context():
        score = sum(j * user_id for j in range(10000))
        Score.create_score(user_id, score)


def _callback_error(e: Exception):
    with app.app_context():
        app.logger.error(e)

解説

Pool

Pool は、指定された数のプロセスをプーリングし、適宜空いているプロセスにタスクを割り振ってくれます。

Pool(os.cpu_count())

と指定することで、システムの CPU の数だけプロセスをプーリングしてくれます。

os.cpu_count() は CPU の数がカウントできなかった場合に None が返るため、注意が必要です。
また、似たような関数で multiprocessing.cpu_count() があり、こちらを使っているサンプルコードも多いのですが、
Python 3.4 以降は os.cpu_count() を使うほうが良さそうです。1

ワーカーの実行

ワーカーを実行するためのメソッドはいくつかありますが、よく使われるものを挙げます。2

Method Detail
apply() 複数の引数を伴って与えられたメソッドを呼び出す。結果が出るまでブロックする
apply_async() 複数の引数を伴って与えられたメソッドを非同期で呼び出す。 AsyncResult オブジェクトを返す
map() 組み込み関数 map() の並列版。単一引数のみ
map_async() map() の非同期版。 AsyncResult オブジェクトを返す

上のサンプルでは apply_async() を使用しています。
apply_async()map_async() などは AsyncResult.get() でワーカーの結果を受け取る事ができます。
ただし、ワーカーで発生した例外を受け取って処理をしたい場合は error_callback を指定してあげる方が扱いやすいです。

Session

sessionmaker() と scoped_session()

SQLAlchemy でセッションを生成する方法は scoped_session()sessionmaker() の2種類があります。

Type Detail
scoped_session() 複数回呼び出しても1つの共通のセッションを返す。セッションが明示的に破棄されるまで、同じセッションのレジストリを保持し続ける
sessionmaker() 呼び出す度に新しいセッションを生成する

並列で処理する場合は scoped_session() は使えないため、 sessionmaker() を使う必要があります。

session = sessionmaker(autocommit=False,
                       autoflush=True,
                       expire_on_commit=False,
                       bind=engine)

Flask-SQLAlchemy-Session を使用する

sessionmaker() を使用する場合、適切にセッションの管理を行う必要があります。
Flask を使用する場合は Flask-SQLAlchemy-Session を使うと適切に処理してくれます。

init 時に flask_scoped_session() を設定します。

app = Flask(__name__)
engine = create_engine(
    'mysql+pymysql://user:pass@localhost/test?charset=utf8mb4', encoding='utf-8')

flask_scoped_session(sessionmaker(
            autocommit=False,
            autoflush=True,
            expire_on_commit=False,
            bind=engine), app)

add や commit するときは current_session を使用します。


user = User(1, 'Scott')

current_session.add(user)
current_session.commit()

その他

Python3 + MySQL

SQLAlchemy のデフォルトのドライバが MySQL-Python のようなのですが、
こちらは Python3 に対応していないため、 Python3 で実装する場合は別のドライバをインストールする必要があります。

候補としては以下のものがあります。

Dialect Detail
mysqlclient-python MySQL-Python から fork された Python3 対応版
PyMySQL Python のみで実装。 MySQL-Python と完全互換
mysql-connector-python MySQL 公式

また、 MySQL の encoding を utf8mb4 にする場合は PyMySQL のみが対応しています。

特にこだわりがなければ PyMySQL がオススメです。

ちなみに、 SQLAlchemy の extras_require に PyMySQL が含まれているので、
setup.py 内で以下のように指定すると、一緒に PyMySQL もインストールできます。

setup.py

setup(
    install_requires=[
        'SQLAlchemy[pymysql]'
    ]
)

Connection String

engine = create_engine(
    'mysql+pymysql://user:pass@localhost/test?charset=utf8mb4', encoding='utf-8')

まとめ

  • Python 3.4 以降は os.cpu_count() の使用を推奨
  • 複数の引数を使う際は multiprocessing.Pool.apply_async() を使用する
  • apply_async() 使用時にワーカープロセスの Exception を取得したい場合は AsyncResult.get() を使用するか、 error_callback を指定
  • multiprocessing を使用する場合は sessionmaker() を使用する

全体のコードは GitHub にて。

参考

24
20
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
24
20