cnx.cursor(dictionary=True, prepared=True) doesn't work…

Posted at 2018-10-07

Introduction

  • I got stuck on this while working with MySQL from Flask, so I am leaving a note here.

What works

Case 1

  • Fetch the list of users (return value: dict)
api
http://127.0.0.1/users
Query sent to MySQL
SELECT * FROM users;
dbh.cursor
dbh = mysql.connector.connect(**dns)
cursor = dbh.cursor(dictionary=True)
cursor.execute(stmt)
cursor.fetchall()
Data as expected (return value)
[
    {'id': 1, 'name': '日本 太郎', 'age': '22', 'gender': '男性'},
:
]

Case 2

  • Fetch a single user (return value: tuple)
api
http://127.0.0.1/users/1
Query sent to MySQL
SELECT * FROM users WHERE id = ?;
dbh.cursor
dbh = mysql.connector.connect(**dns)
cursor = dbh.cursor(prepared=True)
cursor.execute(stmt, id)  # id: parameter sequence, e.g. (1,)
cursor.fetchall()
Data as expected (return value)
[(1, '日本 太郎', '22', '男性')]

What I want to do

  • Combine the two cases above (return value: dict, using a prepared statement)
api
http://127.0.0.1/users/1
Query sent to MySQL
SELECT * FROM users WHERE id = ?;
dbh.cursor
dbh = mysql.connector.connect(**dns)
cursor = dbh.cursor(dictionary=True, prepared=True) # Error
cursor.execute(stmt, id)  # id: parameter sequence, e.g. (1,)
cursor.fetchall()
Expected data (return value)
[{'id': 1, 'name': '日本 太郎', 'age': '22', 'gender': '男性'}]

Why?

KeyError
[2018-10-07 14:18:07,670] ERROR in app: Exception on /users/1 [GET]
Traceback (most recent call last):
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\mysql\connector\connection.py", line 901, in cursor
    return (types[cursor_type])(self)
KeyError: 20
ValueError
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 2292, in wsgi_app
    response = self.full_dispatch_request()
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1815, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1718, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\_compat.py", line 35, in reraise
    raise value
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1813, in full_dispatch_request
    rv = self.dispatch_request()
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\flask\app.py", line 1799, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "C:\pystudy\lesson-flask\step5\index.py", line 104, in user
    user = db.select(stmt, id, prepared=True)
  File "C:\pystudy\lesson-flask\step5\DataStore\MySQL.py", line 30, in select
    cursor = self.dbh.cursor(dictionary=True, prepared=True)
  File "c:\pystudy\lesson-flask\step5\lib\site-packages\mysql\connector\connection.py", line 905, in cursor
    ', '.join([args[i] for i in range(5)
ValueError: Cursor not available with given criteria: dictionary, prepared

A workaround?

>>> import mysql.connector as mc
>>> dns = {'user': 'mysql', 'password': 'NewPassword', 'host': 'localhost', 'database': 'kaggle'}
>>> dbh = mc.connect(**dns)
>>> cursor = dbh.cursor(prepared=True)
>>> id = (1,)
>>> stmt = 'SELECT * FROM users WHERE id=?'
>>> cursor.execute(stmt, id)
>>> keys = cursor.column_names
>>> rows = cursor.fetchall()
>>> data = []
>>> for row in rows:
...     data.append(dict(zip(keys, row)))
...
>>> cursor.close()
>>> dbh.close()
>>> data
[{'id': 1, 'name': '日本 花子', 'age': '32', 'gender': '女'}]
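
The session above could be folded into a small helper so the conversion does not have to be repeated at every call site. This is only a minimal sketch: the function names `rows_to_dicts` and `fetchall_as_dicts` are made up for illustration, and the helper assumes nothing beyond the `column_names` attribute and the `fetchall()` method used in the session.

```python
def rows_to_dicts(column_names, rows):
    """Pair each row tuple with the column names, as the session above does."""
    return [dict(zip(column_names, row)) for row in rows]


def fetchall_as_dicts(cursor):
    """Fetch all remaining rows of an already executed cursor as dicts.

    Relies only on cursor.column_names and cursor.fetchall().
    """
    return rows_to_dicts(cursor.column_names, cursor.fetchall())
```

With a prepared cursor, the call would then look like `cursor.execute(stmt, (1,))` followed by `data = fetchall_as_dicts(cursor)`.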

Browsing the source files as a learning exercise…

mysql\connector\abstracts
class MySQLCursorAbstract(object):
    def __init__(self):
        self._description = None
        self._rowcount = -1
        self._last_insert_id = None
        self._warnings = None
        self.arraysize = 1

    @abstractmethod
    def callproc(self, procname, args=()):
        pass

    @abstractmethod
    def close(self):
        pass

    @abstractmethod
    def execute(self, operation, params=(), multi=False):
        pass

    @abstractmethod
    def executemany(self, operation, seq_params):
        pass

    @abstractmethod
    def fetchone(self):
        pass

    @abstractmethod
    def fetchmany(self, size=1):
        pass

    @abstractmethod
    def fetchall(self):
        pass

    def nextset(self):
        pass

    def setinputsizes(self, sizes):
        pass

    def setoutputsize(self, size, column=None):
        pass

    def reset(self, free=True):
        pass

    @abstractproperty
    def description(self):
        return self._description

    @abstractproperty
    def rowcount(self):
        return self._rowcount

    @abstractproperty
    def lastrowid(self):
        return self._last_insert_id

    def fetchwarnings(self):
        return self._warnings
mysql\connector\connection
class MySQLConnection(MySQLConnectionAbstract):
 # :
 # snip
 # :
    def cursor(self, buffered=None, raw=None, prepared=None, cursor_class=None,
               dictionary=None, named_tuple=None):

        self.handle_unread_result()

        if not self.is_connected():
            raise errors.OperationalError("MySQL Connection not available.")
        if cursor_class is not None:
            if not issubclass(cursor_class, CursorBase):
                raise errors.ProgrammingError(
                    "Cursor class needs be to subclass of cursor.CursorBase")
            return (cursor_class)(self)

        buffered = buffered if buffered is not None else self._buffered
        raw = raw if raw is not None else self._raw

        cursor_type = 0
        if buffered is True:
            cursor_type |= 1
        if raw is True:
            cursor_type |= 2
        if dictionary is True:
            cursor_type |= 4
        if named_tuple is True:
            cursor_type |= 8
        if prepared is True:
            cursor_type |= 16

        types = {
            0: MySQLCursor,  # 0
            1: MySQLCursorBuffered,
            2: MySQLCursorRaw,
            3: MySQLCursorBufferedRaw,
            4: MySQLCursorDict,
            5: MySQLCursorBufferedDict,
            8: MySQLCursorNamedTuple,
            9: MySQLCursorBufferedNamedTuple,
            16: MySQLCursorPrepared
        }
        try:
            return (types[cursor_type])(self)
        except KeyError:
            args = ('buffered', 'raw', 'dictionary', 'named_tuple', 'prepared')
            raise ValueError('Cursor not available with given criteria: ' +
                             ', '.join([args[i] for i in range(5)
                                        if cursor_type & (1 << i) != 0]))
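
The `cursor()` method above appears to explain both exceptions in the traceback: each option sets one bit, and the combined value is used as a key into `types`. The standalone sketch below re-creates just that arithmetic, without importing the library. `FLAG_NAMES` and `SUPPORTED_TYPES` are copied from the `args` tuple and the `types` keys quoted above, while the function names are hypothetical.

```python
# Flag names in bit order, copied from the `args` tuple in cursor().
FLAG_NAMES = ('buffered', 'raw', 'dictionary', 'named_tuple', 'prepared')

# Keys of the `types` dict in the version quoted above.
SUPPORTED_TYPES = {0, 1, 2, 3, 4, 5, 8, 9, 16}


def cursor_type_for(**flags):
    """Combine boolean flags into the integer that cursor() uses as a key."""
    cursor_type = 0
    for bit, name in enumerate(FLAG_NAMES):
        if flags.get(name) is True:
            cursor_type |= 1 << bit
    return cursor_type


def describe(cursor_type):
    """List the flag names set in cursor_type, like the ValueError message."""
    return ', '.join(name for bit, name in enumerate(FLAG_NAMES)
                     if cursor_type & (1 << bit))


key = cursor_type_for(dictionary=True, prepared=True)
print(key)                     # 20, i.e. 4 | 16
print(key in SUPPORTED_TYPES)  # False, which is where KeyError: 20 comes from
print(describe(key))           # dictionary, prepared
```

In the quoted version, `16` is the only key with the prepared bit set, so `prepared=True` cannot be combined with any other flag.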
mysql\connector\cursor
class CursorBase(MySQLCursorAbstract):

    _raw = False

    def __init__(self):
        self._description = None
        self._rowcount = -1
        self._last_insert_id = None
        self.arraysize = 1
        super(CursorBase, self).__init__()

    def callproc(self, procname, args=()):
        pass

    def close(self):
        pass

    def execute(self, operation, params=(), multi=False):
        pass

    def executemany(self, operation, seq_params):
        pass

    def fetchone(self):
        pass

    def fetchmany(self, size=1):
        pass

    def fetchall(self):
        pass

    def nextset(self):
        pass

    def setinputsizes(self, sizes):
        pass

    def setoutputsize(self, size, column=None):
        pass

    def reset(self, free=True):
        pass

    @property
    def description(self):
        return self._description

    @property
    def rowcount(self):
        return self._rowcount

    @property
    def lastrowid(self):
        return self._last_insert_id


class MySQLCursor(CursorBase):
    def __init__(self, connection=None):
        CursorBase.__init__(self)
        self._connection = None
        self._stored_results = []
        self._nextrow = (None, None)
        self._warnings = None
        self._warning_count = 0
        self._executed = None
        self._executed_list = []
        self._binary = False

        if connection is not None:
            self._set_connection(connection)

    def __iter__(self):
        return iter(self.fetchone, None)

    def _set_connection(self, connection):
        try:
            self._connection = weakref.proxy(connection)
            self._connection.is_connected()
        except (AttributeError, TypeError):
            raise errors.InterfaceError(errno=2048)

    def _reset_result(self):
        self._rowcount = -1
        self._nextrow = (None, None)
        self._stored_results = []
        self._warnings = None
        self._warning_count = 0
        self._description = None
        self._executed = None
        self._executed_list = []
        self.reset()

    def _have_unread_result(self):
        try:
            return self._connection.unread_result
        except AttributeError:
            return False

    def next(self):
        return self.__next__()

    def __next__(self):
        try:
            row = self.fetchone()
        except errors.InterfaceError:
            raise StopIteration
        if not row:
            raise StopIteration
        return row

    def close(self):
        if self._connection is None:
            return False

        self._connection.handle_unread_result()
        self._reset_result()
        self._connection = None

        return True

    def _process_params_dict(self, params):
        try:
            to_mysql = self._connection.converter.to_mysql
            escape = self._connection.converter.escape
            quote = self._connection.converter.quote
            res = {}
            for key, value in list(params.items()):
                conv = value
                conv = to_mysql(conv)
                conv = escape(conv)
                conv = quote(conv)
                if PY2:
                    res[key] = conv
                else:
                    res[key.encode()] = conv
        except Exception as err:
            raise errors.ProgrammingError(
                "Failed processing pyformat-parameters; %s" % err)
        else:
            return res

    def _process_params(self, params):
        try:
            res = params

            to_mysql = self._connection.converter.to_mysql
            escape = self._connection.converter.escape
            quote = self._connection.converter.quote

            res = [to_mysql(i) for i in res]
            res = [escape(i) for i in res]
            res = [quote(i) for i in res]
        except Exception as err:
            raise errors.ProgrammingError(
                "Failed processing format-parameters; %s" % err)
        else:
            return tuple(res)

    def _handle_noresultset(self, res):
        try:
            self._rowcount = res['affected_rows']
            self._last_insert_id = res['insert_id']
            self._warning_count = res['warning_count']
        except (KeyError, TypeError) as err:
            raise errors.ProgrammingError(
                "Failed handling non-resultset; {0}".format(err))

        self._handle_warnings()
        if self._connection.raise_on_warnings is True and self._warnings:
            raise errors.get_mysql_exception(
                self._warnings[0][1], self._warnings[0][2])

    def _handle_resultset(self):
        pass

    def _handle_result(self, result):
        if not isinstance(result, dict):
            raise errors.InterfaceError('Result was not a dict()')

        if 'columns' in result:
            # Weak test, must be column/eof information
            self._description = result['columns']
            self._connection.unread_result = True
            self._handle_resultset()
        elif 'affected_rows' in result:
            # Weak test, must be an OK-packet
            self._connection.unread_result = False
            self._handle_noresultset(result)
        else:
            raise errors.InterfaceError('Invalid result')

    def _execute_iter(self, query_iter):
        executed_list = RE_SQL_SPLIT_STMTS.split(self._executed)

        i = 0
        while True:
            result = next(query_iter)  # pylint: disable=R1708
            self._reset_result()
            self._handle_result(result)
            try:
                self._executed = executed_list[i].strip()
                i += 1
            except IndexError:
                self._executed = executed_list[0]

            yield self

    def execute(self, operation, params=None, multi=False):
        if not operation:
            return None

        if not self._connection:
            raise errors.ProgrammingError("Cursor is not connected")

        self._connection.handle_unread_result()

        self._reset_result()
        stmt = ''

        try:
            if not isinstance(operation, (bytes, bytearray)):
                stmt = operation.encode(self._connection.python_charset)
            else:
                stmt = operation
        except (UnicodeDecodeError, UnicodeEncodeError) as err:
            raise errors.ProgrammingError(str(err))

        if params is not None:
            if isinstance(params, dict):
                stmt = _bytestr_format_dict(
                    stmt, self._process_params_dict(params))
            elif isinstance(params, (list, tuple)):
                psub = _ParamSubstitutor(self._process_params(params))
                stmt = RE_PY_PARAM.sub(psub, stmt)
                if psub.remaining != 0:
                    raise errors.ProgrammingError(
                        "Not all parameters were used in the SQL statement")

        self._executed = stmt
        if multi:
            self._executed_list = []
            return self._execute_iter(self._connection.cmd_query_iter(stmt))

        try:
            self._handle_result(self._connection.cmd_query(stmt))
        except errors.InterfaceError:
            if self._connection._have_next_result:  # pylint: disable=W0212
                raise errors.InterfaceError(
                    "Use multi=True when executing multiple statements")
            raise
        return None

    def _batch_insert(self, operation, seq_params):
        def remove_comments(match):
            if match.group(1):
                return ""
            return match.group(2)

        tmp = re.sub(RE_SQL_ON_DUPLICATE, '',
                     re.sub(RE_SQL_COMMENT, remove_comments, operation))

        matches = re.search(RE_SQL_INSERT_VALUES, tmp)
        if not matches:
            raise errors.InterfaceError(
                "Failed rewriting statement for multi-row INSERT. "
                "Check SQL syntax."
            )
        fmt = matches.group(1).encode(self._connection.python_charset)
        values = []

        try:
            stmt = operation.encode(self._connection.python_charset)
            for params in seq_params:
                tmp = fmt
                if isinstance(params, dict):
                    tmp = _bytestr_format_dict(
                        tmp, self._process_params_dict(params))
                else:
                    psub = _ParamSubstitutor(self._process_params(params))
                    tmp = RE_PY_PARAM.sub(psub, tmp)
                    if psub.remaining != 0:
                        raise errors.ProgrammingError(
                            "Not all parameters were used in the SQL statement")
                    #for p in self._process_params(params):
                    #    tmp = tmp.replace(b'%s',p,1)
                values.append(tmp)
            if fmt in stmt:
                stmt = stmt.replace(fmt, b','.join(values), 1)
                self._executed = stmt
                return stmt
            return None
        except (UnicodeDecodeError, UnicodeEncodeError) as err:
            raise errors.ProgrammingError(str(err))
        except errors.Error:
            raise
        except Exception as err:
            raise errors.InterfaceError(
                "Failed executing the operation; %s" % err)

    def executemany(self, operation, seq_params):
        if not operation or not seq_params:
            return None
        self._connection.handle_unread_result()

        try:
            _ = iter(seq_params)
        except TypeError:
            raise errors.ProgrammingError(
                "Parameters for query must be an Iterable.")

        # Optimize INSERTs by batching them
        if re.match(RE_SQL_INSERT_STMT, operation):
            if not seq_params:
                self._rowcount = 0
                return None
            stmt = self._batch_insert(operation, seq_params)
            if stmt is not None:
                return self.execute(stmt)

        rowcnt = 0
        try:
            for params in seq_params:
                self.execute(operation, params)
                if self.with_rows and self._have_unread_result():
                    self.fetchall()
                rowcnt += self._rowcount
        except (ValueError, TypeError) as err:
            raise errors.InterfaceError(
                "Failed executing the operation; {0}".format(err))
        except:
            # Raise whatever execute() raises
            raise
        self._rowcount = rowcnt
        return None

    def stored_results(self):
        return iter(self._stored_results)

    def callproc(self, procname, args=()):
        if not procname or not isinstance(procname, str):
            raise ValueError("procname must be a string")

        if not isinstance(args, (tuple, list)):
            raise ValueError("args must be a sequence")

        argfmt = "@_{name}_arg{index}"
        self._stored_results = []

        results = []
        try:
            argnames = []
            argtypes = []
            if args:
                for idx, arg in enumerate(args):
                    argname = argfmt.format(name=procname, index=idx + 1)
                    argnames.append(argname)
                    if isinstance(arg, tuple):
                        argtypes.append(" CAST({0} AS {1})".format(argname,
                                                                   arg[1]))
                        self.execute("SET {0}=%s".format(argname), (arg[0],))
                    else:
                        argtypes.append(argname)
                        self.execute("SET {0}=%s".format(argname), (arg,))

            call = "CALL {0}({1})".format(procname, ','.join(argnames))

            # pylint: disable=W0212
            # We disable consuming results temporary to make sure we
            # getting all results
            can_consume_results = self._connection._consume_results
            for result in self._connection.cmd_query_iter(call):
                self._connection._consume_results = False
                if self._raw:
                    tmp = MySQLCursorBufferedRaw(self._connection._get_self())
                else:
                    tmp = MySQLCursorBuffered(self._connection._get_self())
                tmp._executed = "(a result of {0})".format(call)
                tmp._handle_result(result)
                if tmp._warnings is not None:
                    self._warnings = tmp._warnings
                if 'columns' in result:
                    results.append(tmp)
            self._connection._consume_results = can_consume_results
            # pylint: enable=W0212

            if argnames:
                select = "SELECT {0}".format(','.join(argtypes))
                self.execute(select)
                self._stored_results = results
                return self.fetchone()

            self._stored_results = results
            return ()

        except errors.Error:
            raise
        except Exception as err:
            raise errors.InterfaceError(
                "Failed calling stored routine; {0}".format(err))

    def getlastrowid(self):
        return self._last_insert_id

    def _fetch_warnings(self):
        res = []
        try:
            cur = self._connection.cursor(raw=False)
            cur.execute("SHOW WARNINGS")
            res = cur.fetchall()
            cur.close()
        except Exception as err:
            raise errors.InterfaceError(
                "Failed getting warnings; %s" % err)

        if res:
            return res

        return None

    def _handle_warnings(self):
        if self._connection.get_warnings is True and self._warning_count:
            self._warnings = self._fetch_warnings()

    def _handle_eof(self, eof):
        self._connection.unread_result = False
        self._nextrow = (None, None)
        self._warning_count = eof['warning_count']
        self._handle_warnings()
        if self._connection.raise_on_warnings is True and self._warnings:
            raise errors.get_mysql_exception(
                self._warnings[0][1], self._warnings[0][2])

    def _fetch_row(self, raw=False):
        if not self._have_unread_result():
            return None
        row = None

        if self._nextrow == (None, None):
            (row, eof) = self._connection.get_row(
                binary=self._binary, columns=self.description, raw=raw)
        else:
            (row, eof) = self._nextrow

        if row:
            self._nextrow = self._connection.get_row(
                binary=self._binary, columns=self.description, raw=raw)
            eof = self._nextrow[1]
            if eof is not None:
                self._handle_eof(eof)
            if self._rowcount == -1:
                self._rowcount = 1
            else:
                self._rowcount += 1
        if eof:
            self._handle_eof(eof)

        return row

    def fetchone(self):
        row = self._fetch_row()
        if row:
            return row
        return None

    def fetchmany(self, size=None):
        res = []
        cnt = (size or self.arraysize)
        while cnt > 0 and self._have_unread_result():
            cnt -= 1
            row = self.fetchone()
            if row:
                res.append(row)
        return res

    def fetchall(self):
        if not self._have_unread_result():
            raise errors.InterfaceError("No result set to fetch from.")
        (rows, eof) = self._connection.get_rows()
        if self._nextrow[0]:
            rows.insert(0, self._nextrow[0])

        self._handle_eof(eof)
        rowcount = len(rows)
        if rowcount >= 0 and self._rowcount == -1:
            self._rowcount = 0
        self._rowcount += rowcount
        return rows

    @property
    def column_names(self):
        if not self.description:
            return ()
        return tuple([d[0] for d in self.description])

    @property
    def statement(self):
        if self._executed is None:
            return None
        try:
            return self._executed.strip().decode('utf-8')
        except (AttributeError, UnicodeDecodeError):
            return self._executed.strip()

    @property
    def with_rows(self):
        if not self.description:
            return False
        return True

    def __str__(self):
        fmt = "{class_name}: {stmt}"
        if self._executed:
            try:
                executed = self._executed.decode('utf-8')
            except AttributeError:
                executed = self._executed
            if len(executed) > 40:
                executed = executed[:40] + '..'
        else:
            executed = '(Nothing executed yet)'
        return fmt.format(class_name=self.__class__.__name__, stmt=executed)


class MySQLCursorPrepared(MySQLCursor):
    def __init__(self, connection=None):
        super(MySQLCursorPrepared, self).__init__(connection)
        self._rows = None
        self._next_row = 0
        self._prepared = None
        self._binary = True
        self._have_result = None
        self._last_row_sent = False
        self._cursor_exists = False

    def reset(self, free=True):
        if self._prepared:
            try:
                self._connection.cmd_stmt_close(self._prepared['statement_id'])
            except errors.Error:
                # We tried to deallocate, but it's OK when we fail.
                pass
            self._prepared = None
        self._last_row_sent = False
        self._cursor_exists = False

    def _handle_noresultset(self, res):
        self._handle_server_status(res.get('status_flag',
                                           res.get('server_status', 0)))
        super(MySQLCursorPrepared, self)._handle_noresultset(res)

    def _handle_server_status(self, flags):
        self._cursor_exists = flags & ServerFlag.STATUS_CURSOR_EXISTS != 0
        self._last_row_sent = flags & ServerFlag.STATUS_LAST_ROW_SENT != 0

    def _handle_eof(self, eof):
        self._handle_server_status(eof.get('status_flag',
                                           eof.get('server_status', 0)))
        super(MySQLCursorPrepared, self)._handle_eof(eof)

    def callproc(self, procname, args=()):
        raise errors.NotSupportedError()

    def close(self):
        self.reset()
        super(MySQLCursorPrepared, self).close()

    def _row_to_python(self, rowdata, desc=None):
        pass

    def _handle_result(self, result):
        if isinstance(result, dict):
            self._connection.unread_result = False
            self._have_result = False
            self._handle_noresultset(result)
        else:
            self._description = result[1]
            self._connection.unread_result = True
            self._have_result = True

            if 'status_flag' in result[2]:
                self._handle_server_status(result[2]['status_flag'])
            elif 'server_status' in result[2]:
                self._handle_server_status(result[2]['server_status'])

    def execute(self, operation, params=(), multi=False):  # multi is unused
        if operation is not self._executed:
            if self._prepared:
                self._connection.cmd_stmt_close(self._prepared['statement_id'])

            self._executed = operation
            try:
                if not isinstance(operation, bytes):
                    charset = self._connection.charset
                    if charset == 'utf8mb4':
                        charset = 'utf8'
                    operation = operation.encode(charset)
            except (UnicodeDecodeError, UnicodeEncodeError) as err:
                raise errors.ProgrammingError(str(err))

            # need to convert %s to ? before sending it to MySQL
            if b'%s' in operation:
                operation = re.sub(RE_SQL_FIND_PARAM, b'?', operation)

            try:
                self._prepared = self._connection.cmd_stmt_prepare(operation)
            except errors.Error:
                self._executed = None
                raise

        self._connection.cmd_stmt_reset(self._prepared['statement_id'])

        if self._prepared['parameters'] and not params:
            return
        elif len(self._prepared['parameters']) != len(params):
            raise errors.ProgrammingError(
                errno=1210,
                msg="Incorrect number of arguments " \
                    "executing prepared statement")

        res = self._connection.cmd_stmt_execute(
            self._prepared['statement_id'],
            data=params,
            parameters=self._prepared['parameters'])
        self._handle_result(res)

    def executemany(self, operation, seq_params):
        rowcnt = 0
        try:
            for params in seq_params:
                self.execute(operation, params)
                if self.with_rows and self._have_unread_result():
                    self.fetchall()
                rowcnt += self._rowcount
        except (ValueError, TypeError) as err:
            raise errors.InterfaceError(
                "Failed executing the operation; {error}".format(error=err))
        except:
            # Raise whatever execute() raises
            raise
        self._rowcount = rowcnt

    def fetchone(self):
        if self._cursor_exists:
            self._connection.cmd_stmt_fetch(self._prepared['statement_id'])
        return self._fetch_row() or None

    def fetchmany(self, size=None):
        res = []
        cnt = (size or self.arraysize)
        while cnt > 0 and self._have_unread_result():
            cnt -= 1
            row = self._fetch_row()
            if row:
                res.append(row)
        return res

    def fetchall(self):
        if not self._have_unread_result():
            raise errors.InterfaceError("No result set to fetch from.")
        rows = []
        if self._nextrow[0]:
            rows.append(self._nextrow[0])
        while self._have_unread_result():
            if self._cursor_exists:
                self._connection.cmd_stmt_fetch(
                    self._prepared['statement_id'], MAX_RESULTS)
            (tmp, eof) = self._connection.get_rows(
                binary=self._binary, columns=self.description)
            rows.extend(tmp)
            self._handle_eof(eof)
        self._rowcount = len(rows)
        return rows


class MySQLCursorDict(MySQLCursor):
    def _row_to_python(self, rowdata, desc=None):
        row = rowdata

        if row:
            return dict(zip(self.column_names, row))

        return None

    def fetchone(self):
        row = self._fetch_row()
        if row:
            return self._row_to_python(row, self.description)
        return None

    def fetchall(self):
        if not self._have_unread_result():
            raise errors.InterfaceError(ERR_NO_RESULT_TO_FETCH)
        (rows, eof) = self._connection.get_rows()
        if self._nextrow[0]:
            rows.insert(0, self._nextrow[0])
        res = []
        for row in rows:
            res.append(self._row_to_python(row, self.description))
        self._handle_eof(eof)
        rowcount = len(rows)
        if rowcount >= 0 and self._rowcount == -1:
            self._rowcount = 0
        self._rowcount += rowcount
        return res
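
Since `MySQLCursorDict` only wraps each row with `dict(zip(self.column_names, row))`, the same idea could in principle be layered on top of any tuple-returning cursor. The sketch below shows this with a mixin. `DictRowsMixin`, `FakeTupleCursor` and `FakeDictCursor` are hypothetical names, and the fake cursor is a stand-in so that the example runs without a database.

```python
class DictRowsMixin:
    """Convert tuple rows produced by the next class in the MRO into dicts."""

    def _as_dict(self, row):
        if not row:
            return None
        if isinstance(row, dict):  # already converted by an overridden fetchone()
            return row
        return dict(zip(self.column_names, row))

    def fetchone(self):
        return self._as_dict(super().fetchone())

    def fetchmany(self, size=None):
        return [self._as_dict(row) for row in super().fetchmany(size)]

    def fetchall(self):
        return [self._as_dict(row) for row in super().fetchall()]


class FakeTupleCursor:
    """Hypothetical stand-in for a cursor that returns rows as tuples."""

    column_names = ('id', 'name')

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchone(self):
        return self._rows.pop(0) if self._rows else None

    def fetchmany(self, size=None):
        count = size or 1
        taken, self._rows = self._rows[:count], self._rows[count:]
        return taken

    def fetchall(self):
        rows, self._rows = self._rows, []
        return rows


class FakeDictCursor(DictRowsMixin, FakeTupleCursor):
    """The mixin goes first so its fetch methods wrap the base class ones."""


cursor = FakeDictCursor([(1, 'a'), (2, 'b'), (3, 'c')])
print(cursor.fetchone())   # {'id': 1, 'name': 'a'}
print(cursor.fetchall())   # [{'id': 2, 'name': 'b'}, {'id': 3, 'name': 'c'}]
```

With the library itself, the analogous class would presumably be `class PreparedDictCursor(DictRowsMixin, MySQLCursorPrepared)`, passed as `dbh.cursor(cursor_class=PreparedDictCursor)`, because the `cursor()` method quoted earlier returns `cursor_class(self)` for any `CursorBase` subclass. That combination is an assumption and has not been verified against a live server here.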

Conclusion

  • It is a real struggle when something you assumed would just work turns out not to.