はじめに
- FlaskでMySQL操作をしていて、ハマったのでメモしておきます。
実現できること
その1
- ユーザー一覧の取得(戻り値:Dict型)
api
http://127.0.0.1/users
MySQLへのクエリー
SELECT * FROM users;
dbh.cursor
dbh = mysql.connector.connect(**dns)
cursor = dbh.cursor(dictionary=True)
cursor.execute(stmt)
cursor.fetchall()
期待通りのデータ(戻り値)
[
{'id': 1, 'name': '日本 太郎', 'age': '22', 'gender': '男性'},
:
]
その2
- ユーザー単体の取得(戻り値:tuple型)
api
http://127.0.0.1/users/1
MySQLへのクエリー
SELECT * FROM users WHERE id = ?;
dbh.cursor
dbh = mysql.connector.connect(**dns)
cursor = dbh.cursor(prepared=True)
cursor.execute(stmt, id)
cursor.fetchall()
期待通りのデータ(戻り値)
[(1, '日本 太郎', '22', '男性')]
実現したいこと
- 上記の合わせ技(戻り値:辞書型、プリペアドステートメント使用)
api
http://127.0.0.1/users/1
MySQLへのクエリー
SELECT * FROM users WHERE id = ?;
dbh.cursor
dbh = mysql.connector.connect(**dns)
cursor = dbh.cursor(dictionary=True, prepared=True) # Error
cursor.execute(stmt, id)
cursor.fetchall()
期待するデータ(戻り値)
[{'id': 1, 'name': '日本 太郎', 'age': '22', 'gender': '男性'}]
なんで?
KeyError
[2018-10-07 14:18:07,670] ERROR in app: Exception on /users/1 [GET]
Traceback (most recent call last):
File "c:\pystudy\lesson-flask\step5\lib\site-packages\mysql\connector\connection.py", line 901, in cursor
return (types[cursor_type])(self)
KeyError: 20
ValueError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 2292, in wsgi_app
response = self.full_dispatch_request()
File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1815, in full_dispatch_request
rv = self.handle_user_exception(e)
File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1718, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\_compat.py", line 35, in reraise
raise value
File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1813, in full_dispatch_request
rv = self.dispatch_request()
File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1799, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "C:\pystudy\lesson-flask\step5\index.py", line 104, in user
user = db.select(stmt, id, prepared=True)
File "C:\pystudy\lesson-flask\step5\DataStore\MySQL.py", line 30, in select
cursor = self.dbh.cursor(dictionary=True, prepared=True)
File "c:\pystudy\lesson-flask\step5\lib\site-packages\mysql\connector\connection.py", line 905, in cursor
', '.join([args[i] for i in range(5)
ValueError: Cursor not available with given criteria: dictionary, prepared
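解決策?
- dictionary=True と prepared=True は同時に指定できないため、prepared=True のカーソルで取得した各行(tuple)を cursor.column_names と zip して辞書に変換します。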
>>> import mysql.connector as mc
>>> dns = {'user': 'mysql', 'password': 'NewPassword', 'host': 'localhost', 'database': 'kaggle'}
>>> dbh = mc.connect(**dns)
>>> cursor = dbh.cursor(prepared=True)
>>> id = (1,)
>>> stmt = 'SELECT * FROM users WHERE id=?'
>>> cursor.execute(stmt, id)
>>> keys = cursor.column_names
>>> rows = cursor.fetchall()
>>> data = []
>>> for row in rows:
... data.append(dict(zip(keys, row)))
...
>>> cursor.close()
>>> dbh.close()
>>> data
[{'id': 1, 'name': '日本 花子', 'age': '32', 'gender': '女'}]
勉強がてらソースファイルを眺めてみる…
mysql\connector\abstracts
class MySQLCursorAbstract(object):
    """Interface description shared by the cursor classes.

    Holds the bookkeeping attributes (`_description`, `_rowcount`,
    `_last_insert_id`, `_warnings`, `arraysize`) and declares the
    methods and properties that concrete cursors are expected to supply.
    No ABC metaclass is visible in this excerpt, so whether the abstract
    markers are enforced depends on code outside it.
    """

    def __init__(self):
        """Initialise the bookkeeping attributes to their empty values."""
        self.arraysize = 1
        self._warnings = None
        self._last_insert_id = None
        self._rowcount = -1
        self._description = None

    @abstractmethod
    def callproc(self, procname, args=()):
        """Abstract; the body here does nothing and returns None."""

    @abstractmethod
    def close(self):
        """Abstract; the body here does nothing and returns None."""

    @abstractmethod
    def execute(self, operation, params=(), multi=False):
        """Abstract; the body here does nothing and returns None."""

    @abstractmethod
    def executemany(self, operation, seq_params):
        """Abstract; the body here does nothing and returns None."""

    @abstractmethod
    def fetchone(self):
        """Abstract; the body here does nothing and returns None."""

    @abstractmethod
    def fetchmany(self, size=1):
        """Abstract; the body here does nothing and returns None."""

    @abstractmethod
    def fetchall(self):
        """Abstract; the body here does nothing and returns None."""

    def nextset(self):
        """Placeholder that does nothing and returns None."""

    def setinputsizes(self, sizes):
        """Placeholder that does nothing and returns None."""

    def setoutputsize(self, size, column=None):
        """Placeholder that does nothing and returns None."""

    def reset(self, free=True):
        """Placeholder that does nothing and returns None."""

    @abstractproperty
    def description(self):
        """Return the stored `_description` value."""
        return self._description

    @abstractproperty
    def rowcount(self):
        """Return the stored `_rowcount` value (-1 after construction)."""
        return self._rowcount

    @abstractproperty
    def lastrowid(self):
        """Return the stored `_last_insert_id` value."""
        return self._last_insert_id

    def fetchwarnings(self):
        """Return the stored `_warnings` value."""
        return self._warnings
mysql\connector\connection
class MySQLConnection(MySQLConnectionAbstract):
# :
# snip
# :
def cursor(self, buffered=None, raw=None, prepared=None, cursor_class=None,
           dictionary=None, named_tuple=None):
    """Create and return a cursor object bound to this connection.

    When `cursor_class` is given it must be a subclass of CursorBase and
    is instantiated directly. Otherwise the boolean options are folded
    into a bit mask (buffered=1, raw=2, dictionary=4, named_tuple=8,
    prepared=16) and the matching cursor class is looked up. `buffered`
    and `raw` fall back to the connection defaults when left as None.

    Raises:
        errors.OperationalError: the connection is not available.
        errors.ProgrammingError: `cursor_class` is not a CursorBase
            subclass.
        ValueError: no cursor class exists for the requested
            combination (for example dictionary together with prepared).
    """
    self.handle_unread_result()

    if not self.is_connected():
        raise errors.OperationalError("MySQL Connection not available.")
    if cursor_class is not None:
        if not issubclass(cursor_class, CursorBase):
            raise errors.ProgrammingError(
                "Cursor class needs be to subclass of cursor.CursorBase")
        return (cursor_class)(self)

    buffered = buffered if buffered is not None else self._buffered
    raw = raw if raw is not None else self._raw

    cursor_type = 0
    if buffered is True:
        cursor_type |= 1
    if raw is True:
        cursor_type |= 2
    if dictionary is True:
        cursor_type |= 4
    if named_tuple is True:
        cursor_type |= 8
    if prepared is True:
        cursor_type |= 16

    types = {
        0: MySQLCursor,  # 0
        1: MySQLCursorBuffered,
        2: MySQLCursorRaw,
        3: MySQLCursorBufferedRaw,
        4: MySQLCursorDict,
        5: MySQLCursorBufferedDict,
        8: MySQLCursorNamedTuple,
        9: MySQLCursorBufferedNamedTuple,
        16: MySQLCursorPrepared
    }

    # Look the class up without try/except: raising from inside an
    # `except KeyError` block chains a confusing "KeyError: <mask>" onto
    # the ValueError, and a KeyError raised by a cursor constructor would
    # be misreported as an unsupported option combination.
    selected = types.get(cursor_type)
    if selected is None:
        args = ('buffered', 'raw', 'dictionary', 'named_tuple', 'prepared')
        raise ValueError('Cursor not available with given criteria: ' +
                         ', '.join([args[i] for i in range(5)
                                    if cursor_type & (1 << i) != 0]))
    return selected(self)
mysql\connector\cursor
class CursorBase(MySQLCursorAbstract):
    """Do-nothing cursor base; every operation here returns None.

    `connection.cursor(cursor_class=...)` requires custom cursor classes
    to derive from this class.
    """

    _raw = False

    def __init__(self):
        """Set the bookkeeping attributes, then run the parent initialiser."""
        self._description = None
        self._rowcount = -1
        self._last_insert_id = None
        self.arraysize = 1
        super(CursorBase, self).__init__()

    def callproc(self, procname, args=()):
        """No-op; returns None."""

    def close(self):
        """No-op; returns None."""

    def execute(self, operation, params=(), multi=False):
        """No-op; returns None."""

    def executemany(self, operation, seq_params):
        """No-op; returns None."""

    def fetchone(self):
        """No-op; returns None."""

    def fetchmany(self, size=1):
        """No-op; returns None."""

    def fetchall(self):
        """No-op; returns None."""

    def nextset(self):
        """No-op; returns None."""

    def setinputsizes(self, sizes):
        """No-op; returns None."""

    def setoutputsize(self, size, column=None):
        """No-op; returns None."""

    def reset(self, free=True):
        """No-op; returns None."""

    @property
    def description(self):
        """Return the stored `_description` value."""
        return self._description

    @property
    def rowcount(self):
        """Return the stored `_rowcount` value."""
        return self._rowcount

    @property
    def lastrowid(self):
        """Return the stored `_last_insert_id` value."""
        return self._last_insert_id
class MySQLCursor(CursorBase):
def __init__(self, connection=None):
    """Initialise cursor state and bind to `connection` when one is given."""
    CursorBase.__init__(self)
    self._connection = None
    self._binary = False
    self._executed = None
    self._executed_list = []
    self._stored_results = []
    self._nextrow = (None, None)
    self._warnings = None
    self._warning_count = 0

    if connection is None:
        return
    self._set_connection(connection)
def __iter__(self):
    """Return an iterator that calls fetchone() until it returns None."""
    fetch_next = self.fetchone
    return iter(fetch_next, None)
def _set_connection(self, connection):
    """Store a weak proxy to `connection` and probe it with is_connected().

    Raises errors.InterfaceError(errno=2048) when the object cannot be
    proxied or has no usable is_connected attribute.
    """
    try:
        proxy = weakref.proxy(connection)
        self._connection = proxy
        proxy.is_connected()
    except (AttributeError, TypeError):
        raise errors.InterfaceError(errno=2048)
def _reset_result(self):
    """Clear all per-statement state, then call reset()."""
    self._description = None
    self._rowcount = -1
    self._nextrow = (None, None)
    self._stored_results = []
    self._executed = None
    self._executed_list = []
    self._warnings = None
    self._warning_count = 0
    self.reset()
def _have_unread_result(self):
    """Return the connection's unread_result flag, or False if it is missing."""
    try:
        unread = self._connection.unread_result
    except AttributeError:
        return False
    return unread
def next(self):
    """Alias that forwards to __next__()."""
    row = self.__next__()
    return row
def __next__(self):
    """Return the next row; raise StopIteration when none is available.

    An errors.InterfaceError from fetchone() also ends the iteration.
    """
    try:
        row = self.fetchone()
    except errors.InterfaceError:
        raise StopIteration
    if row:
        return row
    raise StopIteration
def close(self):
    """Detach the cursor from its connection.

    Returns False when there is no connection, otherwise True after
    unread results have been handled and cursor state has been reset.
    """
    conn = self._connection
    if conn is None:
        return False

    conn.handle_unread_result()
    self._reset_result()
    self._connection = None
    return True
def _process_params_dict(self, params):
    """Convert, escape and quote each value of a parameter mapping.

    Returns a new dict; keys stay unchanged when PY2 is true and are
    encoded with str.encode() otherwise. Any failure is re-raised as
    errors.ProgrammingError.
    """
    try:
        to_mysql = self._connection.converter.to_mysql
        escape = self._connection.converter.escape
        quote = self._connection.converter.quote
        processed = {}
        for name, raw_value in params.items():
            prepared = quote(escape(to_mysql(raw_value)))
            processed[name if PY2 else name.encode()] = prepared
    except Exception as err:
        raise errors.ProgrammingError(
            "Failed processing pyformat-parameters; %s" % err)
    return processed
def _process_params(self, params):
    """Convert, escape and quote a sequence of parameters.

    Each converter step is applied to the whole sequence before the next
    step starts. Returns a tuple; any failure is re-raised as
    errors.ProgrammingError.
    """
    try:
        values = params
        steps = (
            self._connection.converter.to_mysql,
            self._connection.converter.escape,
            self._connection.converter.quote,
        )
        for step in steps:
            values = [step(item) for item in values]
    except Exception as err:
        raise errors.ProgrammingError(
            "Failed processing format-parameters; %s" % err)
    return tuple(values)
def _handle_noresultset(self, res):
    """Record row count, insert id and warning count from a result dict.

    Raises errors.ProgrammingError when `res` lacks one of the keys or is
    not subscriptable. After the warnings are handled, the first stored
    warning is raised as an exception if the connection has
    raise_on_warnings set to True.
    """
    fields = (
        ('_rowcount', 'affected_rows'),
        ('_last_insert_id', 'insert_id'),
        ('_warning_count', 'warning_count'),
    )
    try:
        for attr_name, key in fields:
            setattr(self, attr_name, res[key])
    except (KeyError, TypeError) as err:
        raise errors.ProgrammingError(
            "Failed handling non-resultset; {0}".format(err))

    self._handle_warnings()
    if self._connection.raise_on_warnings is not True or not self._warnings:
        return
    raise errors.get_mysql_exception(
        self._warnings[0][1], self._warnings[0][2])
def _handle_resultset(self):
    """Hook called when a result set arrives; does nothing in this class."""
    return None
def _handle_result(self, result):
    """Dispatch on the kind of result dict returned for a statement.

    A dict with 'columns' starts a result set: the description is stored
    and the connection is flagged as having unread rows. A dict with
    'affected_rows' is handled as a no-result-set response. Anything
    else raises errors.InterfaceError.
    """
    if not isinstance(result, dict):
        raise errors.InterfaceError('Result was not a dict()')

    if 'columns' in result:
        # Weak test, must be column/eof information
        self._description = result['columns']
        self._connection.unread_result = True
        self._handle_resultset()
        return

    if 'affected_rows' in result:
        # Weak test, must be an OK-packet
        self._connection.unread_result = False
        self._handle_noresultset(result)
        return

    raise errors.InterfaceError('Invalid result')
def _execute_iter(self, query_iter):
    """Yield this cursor once per result produced by `query_iter`.

    The executed text is split with RE_SQL_SPLIT_STMTS up front; for
    each result the cursor state is reset, the result is handled, and
    `_executed` is set to the matching statement (or to the first one
    when there are more results than statements).

    Iterating with `for` instead of a bare next() call lets the
    generator finish cleanly when `query_iter` is exhausted. Under
    PEP 479 a StopIteration escaping next() inside a generator is turned
    into RuntimeError.
    """
    executed_list = RE_SQL_SPLIT_STMTS.split(self._executed)
    i = 0
    for result in query_iter:
        self._reset_result()
        self._handle_result(result)
        try:
            self._executed = executed_list[i].strip()
            i += 1
        except IndexError:
            self._executed = executed_list[0]
        yield self
def execute(self, operation, params=None, multi=False):
    """Send `operation` to the server, substituting `params` first.

    Returns None for an empty operation and after a single statement.
    With multi=True, returns the generator from _execute_iter().

    Raises errors.ProgrammingError when the cursor has no connection,
    when the statement cannot be encoded, or when not all sequence
    parameters were used. An errors.InterfaceError from the query is
    replaced by a hint to use multi=True when the connection reports a
    pending next result.
    """
    if not operation:
        return None
    if not self._connection:
        raise errors.ProgrammingError("Cursor is not connected")

    self._connection.handle_unread_result()
    self._reset_result()

    try:
        if isinstance(operation, (bytes, bytearray)):
            stmt = operation
        else:
            stmt = operation.encode(self._connection.python_charset)
    except (UnicodeDecodeError, UnicodeEncodeError) as err:
        raise errors.ProgrammingError(str(err))

    if params is not None:
        if isinstance(params, dict):
            converted = self._process_params_dict(params)
            stmt = _bytestr_format_dict(stmt, converted)
        elif isinstance(params, (list, tuple)):
            psub = _ParamSubstitutor(self._process_params(params))
            stmt = RE_PY_PARAM.sub(psub, stmt)
            if psub.remaining != 0:
                raise errors.ProgrammingError(
                    "Not all parameters were used in the SQL statement")

    self._executed = stmt

    if multi:
        self._executed_list = []
        return self._execute_iter(self._connection.cmd_query_iter(stmt))

    try:
        self._handle_result(self._connection.cmd_query(stmt))
    except errors.InterfaceError:
        if self._connection._have_next_result:  # pylint: disable=W0212
            raise errors.InterfaceError(
                "Use multi=True when executing multiple statements")
        raise
    return None
def _batch_insert(self, operation, seq_params):
# Rewrite an INSERT plus a sequence of parameter sets into one
# multi-row statement. Returns the rewritten statement as bytes (also
# stored in self._executed), or None when the extracted values template
# is not found in the encoded statement.
# Raises errors.InterfaceError when no values template can be located or
# on unexpected failures, and errors.ProgrammingError on encoding
# problems or unused parameters.
# NOTE(review): this excerpt has lost its indentation; the nesting
# described in these comments is inferred from the syntax - confirm
# against the packaged source.
def remove_comments(match):
# Replacement callback: drop the match when group 1 matched, otherwise
# keep group 2.
if match.group(1):
return ""
return match.group(2)
# Presumably strips SQL comments and the ON DUPLICATE clause before the
# values template is searched for (judging by the regex names).
tmp = re.sub(RE_SQL_ON_DUPLICATE, '',
re.sub(RE_SQL_COMMENT, remove_comments, operation))
matches = re.search(RE_SQL_INSERT_VALUES, tmp)
if not matches:
raise errors.InterfaceError(
"Failed rewriting statement for multi-row INSERT. "
"Check SQL syntax."
)
# Group 1 of the match is the per-row template that gets repeated.
fmt = matches.group(1).encode(self._connection.python_charset)
values = []
try:
stmt = operation.encode(self._connection.python_charset)
for params in seq_params:
# Fill one copy of the template per parameter set.
tmp = fmt
if isinstance(params, dict):
tmp = _bytestr_format_dict(
tmp, self._process_params_dict(params))
else:
psub = _ParamSubstitutor(self._process_params(params))
tmp = RE_PY_PARAM.sub(psub, tmp)
if psub.remaining != 0:
raise errors.ProgrammingError(
"Not all parameters were used in the SQL statement")
#for p in self._process_params(params):
# tmp = tmp.replace(b'%s',p,1)
values.append(tmp)
# Replace the first occurrence of the template with all filled rows.
if fmt in stmt:
stmt = stmt.replace(fmt, b','.join(values), 1)
self._executed = stmt
return stmt
return None
except (UnicodeDecodeError, UnicodeEncodeError) as err:
raise errors.ProgrammingError(str(err))
except errors.Error:
# Library errors pass through unchanged.
raise
except Exception as err:
raise errors.InterfaceError(
"Failed executing the operation; %s" % err)
def executemany(self, operation, seq_params):
    """Execute `operation` once for every parameter set in `seq_params`.

    Returns None immediately when `operation` or `seq_params` is falsy.
    Statements matching RE_SQL_INSERT_STMT are first offered to
    _batch_insert(); when that yields a statement it is executed as one
    query. Otherwise each parameter set is executed in turn, pending rows
    are fetched and discarded, and the row counts are summed into
    `_rowcount`.

    Raises errors.ProgrammingError when `seq_params` is not iterable and
    errors.InterfaceError when execution raises ValueError or TypeError.
    Other exceptions propagate unchanged.
    """
    if not operation or not seq_params:
        return None
    self._connection.handle_unread_result()

    try:
        _ = iter(seq_params)
    except TypeError:
        raise errors.ProgrammingError(
            "Parameters for query must be an Iterable.")

    # Optimize INSERTs by batching them. The empty-`seq_params` case has
    # already returned above, so no second emptiness check is needed.
    if re.match(RE_SQL_INSERT_STMT, operation):
        stmt = self._batch_insert(operation, seq_params)
        if stmt is not None:
            return self.execute(stmt)

    rowcnt = 0
    try:
        for params in seq_params:
            self.execute(operation, params)
            if self.with_rows and self._have_unread_result():
                self.fetchall()
            rowcnt += self._rowcount
    except (ValueError, TypeError) as err:
        raise errors.InterfaceError(
            "Failed executing the operation; {0}".format(err))
    self._rowcount = rowcnt
    return None
def stored_results(self):
    """Return an iterator over the cursors saved in `_stored_results`."""
    saved = self._stored_results
    return iter(saved)
def callproc(self, procname, args=()):
    """Call stored routine `procname` with `args`.

    Each argument is assigned to a session variable named
    ``@_<procname>_arg<N>`` with a SET statement. A tuple argument is
    treated as ``(value, type)`` and read back through
    ``CAST(... AS type)``. Results of the CALL that carry columns are
    kept as buffered cursors in `_stored_results`.

    Returns the row read back from the argument variables when arguments
    were given, otherwise an empty tuple.

    Raises ValueError for a non-string/empty `procname` or a non-sequence
    `args`. Library errors propagate; any other exception is wrapped in
    errors.InterfaceError.
    """
    if not procname or not isinstance(procname, str):
        raise ValueError("procname must be a string")

    if not isinstance(args, (tuple, list)):
        raise ValueError("args must be a sequence")

    argfmt = "@_{name}_arg{index}"
    self._stored_results = []

    results = []
    try:
        argnames = []
        argtypes = []
        if args:
            for idx, arg in enumerate(args):
                argname = argfmt.format(name=procname, index=idx + 1)
                argnames.append(argname)
                if isinstance(arg, tuple):
                    argtypes.append(" CAST({0} AS {1})".format(argname,
                                                               arg[1]))
                    self.execute("SET {0}=%s".format(argname), (arg[0],))
                else:
                    argtypes.append(argname)
                    self.execute("SET {0}=%s".format(argname), (arg,))

        call = "CALL {0}({1})".format(procname, ','.join(argnames))

        # pylint: disable=W0212
        # We disable consuming results temporary to make sure we
        # getting all results
        can_consume_results = self._connection._consume_results
        try:
            for result in self._connection.cmd_query_iter(call):
                self._connection._consume_results = False
                if self._raw:
                    tmp = MySQLCursorBufferedRaw(self._connection._get_self())
                else:
                    tmp = MySQLCursorBuffered(self._connection._get_self())
                tmp._executed = "(a result of {0})".format(call)
                tmp._handle_result(result)
                if tmp._warnings is not None:
                    self._warnings = tmp._warnings
                if 'columns' in result:
                    results.append(tmp)
        finally:
            # Restore the connection setting even when a result fails, so
            # an error here does not leave result consumption disabled.
            self._connection._consume_results = can_consume_results
        # pylint: enable=W0212

        if argnames:
            select = "SELECT {0}".format(','.join(argtypes))
            self.execute(select)
            self._stored_results = results
            return self.fetchone()

        self._stored_results = results
        return ()

    except errors.Error:
        raise
    except Exception as err:
        raise errors.InterfaceError(
            "Failed calling stored routine; {0}".format(err))
def getlastrowid(self):
    """Return the stored `_last_insert_id` value."""
    last_id = self._last_insert_id
    return last_id
def _fetch_warnings(self):
    """Run SHOW WARNINGS on a fresh cursor and return its rows.

    Returns None when there are no rows. Any failure is re-raised as
    errors.InterfaceError.
    """
    try:
        warn_cursor = self._connection.cursor(raw=False)
        warn_cursor.execute("SHOW WARNINGS")
        found = warn_cursor.fetchall()
        warn_cursor.close()
    except Exception as err:
        raise errors.InterfaceError(
            "Failed getting warnings; %s" % err)

    return found or None
def _handle_warnings(self):
    """Fetch warnings when the connection asks for them and some were counted."""
    if self._connection.get_warnings is not True or not self._warning_count:
        return
    self._warnings = self._fetch_warnings()
def _handle_eof(self, eof):
    """Finish the current result set using the end-of-rows information.

    Clears the unread flag and the look-ahead row, records the warning
    count from `eof`, handles warnings, and raises the first stored
    warning when the connection has raise_on_warnings set to True.
    """
    self._connection.unread_result = False
    self._nextrow = (None, None)
    self._warning_count = eof['warning_count']
    self._handle_warnings()

    if self._connection.raise_on_warnings is not True or not self._warnings:
        return
    raise errors.get_mysql_exception(
        self._warnings[0][1], self._warnings[0][2])
def _fetch_row(self, raw=False):
# Return the next row of the current result set, or None when there is
# no unread result. One row of look-ahead is kept in self._nextrow as a
# (row, eof) pair.
# NOTE(review): this excerpt has lost its indentation; whether the final
# `if eof:` check sits inside or after the `if row:` block cannot be
# told from here - confirm against the packaged source.
if not self._have_unread_result():
return None
row = None
# (None, None) means nothing is buffered yet, so read from the
# connection; otherwise consume the buffered look-ahead pair.
if self._nextrow == (None, None):
(row, eof) = self._connection.get_row(
binary=self._binary, columns=self.description, raw=raw)
else:
(row, eof) = self._nextrow
if row:
# Pre-fetch the following row so the end of the result set is noticed
# while the current row is being returned.
self._nextrow = self._connection.get_row(
binary=self._binary, columns=self.description, raw=raw)
eof = self._nextrow[1]
if eof is not None:
self._handle_eof(eof)
# -1 marks "no rows counted yet"; the first row sets the count to 1.
if self._rowcount == -1:
self._rowcount = 1
else:
self._rowcount += 1
if eof:
self._handle_eof(eof)
return row
def fetchone(self):
    """Return the next row, or None when no row is available."""
    return self._fetch_row() or None
def fetchmany(self, size=None):
    """Return up to `size` rows (default: `arraysize`) as a list."""
    remaining = size or self.arraysize
    collected = []
    while remaining > 0 and self._have_unread_result():
        remaining -= 1
        row = self.fetchone()
        if row:
            collected.append(row)
    return collected
def fetchall(self):
    """Return all remaining rows of the current result set as a list.

    A row already held as look-ahead is placed first. Raises
    errors.InterfaceError when there is no unread result.
    """
    if not self._have_unread_result():
        raise errors.InterfaceError("No result set to fetch from.")

    (rows, eof) = self._connection.get_rows()
    pending = self._nextrow[0]
    if pending:
        rows.insert(0, pending)

    self._handle_eof(eof)

    # -1 marks "nothing counted yet"; start from zero before adding.
    if self._rowcount == -1:
        self._rowcount = 0
    self._rowcount += len(rows)
    return rows
@property
def column_names(self):
    """Tuple holding the first item of every description entry, or ()."""
    if self.description:
        return tuple([column[0] for column in self.description])
    return ()
@property
def statement(self):
    """Last executed statement, stripped and decoded as UTF-8 when possible.

    Returns None when nothing has been executed. When decoding fails the
    stripped value is returned undecoded.
    """
    if self._executed is None:
        return None
    try:
        stripped = self._executed.strip()
        return stripped.decode('utf-8')
    except (AttributeError, UnicodeDecodeError):
        return self._executed.strip()
@property
def with_rows(self):
    """True when the cursor has a non-empty description, else False."""
    return bool(self.description)
def __str__(self):
    """Return '<ClassName>: <statement>' with the statement cut at 40 chars."""
    if not self._executed:
        shown = '(Nothing executed yet)'
    else:
        try:
            shown = self._executed.decode('utf-8')
        except AttributeError:
            shown = self._executed
        if len(shown) > 40:
            shown = shown[:40] + '..'
    template = "{class_name}: {stmt}"
    return template.format(class_name=self.__class__.__name__, stmt=shown)
class MySQLCursorPrepared(MySQLCursor):
def __init__(self, connection=None):
    """Initialise the base cursor, then the prepared-statement state."""
    super(MySQLCursorPrepared, self).__init__(connection)
    self._binary = True
    self._prepared = None
    self._rows = None
    self._next_row = 0
    self._have_result = None
    self._cursor_exists = False
    self._last_row_sent = False
def reset(self, free=True):
    """Close the current prepared statement, if any, and clear status flags.

    A library error while closing the statement is ignored. `free` is
    not used by this implementation.
    """
    if self._prepared:
        try:
            self._connection.cmd_stmt_close(self._prepared['statement_id'])
        except errors.Error:
            # We tried to deallocate, but it's OK when we fail.
            pass
        self._prepared = None

    self._cursor_exists = False
    self._last_row_sent = False
def _handle_noresultset(self, res):
    """Update the status flags from `res`, then defer to the base class.

    Flags come from 'status_flag', else 'server_status', else 0.
    """
    fallback = res.get('server_status', 0)
    flags = res.get('status_flag', fallback)
    self._handle_server_status(flags)
    super(MySQLCursorPrepared, self)._handle_noresultset(res)
def _handle_server_status(self, flags):
    """Record the STATUS_CURSOR_EXISTS and STATUS_LAST_ROW_SENT bits of `flags`."""
    self._cursor_exists = (flags & ServerFlag.STATUS_CURSOR_EXISTS) != 0
    self._last_row_sent = (flags & ServerFlag.STATUS_LAST_ROW_SENT) != 0
def _handle_eof(self, eof):
    """Update the status flags from `eof`, then defer to the base class.

    Flags come from 'status_flag', else 'server_status', else 0.
    """
    fallback = eof.get('server_status', 0)
    flags = eof.get('status_flag', fallback)
    self._handle_server_status(flags)
    super(MySQLCursorPrepared, self)._handle_eof(eof)
def callproc(self, procname, args=()):
    """Always raises errors.NotSupportedError for this cursor type."""
    raise errors.NotSupportedError()
def close(self):
    """Release the prepared statement, then close the base cursor.

    Returns None; the base class's return value is not passed on.
    """
    self.reset()
    super(MySQLCursorPrepared, self).close()
    return None
def _row_to_python(self, rowdata, desc=None):
    """No-op for this cursor type; always returns None."""
    return None
def _handle_result(self, result):
    """Process the response to executing the prepared statement.

    A dict is handled as a no-result-set response. Anything else is
    indexed as a sequence: item 1 becomes the description and item 2
    supplies the status flags ('status_flag' first, else
    'server_status').
    """
    if isinstance(result, dict):
        self._connection.unread_result = False
        self._have_result = False
        self._handle_noresultset(result)
        return

    self._description = result[1]
    self._connection.unread_result = True
    self._have_result = True

    for key in ('status_flag', 'server_status'):
        if key in result[2]:
            self._handle_server_status(result[2][key])
            break
def execute(self, operation, params=(), multi=False): # multi is unused
# Prepare `operation` when needed and execute it with `params`.
# Raises errors.ProgrammingError when the statement cannot be encoded or
# when the parameter count does not match (errno 1210). Errors raised
# while preparing propagate after self._executed is cleared.
# NOTE(review): this excerpt has lost its indentation; the extent of the
# `if operation is not self._executed:` block is inferred - confirm
# against the packaged source.
# Identity (`is not`) comparison: an equal but distinct statement object
# is treated as a new statement.
if operation is not self._executed:
if self._prepared:
self._connection.cmd_stmt_close(self._prepared['statement_id'])
self._executed = operation
try:
if not isinstance(operation, bytes):
charset = self._connection.charset
# 'utf8mb4' is swapped for 'utf8' before calling str.encode().
if charset == 'utf8mb4':
charset = 'utf8'
operation = operation.encode(charset)
except (UnicodeDecodeError, UnicodeEncodeError) as err:
raise errors.ProgrammingError(str(err))
# need to convert %s to ? before sending it to MySQL
if b'%s' in operation:
operation = re.sub(RE_SQL_FIND_PARAM, b'?', operation)
try:
self._prepared = self._connection.cmd_stmt_prepare(operation)
except errors.Error:
self._executed = None
raise
self._connection.cmd_stmt_reset(self._prepared['statement_id'])
# A statement that has parameters but was given none returns here
# without being executed.
if self._prepared['parameters'] and not params:
return
elif len(self._prepared['parameters']) != len(params):
raise errors.ProgrammingError(
errno=1210,
msg="Incorrect number of arguments " \
"executing prepared statement")
res = self._connection.cmd_stmt_execute(
self._prepared['statement_id'],
data=params,
parameters=self._prepared['parameters'])
self._handle_result(res)
def executemany(self, operation, seq_params):
    """Execute `operation` once per parameter set and sum the row counts.

    Pending rows of each execution are fetched and discarded. ValueError
    and TypeError are re-raised as errors.InterfaceError; anything else
    propagates unchanged.
    """
    total = 0
    try:
        for params in seq_params:
            self.execute(operation, params)
            if self.with_rows and self._have_unread_result():
                self.fetchall()
            total += self._rowcount
    except (ValueError, TypeError) as err:
        raise errors.InterfaceError(
            "Failed executing the operation; {error}".format(error=err))
    self._rowcount = total
def fetchone(self):
    """Return the next row, or None when no row is available.

    When the status flags report an open cursor, a statement fetch is
    issued before the row is read.
    """
    if self._cursor_exists:
        self._connection.cmd_stmt_fetch(self._prepared['statement_id'])
    row = self._fetch_row()
    return row if row else None
def fetchmany(self, size=None):
    """Return up to `size` rows (default: `arraysize`) as a list."""
    remaining = size or self.arraysize
    collected = []
    while remaining > 0 and self._have_unread_result():
        remaining -= 1
        row = self._fetch_row()
        if row:
            collected.append(row)
    return collected
def fetchall(self):
    """Return all remaining rows of the executed statement as a list.

    A row already held as look-ahead comes first. Rows are read in
    batches until the connection reports no unread result; when the
    status flags report an open cursor, each batch is requested with a
    statement fetch of MAX_RESULTS rows. Raises errors.InterfaceError
    when there is no unread result.
    """
    if not self._have_unread_result():
        raise errors.InterfaceError("No result set to fetch from.")

    collected = []
    if self._nextrow[0]:
        collected.append(self._nextrow[0])

    while self._have_unread_result():
        if self._cursor_exists:
            self._connection.cmd_stmt_fetch(
                self._prepared['statement_id'], MAX_RESULTS)
        (batch, eof) = self._connection.get_rows(
            binary=self._binary, columns=self.description)
        collected.extend(batch)
        self._handle_eof(eof)

    self._rowcount = len(collected)
    return collected
class MySQLCursorDict(MySQLCursor):
    """Cursor whose fetchone()/fetchall() return rows as dicts keyed by column name."""

    def _row_to_python(self, rowdata, desc=None):
        """Pair `rowdata` with `column_names`; a falsy row gives None.

        `desc` is accepted but not used.
        """
        if not rowdata:
            return None
        return dict(zip(self.column_names, rowdata))

    def fetchone(self):
        """Return the next row as a dict, or None when no row is available."""
        row = self._fetch_row()
        if not row:
            return None
        return self._row_to_python(row, self.description)

    def fetchall(self):
        """Return all remaining rows as a list of dicts.

        A row already held as look-ahead comes first. Raises
        errors.InterfaceError when there is no unread result.
        """
        if not self._have_unread_result():
            raise errors.InterfaceError(ERR_NO_RESULT_TO_FETCH)

        (rows, eof) = self._connection.get_rows()
        pending = self._nextrow[0]
        if pending:
            rows.insert(0, pending)

        # Convert before the end-of-rows handling, as the original does.
        converted = [self._row_to_python(row, self.description)
                     for row in rows]
        self._handle_eof(eof)

        # -1 marks "nothing counted yet"; start from zero before adding.
        if self._rowcount == -1:
            self._rowcount = 0
        self._rowcount += len(rows)
        return converted
おわりに
- 普通に出来ると思ってたことが出来ないと、苦労しますね。