Edited at

[Python] SQLAlchemyを頑張って高速化

More than 1 year has passed since last update.


はじめに

pythonからMySQLを叩く際,何を使っていますか?SQLAlchemy,Django,peeweeあたりを使っている方が多いのではないでしょうか.私は色々使ってみて結局SQLAlchemyに落ち着いていますが,何千万,何億オーダのデータになると処理が遅く非常にやっかいです.

なので今回は(大きなデータにsql使うなとか高速処理にpython使うなという話は一旦置いておいて),SQLAlchemyを使ったデータ処理をいかに高速にするか,何項目かに分けて備忘録を残しておきます.

SQLAlchemyを~と書いてありますが,SQLAlchemyを使ってDBにデータをINSERT,SELECTする際のpython高速化TIPSごった煮です.本当は記事を分けたほうが良いのですが,自分で見返す際にまとまっているほうが嬉しいのでこうしました.

なお,今回使用するコードは全てgithubにおいてあるので,よければそちらもごらんください.


実行環境


  • Ubuntu 16.04.1

  • Intel(R) Core(TM) i7-6700K CPU @ 4.00GHz

  • 仮想8コア、物理4コア

  • Python 3.5.2


DB構造

今回のテストで使用するDB構造は以下のようになります.


user table

id
name
age
team_id
created_at
updated_at

1
John1
12
4
1486030539
1486030539

2
Kevin2
54
12
1486030539
1486030539

...


team table

id
name
created_at
updated_at

1
A
1486030539
1486030539

2
B
1486030539
1486030539

...

userはteamの外部キーを持っています.


INSERT編

まずはデータをteam, userテーブルに登録します.

team_list = ['A', 'B',...,'Z']

user_list = [('John', 14, 'C'), ...]

というデータを持っているところからスタートします.

team数はA~Zの26,user数は10万です.


0. 1つずつSQLAlchemy Table objectを作って1つずつInsert

さすがにこんなやり方は最初からしませんが,比較のためにここからスタートします.

class Base(object):

def __iter__(self):
return iter(self.__dict__.items())

def dict(self):
return self.__dict__

@classmethod
def query(cls):
if not hasattr(cls, "_query"):
cls._query = database.session().query_property()
return cls._query

class User(Base):

def __repr__(self):
return '<User %r>' % (self.id)

def __init__(self, name, age, team):
self.name = name
self.age = age
self.team = team
self.updated_at = time.time()
self.created_at = time.time()

@staticmethod
def create_dict(name, age, team_id):
return {'name': name, 'age': age, 'team_id': team_id,
'updated_at': time.time(), 'created_at': time.time()}

signup_user = Table('user', metadata,
Column('id', BigInteger, nullable=False,
primary_key=True, autoincrement=True),
Column('name', Unicode(255), nullable=False),
Column('age', Integer, nullable=False),
Column('team_id', ForeignKey('team.id'), nullable=False),
Column('updated_at', BigInteger, nullable=False),
Column('created_at', BigInteger, nullable=False))

mapper(User, signup_user,
properties={
'id': signup_user.c.id,
'name': signup_user.c.name,
'age': signup_user.c.age,
'team': relationship(Team),
'updated_at': signup_user.c.updated_at,
'created_at': signup_user.c.created_at
})

User.__table__ = signup_user

のようにTableオブジェクトを作成しておきます.DB側セッションの準備はこんなかんじ

from sqlalchemy import create_engine, MetaData

from sqlalchemy.orm import scoped_session, sessionmaker

metadata = MetaData()
engine = create_engine(uri, encoding='utf-8', pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=False,
autoflush=True,
expire_on_commit=False,
bind=_engine))

metadata.create_all(bind=_engine)

teamを登録する部分は省略しますが,

def insert_user(name, age, team):

u = User(name, age, team)
session.add(u)
session.commit()

teams = Team.query().all()
# team_dict = {'A': <Team1>, 'B': <Team2>, ...}
team_dict = {t.name: t for t in teams}
[insert_user(d[0], d[1], team_dict[d[2]]) for d in data_list]

のように1つずつUserを作成し,add, commitしていきます.


1. 複数まとめてInsert.

明らかに0.は効率が悪いので,複数レコードを一気に追加できるadd_allを使います.

users = [User(d[0], d[1], team_dict[d[2]]) for d in data_list]

database.session().add_all(users)
database.session().commit()

コードとしてもだいぶスッキリしました.


2. bulk insert

SQLAlchemyのORMにはbulk_save_objectsがあります .

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class UserTable(Base):
__tablename__ = "user"
id = Column(BigInteger, nullable=False,
primary_key=True, autoincrement=True)
name = Column(Unicode(255), nullable=False)
age = Column(Integer, nullable=False)
team_id = Column(BigInteger, nullable=False)
updated_at = Column(BigInteger, nullable=False)
created_at = Column(BigInteger, nullable=False)

のようにTableオブジェクトを作り,

session.bulk_save_objects(

[UserTable(name=d[0],
age=d[1],
team_id=team_dict[d[2]].id,
updated_at = time.time(),
created_at = time.time())
for d in data_list], return_defaults=True)
session.commit()

のように挿入します.外部キー等の扱いが変わっているのがわかります.


3. sqlalchemy.coreを使う

ORMは使いやすく,コードも短くできる上,rollbackなどの細かい制御を裏側でやってくれる等様々な利点を有していますが,クエリ生成のオーバーヘッドが大きく高速化を考える時にはネックになります. sqlalchemy.coreを使うと手間は増えますが,ORMを使うよりも高速にクエリを発行することができます.

users = [{'name':d[0], 'age': d[1], 'team_id': team_dict[d[2]]['id'],

'updated_at': time.time(), 'created_at': time.time()} for d in data_list]
session.execute(User.__table__.insert(), users)
session.commit()


比較

0~3までをまとめて速度比較してみましょう.

SqlAlchemy ORM: elapsed time of insertion: 62.205 [sec]

SqlAlchemy ORM multi insert: elapsed time of insertion: 1.421 [sec]
SqlAlchemy ORM bulk insert: elapsed time of insertion: 1.170 [sec]
SqlAlchemy core bulk insert: elapsed time of insertion: 0.261 [sec]

sqlalchemy.core... 圧倒的...!! bulk insertと比べても5,6倍速くなっています.

ORMを使うにしてもbulk insertを使うのが良さそうですね.


おまけ. bulk insertの分割は不要?

bulk insertの際,ある程度大きいデータを入れる際は分割したほうが速い,みたいな話をどこかで聞いたことある気がしたのでsqlalchemy.coreを使って実際に試してみました(100万件).

SqlAlchemy core bulk insert (10): elapsed time of insertion: 51.066 [sec]

SqlAlchemy core bulk insert (20): elapsed time of insertion: 37.913 [sec]
SqlAlchemy core bulk insert (50): elapsed time of insertion: 27.323 [sec]
SqlAlchemy core bulk insert (100): elapsed time of insertion: 23.954 [sec]
SqlAlchemy core bulk insert (150): elapsed time of insertion: 22.607 [sec]
SqlAlchemy core bulk insert (200): elapsed time of insertion: 21.853 [sec]
SqlAlchemy core bulk insert (500): elapsed time of insertion: 20.139 [sec]
SqlAlchemy core bulk insert (750): elapsed time of insertion: 19.399 [sec]
SqlAlchemy core bulk insert (1000): elapsed time of insertion: 19.362 [sec]
SqlAlchemy core bulk insert (5000): elapsed time of insertion: 19.493 [sec]
SqlAlchemy core bulk insert (10000): elapsed time of insertion: 19.387 [sec]
SqlAlchemy core bulk insert (20000): elapsed time of insertion: 18.983 [sec]
SqlAlchemy core bulk insert (50000): elapsed time of insertion: 19.641 [sec]
SqlAlchemy core bulk insert (100000): elapsed time of insertion: 19.022 [sec]
SqlAlchemy core bulk insert (500000): elapsed time of insertion: 19.837 [sec]

うーん,どうやら気のせいだったみたいです...


SELECT編

先ほど登録したteam, userデータを使います.

team数はA~Zの26,user数は100万です.

まずは単純に,年齢高い順に[{'id': 10, 'name': 'John', 'age': 34, 'team': 'K'}, {...}, ...]

のような辞書リストをlimit(今回は100)件返す処理をつくります.


0. MySQL側のチューニング(Index等)

当たり前ですがまずはしっかりMySQL側のチューニングをしましょう.

今回の例ではuser.ageにindexを貼るだけで約10倍処理が高速になります.

チューニングに関してはすでに様々な記事があるので今回は割愛します.


1. ORMを使用

users = User.query().order_by(desc(User.age)).limit(limit).all()

result = [{'id': u.id, 'name': u.name, 'age': u.age, 'team': u.team.name}
for u in users]

短い! いいですねー.外部キーで繋がっているテーブルをまたがるような場合は特にコードが短くなるのでありがたいです.


2. sqlalchemy.coreを使用

sqlalchemyのselect関数を用います.

from sqlalchemy import select, desc, and_, func

u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.order_by(desc(u.age)).limit(limit)
result = [{'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}
for r in session.execute(sel)]

だいぶ長くなってしまいました.


3. multiprocessing併用

この例ではsql処理と直接は関係ないですが,データ数が増えてきた場合,並列処理をかませると劇的に処理速度が改善します.使い方については以前記事を書いたのでよろしければそちらをごらんください.

from multiprocessing import Pool

import multiprocessing as multi

def get_user(r):
return {'id': r[0], 'name': r[1], 'age': r[2], 'team': r[3]}

def select_user_multi():
u = User.__table__.c
t = Team.__table__.c
sel = select([u.id, u.name, u.age, t.name])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.order_by(desc(u.age)).limit(limit)
p = Pool(multi.cpu_count())
result = p.map(get_user, session.execute(sel))
p.close()
return result


比較

sqlAlchemy ORM: elapsed time: 0.3291 [sec]

sqlAlchemy core: elapsed time: 0.5837 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0096 [sec]

あれ...遅くなってしまいました.クエリが単純だったからかもしれません.multiprocessingをかませた結果はだいぶ速くなりました.


おまけ1. count

続いて,teamごとのuser数の集計をしてみます.少しクエリを複雑にして,それぞれのチームに属している50歳未満のユーザ数をカウントし{'A': 1400, 'B': 2122, ....}のようなデータを作成する処理をおこないます.

なお,カウント処理の高速化自体はこちら(InnoDBでCOUNT()を扱う際の注意事項あれこれ。)が大変参考になります


ORM

def select_teams_orm():

return Team.query().all()

teams = select_teams_orm()
counts = {tm.name: User.query()\
.filter(and_(User.team == tm, User.age < 50)).count()\
for tm in teams}

相変わらず短い!


sqlalchemy.core

def select_teams_core():

t = Team.__table__.c
sel = select([t.id, t.name]).select_from(Team.__table__)
res = session.execute(sel)
result = [{'id': r[0], 'name': r[1]} for r in res]
return result

teams = select_teams_core()
sess = lambda sel: session.execute(sel)
u = User.__table__.c
counts = {tm['name']: sess(
select([func.count()]).select_from(User.__table__)\
.where(and_(u.team_id == tm['id'], u.age < 50))\
).scalar() for tm in teams}

わかりにくい!1ループで済ませるために上のような書き方をしていますが,分解すると

def create_query(team_id): # クエリ作成 team idとuser ageで絞り込み

u = User.__table__.c
return select([func.count()]).select_from(User.__table__)\
.where(add_(u.team_id == team_id, u.age < 50))

queries = [create_query(tm['id']) for tm in teams] # チームごとにクエリ作成
counts = [session.execute(q) for q in queries] # クエリ発行
result = [{tm['name']: c.scalar()} for tm,c in zip(teams,counts)] # {'A': count, ...} 辞書作成

といった感じです.まただいぶコード量が増えてしまいました.


multiprocessing

今度はSELECTクエリを投げるところから並列化してみます.

注意点は,JoblibのParallelizeは使えない,scoped_sessionで作成したsessionは並列処理内では使えない,の二点です.

session = sessionmaker(autocommit=False, # scoped_session はダメ

autoflush=True,
expire_on_commit=False,
bind=_engine)

def count_user(team):
u = User.__table__.c
sel = select([func.count()]).select_from(User.__table__)\
.where(and_(u.team_id == team['id'], u.age < 50))
result = session.execute(sel).scalar()
return result

def count_user_multi():
teams = select_teams_core()
p = Pool(multi.cpu_count())
counts = p.map(count_user, teams)
counts = {t['name']: c for t, c in zip(teams, counts)}
p.close()
session.close()
return counts


クエリの改善

今回,チームごとにカウントするクエリを投げていますが,そもそも

SELECT DISTINCT(team.id), team.name, COUNT(*)

FROM user JOIN team ON team.id = user.team_id
WHERE user.age < 50 GROUP BY team.id;

とすれば投げるクエリは一回でいいので修正します.

u = User.__table__.c

t = User.__table__.c
sel = select([func.distinct(t.id), t.name, func.count()])\
.select_from(User.__table__.join(Team.__table__, t.id == u.team_id))\
.where(u.age < 50).group_by(t.id)
counts = {r[1]: r[2] for r in database.session().execute(sel)}


比較

sqlAlchemy ORM: elapsed time: 0.9522 [sec]

sqlAlchemy core: elapsed time: 0.7772 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0733 [sec]
--
sqlAlchemy core with fixed query: elapsed time: 0.2207 [sec]

今度はcoreを使うほうがORMより若干速くなりました.並列化すると大体10倍程度速くなるようです.また,クエリを改良し一回投げれば済むようにした場合,元のクエリよりは三倍強速くなったものの,並列化したものには遠く及びませんでした.


おまけ2. もう少し複雑なクエリ

先ほどはcoreを使ったほうが遅くなってしまいましたが,チームごとに年齢が高い順に100名取得し,以下のようなデータを返するような処理を行います.

[{'id': 1, 'name': 'A', 'users': [{'id': 400, 'name': 'Kevin', 'age': 32}, {...}, ...]},

{'id': 2, 'name': 'B', 'users': [...]},
...]

コードは省きますが(詳しくはgithubの方を参照してください),結果は以下のようになりました.

sqlAlchemy ORM: elapsed time: 0.9782 [sec]

sqlAlchemy core: elapsed time: 0.8864 [sec]
sqlAlchemy core with multiprocessing: elapsed time: 0.0807 [sec]

この例の場合,ORMとcoreの差が0.1sec程度なのでそこまで気にする必要はないかもしれませんが,100万件でこれくらいの差なので,もっとクエリが複雑になる・件数が多くなる場合はsqlalchemy.coreを使う・並列化するのが有用そうです.


おわりに

以上,INSERT編,SELECT編と見てきましたが,まとめると,


  • まずはsql,クエリのチューニング

  • Insertはコード量もそこまで増えず,速度は5,6倍になるためcoreを使うのがよさそう.

  • bulk insertは分割挿入しても速くならない

  • SelectはORMからcoreに移行すると10%近く高速化できるが,コード量が増大する.

  • Selectを並列化するとかなりの高速化が可能(今回は10倍近い結果がでましたが,この辺りはのマシン性能により大きく変わるとおもわれます)

でした.ORMの良さはほぼ殺してしまうし,もはや別の言語を使えばいいという話もありますが,pythonを使わざるをえない時はデータサイズやアプリケーションの内容に合わせて使い分けられると良さそうですね.


参考文献