Python
Python3
aiomysql
asyncio

aiomysqlを使ってみた

More than 1 year has passed since last update.

PyConJP2016で、最近のPythonにはasyncioというものがあるということを知りました。asyncioはI/O処理などプログラムに待ち時間が発生するときに待ち時間を効率的に使えるようにするもの(たぶん)と理解したので、MySQLへの問い合わせの高速化に使えるかなと考えました。asyncioの詳細については他の記事を探してください。

MySQLにクエリを実行させるとき、asyncioに対応したライブラリを使うときと使わないときとで、処理時間に違いがあるかを調べました。

asyncioに対応したMySQL接続をするライブラリとしてaiomysqlを使います。aiomysqlはPyMySQLをベースにしたライブラリみたいですが、もともとはMySQL-Connector-Pythonを使っていたので、比較対象はMySQL-Connector-Pythonです。


結論

結論からいうと、aiomysqlとMySQL-Connector-Pythonとで処理時間の大きな違いはみれませんでした……。asyncioを使うシチュエーションを間違えてるんでしょうか。MySQL内部のロックでひっかかってるんでしょうか。

この結果は、aiomysqlが遅いことを結論するものではなく、Spatialなクエリだと早くならないことを示しているだけです(しかしこのSELECTクエリでも、マルチプロセスで並列化すると早くなるんだけど……)。


データベース側

MySQLのSpatial Indexを使ったSQLを実行しています。使ったテーブルは次のようなものです。このテーブルに、日本の市区町村の境界ポリゴンデータ[1]が入っています。

create table if not exists {TABLE} ( 

code mediumint(5) not null,
name varchar(100) not null,
shape multipolygon not null,
center point,
primary key (code),
spatial key shape (shape)
) engine=MyISAM default charset=utf8;

実行したクエリは次のやつです。

select code from {TABLE} 

where st_contains(shape, geomfromtext(%s))


使ったプログラム

緯度経度を含んだTSVファイルを読み込んで、マッチしたエリアを出力するプログラムです。


aiomysqlを使うプログラム


asyncmatcher.py

# coding: utf-8

import sys
import csv
csv.field_size_limit(10000000000)
import asyncio
import aiomysql

TABLE = 'gxmlcity'
contains_sql = ('SELECT code from {table} '
'WHERE St_Contains(shape, GeomFromText(%s))').format(table=TABLE)

import time
def record_time(func):
def record(*args, **kwargs):
start = time.time()
ret = func(*args, **kwargs)
elapsed_time = time.time() - start
print('Elapsed time: {} [sec]'.format(elapsed_time), file=sys.stderr)
return record

def print_result(cols, result):
if result:
print(*(tuple(cols) + result[0]), sep='\t')
else:
print(*(tuple(cols) + ('No Match',)), sep='\t')
if len(result) > 1:
print(cols, result)

async def match(cur, lat, lon):
p_str = 'POINT({} {})'.format(lat, lon)
await cur.execute(contains_sql, (p_str,))
result = await cur.fetchall()
return result

async def iterate_to_match(cur, args):
for cols in csv.reader(args.infile, delimiter='\t'):
if cols[2] != 'None':
result = await match(cur, float(cols[2]), float(cols[3]))
print_result(cols, result)

async def match_areas(loop, args):
conn = await aiomysql.connect(user='root', password='', db=args.dbname, loop=loop, charset='utf8')
try:
cur = await conn.cursor()
await iterate_to_match(cur, args)
await cur.close()
finally:
conn.close()

def parse_args():
import argparse
parser = argparse.ArgumentParser(description='非同期にエリア照合をする')
parser.add_argument('--infile', type=argparse.FileType('r', encoding='utf-8'), default=sys.stdin)
parser.add_argument('--dbname', required=True, help='エリアDBのあるDB名')
return parser.parse_args()

@record_time
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(match_areas(loop, args))
loop.close()

if __name__ == '__main__':
args = parse_args()
main()



mysql-connector-pythonを使うプログラム


singlematcher.py

# coding: utf-8

import sys
import csv
csv.field_size_limit(10000000000)
import mysql.connector

TABLE = 'gxmlcity'
contains_sql = ('SELECT code from {table} '
'WHERE St_Contains(shape, GeomFromText(%s))').format(table=TABLE)

import time
def record_time(func):
def record(*args, **kwargs):
start = time.time()
ret = func(*args, **kwargs)
elapsed_time = time.time() - start
print('Elapsed time: {} [sec]'.format(elapsed_time), file=sys.stderr)
return record

def print_result(cols, result):
if result:
print(*(tuple(cols) + result[0]), sep='\t')
else:
print(*(tuple(cols) + ('No Match',)), sep='\t')
if len(result) > 1:
print(cols, result)

def match(cur, lat, lon):
p_str = 'POINT({} {})'.format(lat, lon)
cur.execute(contains_sql, (p_str,))
result = cur.fetchall()
return result

def iterate_to_match(cur, args):
for cols in csv.reader(args.infile, delimiter='\t'):
if cols[2] != 'None':
result = match(cur, float(cols[2]), float(cols[3]))
print_result(cols, result)

def match_areas(args):
conn = mysql.connector.connect(user='root', password='', db=args.dbname, charset='utf8')
try:
cur = conn.cursor()
iterate_to_match(cur, args)
cur.close()
finally:
conn.close()

def parse_args():
import argparse
parser = argparse.ArgumentParser(description='ふつうにエリア照合をする')
parser.add_argument('--infile', type=argparse.FileType('r', encoding='utf-8'), default=sys.stdin)
parser.add_argument('--dbname', required=True, help='エリアDBのあるDB名')
return parser.parse_args()

@record_time
def main():
match_areas(args)

if __name__ == '__main__':
args = parse_args()
main()



比較

aiomysqlを使った場合(並列にリクエストしていてほしい方)

time ( gzip -dc json_2014-08-01.txt.gz | head -n 1000 | python scripts/asyncmatcher.py --dbname reftest > /dev/null )

Elapsed time: 29.44952368736267 [sec]

real 0m29.581s
user 0m0.736s
sys 0m0.044s

mysql-connector-pythonを使った場合(ふつう)

$ time ( gzip -dc json_2014-08-01.txt.gz | head -n 1000 | python scripts/singlematcher.py --dbname reftest > /dev/null )

Elapsed time: 27.986697673797607 [sec]

real 0m28.183s
user 0m0.620s
sys 0m0.024s


まとめ

asyncioを使っても早くならない……。使うシチュエーションはこれであっているのでしょうか。


実験環境

比較をしているのにPythonのバージョンが異なっていたりします。MySQL-Connector-PythonがPython 3.5に対応していなかったことが原因です。


  • Debian 8.1

  • mysql Ver 14.14 Distrib 5.6.25, for Linux (x86_64) using EditLine wrapper


MySQL-Connector-Pythonを使うとき


  • Python 3.4.3 :: Continuum Analytics, Inc.

  • mysql-connector-python 2.0.4


aiomysqlを使うとき


  • Python 3.5.2

  • aiomysql 0.0.9


参考文献

[1] 森國泰平, 吉田光男, 岡部正幸, 梅村恭司. ツイート投稿位置推定のための単語フィルタリング手法. 情報処理学会論文誌 データベース. 2015, vol. 8, no. 4, p. 16–26.