概要
DB系のプロダクト開発でバックエンドがほぼほぼ完成した際に、
pythonやjavaで標準的に使われている形のドライバをラッパーとして用意し、ユーザの使いやすさを向上したいと考えました。
javaはまだ書いたことないので勉強が必要ですが、まずはpythonでラッパーを書くことになり、標準のドライバーの仕様について勉強し、実装の流れについてまとめてみます。
参考文献
- python の Mysql用のドライバ
- python のdb APIの仕様
内容
今回ナイーブながら実装してみたものをまとめると、以下になります。
- Our_Api クラス
- Connection クラス
- Cursor クラス
- Error クラス
- Type クラス
コード的には、われわれのドライバをつかって
#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# execute SQL query using execute() method.
cursor.execute("SELECT VERSION()")
# Fetch a single row using fetchone() method.
data = cursor.fetchone()
print "Database version : %s " % data
# disconnect from server
db.close()
みたいなことがしたいわけです。
ここでのMySQLdb
が自分たちの提供するドライバのライブラリになって、
MySQLdb
が Connection
クラスのオブジェクトを必要な設定変数とともに作ります。
また、Connection
クラスは Cursor
クラスのオブジェクトを生成します。
Cursor
クラスのはcursor.execute()
とか、cursor.fetchone()
のようなメソッドを持ち、
渡されたSQLの結果が格納されますし、それらの結果にアクセスして見ることができます。
また、Insert や Updateなどが行われるときは、
#!/usr/bin/python
import MySQLdb
# Open database connection
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# prepare a cursor object using cursor() method
cursor = db.cursor()
# Prepare SQL query to INSERT a record into the database.
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
# Execute the SQL command
cursor.execute(sql)
# Commit your changes in the database
db.commit()
except:
# Rollback in case there is any error
db.rollback()
# disconnect from server
db.close()
のようにして、db.coimmit()
すなわちconnection
クラスのcommit()
メソッドを呼ぶことで変更したオペレーションを実際にDBに反映させ、もし戻したいと成った場合にはdb.rollback()
などのメソッドで元に戻すことができます。
また、Error
クラスはエラーのハンドリングを指定された形で行うために実装されます。
さらに、Type
クラスは、仮にDB操作のバックエンドがPython以外で書かれていた場合など、戻ってくるデータ型を的確にpythonの対応するデータ型に変更する際に使われます。
以上を踏まえて実装したものを簡単にまとめて紹介します。
実装
メインディレクトリでの
tree .
の結果は
.
├── api_core
│ ├── api_connection.py
│ ├── api_cursor.py
│ ├── api_error.py
│ ├── api_type.py
│ ├── __init__.py
│ └── our_api.py
├── main.py
└── requirements.txt
のように成っています。
from .api_cursor import Cursor
class Connection:
'''
Connection class
class member:
api_host: str
api_port: int
e.g) http://{api_host}:{api_port}
class method:
cursor: returns Cursor class object
close: call destructor to delete the class object itself
'''
def __init__(self, proxy_conf_dict = None, proxy_conf_file=None):
if proxy_conf_dict:
self.api_host = proxy_conf_dict['api_host']
self.api_port = proxy_conf_dict['api_port']
self.proxy_conf_dict = proxy_conf_dict
def __del__(self):
pass
def cursor(self):
return Cursor(self.proxy_conf_dict)
def close(self):
self.__del__()
import requests
import json
from .api_type import Type, Type_Enum
class Cursor:
def __init__(self, proxy_conf_dict=None):
'''
Cursor class
class member:
result: list of retrieved records (list of dictionary)
type_dict: dict of (telling which column is datetime, time or date)
conf_dict: dict of (host and port)
index which: integer telling where cursor points to
query_execute_que: list of raw_query (query not related to select statement and they will be sent when connection.commit)
class method:
execute: params(raw_query:str)
update self.result and self.type_dict
commit: params()
send post request for "not" select statement
fetchone: params()
return self.result(self.index)
fetchall: params()
return self.result
next: params()
increment self.index by one
previous: params()
increment self.index by minus one
'''
# Cursor contains followings
self.result = None
self.type_dict = None
self.proxy_conf_dict = None
self.index = 0
self.query_execute_que = []
if conf_dict is not None:
self.conf_dict = conf_dict
def execute(self, raw_query:str):
# execute function
# sends query by post request to proxy, when sql is select statement
# if not select statement, store the sql to self.query_execute_que
if self.__parse_query_select_or_not(raw_query):
url = f'http://{self.conf_dict["api_host"]}:{self.conf_dict["api_port"]}/query'
result = requests.post(url, data=dict(sql=raw_query))
# post request to /query endpoint returns result (list of dictionary) and type_dict (dictionary)
self.result = json.loads(result.text)['result']
self.type_dict = json.loads(result.text)['type_dict']
# if type_dict contains key, mean that result contains either datetime, time or date
# therefore those records needs to be converted to the python class object instead of string.
if self.type_dict.keys() is not None and len(self.type_dict.keys()) > 0:
for i in range(len(self.result)):
for key in self.type_dict.keys():
self.result[i][key] = Type.parse_string_to(self.result[i][key], self.type_dict[key])
else:
self.query_execute_que.append(raw_query)
def commit(self):
# commit function
# if there are stored raw_query in self.query_execute_que, send post request
if len(self.query_execute_que) == 0:
pass
else:
url = f'http://{self.conf_dict["api_host"]}:{self.conf_dict["api_port"]}/query'
result = requests.post(url, data=dict(sql=self.query_execute_que, transaction=True))
self.query_execute_que = None
def fetchone(self):
# fetchone function
# if there is record (dictionary) in self.result,
# and if self.index is whithin the len(self.result)
# return result[self.index] (one correspoinding record)
assert self.result is not None, 'cursor does not have a result to fetch'
if len(self.result) > 0:
try:
return self.result[self.index]
except:
raise Exception('cursor index is not appropriate for result')
else:
pass
pass
def fetchall(self):
# fetch all function
# if there is records (dictonary) in self.result,
# return all teh result (as a list of dictionary)
assert self.result is not None, 'cursor does not have a result to fetch'
if len(self.result) > 0:
return self.result
else:
pass
pass
def next(self):
# next function
# move index one forward
self.index += 1
def previous(self):
# previous function
# move index one backward
self.index -= 1
def __parse_query_select_or_not(self, raw_query:str):
# parser for raw_query
# raw_query: str
# return True if raw_query is select statement,
# False if raw_query is not select statement
if raw_query.lower().startswith('select'):
return True
else:
False
class Error:
'''
Error class
TODO implementation
'''
def __init__(self):
pass
from enum import Enum
import datetime
class Type_Enum(Enum):
'''
Type_Enum class
contains Data type which needs to be converted to python object from string in query result.
class member:
DATE = 'date'
DATETIME = 'datetime'
TIME = 'time'
'''
DATE = 'date'
DATETIME = 'datetime'
TIME = 'time'
class Type:
'''
Type class
class member:
class method:
@staticmethod
parse_string_to: params(target_string:str, target_type:str)
return either python datetime, date, time object which parsed string to.
'''
def __init__(self):
pass
## some data type such as date, datetime, time is returned as string, needs to be converted to corresponding python object.
@staticmethod
def parse_string_to(target_string:str, target_type:str):
if target_type == Type_Enum.DATE.value:
date_time_obj = datetime.datetime.strptime(target_string, '%Y-%m-%d')
return date_time_obj.date()
elif target_type == Type_Enum.TIME.value:
date_time_obj = datetime.datetime.strptime(target_string, '%H:%M:%S.%f')
return date_time_obj.time()
elif target_type == Type_Enum.DATETIME.value:
date_time_obj = datetime.datetime.strptime(target_string, '%Y-%m-%d %H:%M:%S.%f')
return date_time_obj
from .api_connection import Connection
class Our_API:
'''
Our_API class
class member:
class method:
connect: params(conf_dict, conf_file)
return Connection class object
'''
def __init__(self):
pass
def connect(conf_dict=None, conf_file=None):
if conf_dict:
return Connection(conf_dict)
elif conf_file:
return Connection(conf_file)
おわりに
これがJavaで書くとどんな感じになるのか楽しみです。
おわり。