LoginSignup
1
0

More than 3 years have passed since last update.

python でdbの標準ドライバを作る話。

Last updated at Posted at 2019-12-02

概要

DB系のプロダクト開発でバックエンドがほぼほぼ完成した際に、
pythonやjavaで標準的に使われている形のドライバをラッパーとして用意し、ユーザの使いやすさを向上したいと考えました。

javaはまだ書いたことないので勉強が必要ですが、まずはpythonでラッパーを書くことになり、標準のドライバーの仕様について勉強し、実装の流れについてまとめてみます。

参考文献

内容

今回ナイーブながら実装してみたものをまとめると、以下になります。

  • Our_Api クラス
  • Connection クラス
  • Cursor クラス
  • Error クラス
  • Type クラス

コード的には、われわれのドライバをつかって

main.py
#!/usr/bin/python

import MySQLdb

# Open database connection
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# prepare a cursor object using cursor() method
cursor = db.cursor()

# execute SQL query using execute() method.
cursor.execute("SELECT VERSION()")

# Fetch a single row using fetchone() method.
data = cursor.fetchone()
print "Database version : %s " % data

# disconnect from server
db.close()

みたいなことがしたいわけです。

ここでのMySQLdb が自分たちの提供するドライバのライブラリになって、
MySQLdb が Connectionクラスのオブジェクトを必要な設定変数とともに作ります。
また、Connectionクラスは Cursorクラスのオブジェクトを生成します。
Cursorクラスのはcursor.execute()とか、cursor.fetchone()のようなメソッドを持ち、
渡されたSQLの結果が格納されますし、それらの結果にアクセスして見ることができます。

また、Insert や Updateなどが行われるときは、

main.py
#!/usr/bin/python

import MySQLdb

# Open database connection
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )

# prepare a cursor object using cursor() method
cursor = db.cursor()

# Prepare SQL query to INSERT a record into the database.
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
         LAST_NAME, AGE, SEX, INCOME)
         VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
   # Execute the SQL command
   cursor.execute(sql)
   # Commit your changes in the database
   db.commit()
except:
   # Rollback in case there is any error
   db.rollback()

# disconnect from server
db.close()

のようにして、db.coimmit()すなわちconnectionクラスのcommit()メソッドを呼ぶことで変更したオペレーションを実際にDBに反映させ、もし戻したいと成った場合にはdb.rollback()などのメソッドで元に戻すことができます。

また、Error クラスはエラーのハンドリングを指定された形で行うために実装されます。
さらに、Typeクラスは、仮にDB操作のバックエンドがPython以外で書かれていた場合など、戻ってくるデータ型を的確にpythonの対応するデータ型に変更する際に使われます。

以上を踏まえて実装したものを簡単にまとめて紹介します。

実装

メインディレクトリでの

tree .

の結果は

.
├── api_core
│   ├── api_connection.py
│   ├── api_cursor.py
│   ├── api_error.py
│   ├── api_type.py
│   ├── __init__.py
│   └── our_api.py
├── main.py
└── requirements.txt

のように成っています。

api_connection.py
from .api_cursor import Cursor

class Connection:
    '''
        Connection class

        class member:
            api_host: str
            api_port: int
            e.g) http://{api_host}:{api_port}

        class method:
            cursor: returns Cursor class object
            close: call destructor to delete the class object itself
    '''

    def __init__(self, proxy_conf_dict = None, proxy_conf_file=None):
        if proxy_conf_dict:

            self.api_host = proxy_conf_dict['api_host']
            self.api_port = proxy_conf_dict['api_port']

            self.proxy_conf_dict = proxy_conf_dict

    def __del__(self):
        pass


    def cursor(self):
        return Cursor(self.proxy_conf_dict)

    def close(self):
        self.__del__()


api_cursor.py
import requests
import json

from .api_type import Type, Type_Enum

class Cursor:
    def __init__(self, proxy_conf_dict=None):
        '''
            Cursor class

            class member:
                result: list of retrieved records (list of dictionary)
                type_dict: dict of (telling which column is datetime, time or date)
                conf_dict: dict of (host and port)
                index which: integer telling where cursor points to
                query_execute_que: list of raw_query (query not related to select statement and they will be sent when connection.commit)


            class method:
                execute: params(raw_query:str)
                        update self.result and self.type_dict
                commit:  params()
                        send post request for "not" select statement
                fetchone: params()
                        return self.result(self.index)
                fetchall: params()
                        return self.result
                next: params()
                        increment self.index by one

                previous: params()
                        increment self.index by minus one


        '''        
        # Cursor contains followings

        self.result = None
        self.type_dict = None
        self.proxy_conf_dict = None
        self.index = 0
        self.query_execute_que = []

        if conf_dict is not None:
            self.conf_dict = conf_dict


    def execute(self, raw_query:str):
        # execute function 
        # sends query by post request to proxy, when sql is select statement
        # if not select statement, store the sql to self.query_execute_que

        if self.__parse_query_select_or_not(raw_query):
            url = f'http://{self.conf_dict["api_host"]}:{self.conf_dict["api_port"]}/query'
            result = requests.post(url, data=dict(sql=raw_query))


            # post request to /query endpoint returns result (list of dictionary) and type_dict (dictionary)
            self.result = json.loads(result.text)['result']
            self.type_dict = json.loads(result.text)['type_dict']


            # if type_dict contains key, mean that result contains either datetime, time or date
            # therefore those records needs to be converted to the python class object instead of string.
            if self.type_dict.keys() is not None and len(self.type_dict.keys()) > 0:
                for i in range(len(self.result)):
                    for key in self.type_dict.keys():
                        self.result[i][key] = Type.parse_string_to(self.result[i][key], self.type_dict[key])

        else:
            self.query_execute_que.append(raw_query)



    def commit(self):
        # commit function
        # if there are stored raw_query in self.query_execute_que, send post request

        if len(self.query_execute_que) == 0:
            pass

        else:
            url = f'http://{self.conf_dict["api_host"]}:{self.conf_dict["api_port"]}/query'
            result = requests.post(url, data=dict(sql=self.query_execute_que, transaction=True))  

        self.query_execute_que = None      




    def fetchone(self):
        # fetchone function
        # if there is record (dictionary) in self.result,
        # and if self.index is whithin the len(self.result)
        # return result[self.index] (one correspoinding record)

        assert self.result is not None, 'cursor does not have a result to fetch'
        if len(self.result) > 0:
            try:
                return self.result[self.index]
            except:
                raise Exception('cursor index is not appropriate for result')
        else:
            pass


        pass

    def fetchall(self):
        # fetch all function
        # if there is records (dictonary) in self.result,
        # return all teh result (as a list of dictionary)

        assert self.result is not None, 'cursor does not have a result to fetch'
        if len(self.result) > 0:
            return self.result
        else:
            pass

        pass

    def next(self):
        # next function
        # move index one forward

        self.index += 1

    def previous(self):
        # previous function
        # move index one backward

        self.index -= 1


    def __parse_query_select_or_not(self, raw_query:str):
        # parser for raw_query
        # raw_query: str
        # return True if raw_query is select statement,
        # False if raw_query is not select statement

        if raw_query.lower().startswith('select'):
            return True
        else:
            False


api_error.py
class Error:
    '''
        Error class
            TODO implementation

    '''      
    def __init__(self):
        pass
api_type.py
from enum import Enum
import datetime

class Type_Enum(Enum):
    '''
        Type_Enum class
            contains Data type which needs to be converted to python object from string in query result.


        class member:
            DATE = 'date'
            DATETIME = 'datetime'
            TIME = 'time'            

    '''     

    DATE = 'date'
    DATETIME = 'datetime'
    TIME = 'time'




class Type:

    '''
        Type class


        class member:

        class method:
            @staticmethod 
                parse_string_to: params(target_string:str, target_type:str)
                                 return either python datetime, date, time object which parsed string to.


    '''         
    def __init__(self):
        pass

    ## some data type such as date, datetime, time is returned as string, needs to be converted to corresponding python object.

    @staticmethod
    def parse_string_to(target_string:str, target_type:str):
        if target_type == Type_Enum.DATE.value:
            date_time_obj = datetime.datetime.strptime(target_string, '%Y-%m-%d')
            return date_time_obj.date()
        elif target_type == Type_Enum.TIME.value:
            date_time_obj = datetime.datetime.strptime(target_string, '%H:%M:%S.%f')
            return date_time_obj.time()               
        elif target_type == Type_Enum.DATETIME.value:
            date_time_obj = datetime.datetime.strptime(target_string, '%Y-%m-%d %H:%M:%S.%f')
            return date_time_obj          


our_api.py
from .api_connection import Connection

class Our_API:
    '''
        Our_API class    

        class member:

        class method:
            connect: params(conf_dict, conf_file)
                     return Connection class object

    '''  

    def __init__(self):
        pass

    def connect(conf_dict=None, conf_file=None):
        if conf_dict:
            return Connection(conf_dict)

        elif conf_file:
            return Connection(conf_file)

おわりに

これがJavaで書くとどんな感じになるのか楽しみです。

おわり。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0