はじめに
このエントリーは、 エキサイト Advent Calendar 2017 の 12/20 の記事です。
この記事は Flask + SQLAlchemy + multiprocessing を使用した際にハマった部分をまとめました。
なお、本記事内のコードの全体は GitHub 上に置かれています。
Pull Request 、編集リクエスト等大歓迎です。よろしくお願い致します。
環境
- Ubuntu 16.04.3 LTS
- Python 3.6.2
- MySQL 5.7
multiprocessing
コード
multiprocessing.Pool を使用した並行処理のサンプルです。
def register_user_scores():
users = User.get_users()
p = Pool(os.cpu_count() if os.cpu_count() else 1)
for user in users:
p.apply_async(_calc_scores, (user['id'],), error_callback=_callback_error)
p.close()
p.join()
# private
def _calc_scores(user_id: int):
with app.app_context():
score = sum(j * user_id for j in range(10000))
Score.create_score(user_id, score)
def _callback_error(e: Exception):
with app.app_context():
app.logger.error(e)
解説
Pool
Pool は、指定された数のプロセスをプーリングし、適宜空いているプロセスにタスクを割り振ってくれます。
Pool(os.cpu_count())
と指定することで、システムの CPU の数だけプロセスをプーリングしてくれます。
os.cpu_count()
は CPU の数がカウントできなかった場合に None
が返るため、注意が必要です。
また、似たような関数で multiprocessing.cpu_count()
があり、こちらを使っているサンプルコードも多いのですが、
Python 3.4 以降は os.cpu_count()
を使うほうが良さそうです。1
ワーカーの実行
ワーカーを実行するためのメソッドはいくつかありますが、よく使われるものを挙げます。2
Method | Detail |
---|---|
apply() | 複数の引数を伴って与えられたメソッドを呼び出す。結果が出るまでブロックする |
apply_async() | 複数の引数を伴って与えられたメソッドを非同期で呼び出す。 AsyncResult オブジェクトを返す |
map() | 組み込み関数 map() の並列版。単一引数のみ |
map_async() |
map() の非同期版。 AsyncResult オブジェクトを返す |
上のサンプルでは apply_async()
を使用しています。
apply_async()
、 map_async()
などは AsyncResult.get()
でワーカーの結果を受け取る事ができます。
ただし、ワーカーで発生した例外を受け取って処理をしたい場合は error_callback
を指定してあげる方が扱いやすいです。
Session
sessionmaker() と scoped_session()
SQLAlchemy でセッションを生成する方法は scoped_session()
と sessionmaker()
の2種類があります。
Type | Detail |
---|---|
scoped_session() | 複数回呼び出しても1つの共通のセッションを返す。セッションが明示的に破棄されるまで、同じセッションのレジストリを保持し続ける |
sessionmaker() | 呼び出す度に新しいセッションを生成する |
並列で処理する場合は scoped_session()
は使えないため、 sessionmaker()
を使う必要があります。
session = sessionmaker(autocommit=False,
autoflush=True,
expire_on_commit=False,
bind=engine)
Flask-SQLAlchemy-Session を使用する
sessionmaker()
を使用する場合、適切にセッションの管理を行う必要があります。
Flask を使用する場合は Flask-SQLAlchemy-Session を使うと適切に処理してくれます。
init 時に flask_scoped_session()
を設定します。
app = Flask(__name__)
engine = create_engine(
'mysql+pymysql://user:pass@localhost/test?charset=utf8mb4', encoding='utf-8')
flask_scoped_session(sessionmaker(
autocommit=False,
autoflush=True,
expire_on_commit=False,
bind=engine), app)
add や commit するときは current_session
を使用します。
user = User(1, 'Scott')
current_session.add(user)
current_session.commit()
その他
Python3 + MySQL
SQLAlchemy のデフォルトのドライバが MySQL-Python のようなのですが、
こちらは Python3 に対応していないため、 Python3 で実装する場合は別のドライバをインストールする必要があります。
候補としては以下のものがあります。
Dialect | Detail |
---|---|
mysqlclient-python | MySQL-Python から fork された Python3 対応版 |
PyMySQL | Python のみで実装。 MySQL-Python と完全互換 |
mysql-connector-python | MySQL 公式 |
また、 MySQL の encoding を utf8mb4
にする場合は PyMySQL のみが対応しています。
特にこだわりがなければ PyMySQL がオススメです。
ちなみに、 SQLAlchemy の extras_require
に PyMySQL が含まれているので、
setup.py 内で以下のように指定すると、一緒に PyMySQL もインストールできます。
setup.py
setup(
install_requires=[
'SQLAlchemy[pymysql]'
]
)
Connection String
engine = create_engine(
'mysql+pymysql://user:pass@localhost/test?charset=utf8mb4', encoding='utf-8')
まとめ
- Python 3.4 以降は
os.cpu_count()
の使用を推奨 - 複数の引数を使う際は
multiprocessing.Pool.apply_async()
を使用する -
apply_async()
使用時にワーカープロセスの Exception を取得したい場合はAsyncResult.get()
を使用するか、error_callback
を指定 - multiprocessing を使用する場合は
sessionmaker()
を使用する
全体のコードは GitHub にて。
参考
- [Python] SQLAlchemyを頑張って高速化
- 17.2.2.9. Process Pools
- Python 3.3 までの multiprocessing.cpu_count() が「ぎょえ」で 3.4 以降も継承されてるハナシ
- Contextual/Thread-local Sessions
- Understanding Python SQLAlchemy’s Session
- Flask-SQLAlchemy-Session