japronto備忘録(1)で、プロジェクトテンプレートを作成しましたが、Djangoのadmin機能のようなものがないと開発効率が上がらないので、elixirのphoenixや、phpのphalconのdev-toolsのようなCRUDのアプリを自動的に生成する仕組み(なんちゃってですが)を作成してみました。そのアプリを投稿します。
1. 仕様
データベースの列情報、プライマリキー情報を取得して、以下の3つの処理を呼び出し、プロジェクトにテーブルのCRUDアプリを自動的に追加します。具体的には、以下の処理を行います。
(1). サービス層(project/apps/[テーブル名]/service.py)の作成
- 関数find_all()を出力する。
- 関数find_by_id()を出力する。
- 関数insertdata()を出力する。
- 関数updatedata()を出力する。
- 関数deletedata()を出力する。
(2). View層(project/apps/[テーブル名]/views.py)の作成
- 関数index()を出力する。
- 関数create()を出力する。
- 関数update()を出力する。
- 関数delete()を出力する。
- 上記を、japrontoのルータに追加する。
(3). HTMLテンプレートの作成
- project/templates/[テーブル名]/index.htmlを出力します。
- project/templates/[テーブル名]/create.htmlを出力します。
- project/templates/[テーブル名]/update.htmlを出力します。
- project/templates/[テーブル名]/delete.htmlを出力します。
※内容については、japronto備忘録(1)にサンプルファイルを掲載しています。
2. フォルダ/ファイル構成
│ createcrud.py
│
└─packages
│ create_services.py
│ create_views.py
│ create_templates.py
│ tableinfo.py
3. ソース
以下、ソースです。ツールなので、あまりきれいにかけてません、ご容赦ください。
(1) packages/create_services.py
import os
# Source template of the generated ``find_all`` service function.
# The body lines are indented so that the rendered text is valid Python.
FIND_ALL_FUNC = """
def find_all(cur):
    sql = "select * from {tablename}"
    dict_result = []
    cur.execute(sql)
    rows = cur.fetchall()
    for row in rows:
        dict_result.append(dict(row))
    return dict_result
"""


def get_find_by_all(columndefs):
    """Render the source code of ``find_all(cur)`` for one table.

    Args:
        columndefs: object exposing ``tablename``; the name is interpolated
            verbatim into the generated SQL statement.

    Returns:
        The function source as a string without leading/trailing whitespace.
    """
    return FIND_ALL_FUNC.format(tablename=columndefs.tablename).strip()
# Source template of the generated ``find_by_id`` service function.
# The body lines are indented so that the rendered text is valid Python.
FIND_BY_ID_FUNC = """
def find_by_id(cur, query):
    sql = "select * from {tablename} {where} "
    cur.execute(sql, ({params}))
    row = cur.fetchone()
    # fetchone() gives None when no row matches the key
    return dict(row) if row is not None else None
"""


def get_find_by_id(columndefs):
    """Render the source code of ``find_by_id(cur, query)`` for one table.

    The generated function selects a single row by the primary-key values
    found in ``query`` and returns it as a dict, or ``None`` when no row
    matches (the previous template called ``dict(None)`` and crashed).

    Args:
        columndefs: object exposing ``tablename`` and ``pk_dict`` (an iterable
            of primary-key column names).

    Returns:
        The function source as a string without leading/trailing whitespace.
    """
    pk_names = list(columndefs.pk_dict)
    conditions = " and ".join(f"{pk} = %s" for pk in pk_names)
    where = f" where {conditions}" if conditions else ""
    # Every entry ends with a comma so that a single key still renders a tuple.
    params = "".join(f"query['{pk}']," for pk in pk_names)
    return FIND_BY_ID_FUNC.format(
        tablename=columndefs.tablename,
        where=where,
        params=params,
    ).strip()
# Source template of the generated ``insertdata`` service function.
# The body lines are indented so that the rendered text is valid Python.
INSERT_FUNC = """
def insertdata(cur, query):
    sql = "insert into {tablename} ({columns}) values ({values})"
    cur.execute(sql, ({params}))
    # psycopg2's execute() returns None; rowcount holds the affected rows
    return cur.rowcount
"""


def get_insert(columndefs):
    """Render the source code of ``insertdata(cur, query)`` for one table.

    Columns that declare a database default are left out of the INSERT so
    the database fills them in. The generated function returns the number of
    inserted rows; the debug ``print(query)`` of the old template was dropped
    because it echoed submitted form values to stdout.

    Args:
        columndefs: object exposing ``tablename`` and ``columns_dict`` (an
            iterable of dicts with ``column_name`` and ``column_default``).

    Returns:
        The function source as a string without leading/trailing whitespace.
    """
    names = [
        column['column_name']
        for column in columndefs.columns_dict
        if not column['column_default']
    ]
    return INSERT_FUNC.format(
        tablename=columndefs.tablename,
        columns=",".join(names),
        values=",".join("%s" for _ in names),
        params="".join(f"query['{name}'], " for name in names),
    ).strip()
# Source template of the generated ``updatedata`` service function.
# The body lines are indented so that the rendered text is valid Python.
UPDATE_FUNC = """
def updatedata(cur, query):
    sql = "update {tablename} set {columns} {where}"
    cur.execute(sql, ({params}))
    # psycopg2's execute() returns None; rowcount holds the affected rows
    return cur.rowcount
"""


def get_update(columndefs):
    """Render the source code of ``updatedata(cur, query)`` for one table.

    Columns that take part in a constraint are not offered for update; the
    WHERE clause is built from the primary-key columns. The generated
    function returns the number of updated rows.

    Args:
        columndefs: object exposing ``tablename``, ``pk_dict`` (primary-key
            column names) and ``columns_dict`` (dicts with ``column_name``
            and ``constraint_name``).

    Returns:
        The function source as a string without leading/trailing whitespace.

    Raises:
        ValueError: if the table has no primary key. Without one the
            statement would have no WHERE clause and rewrite every row.
    """
    pk_names = list(columndefs.pk_dict)
    if not pk_names:
        raise ValueError(
            "cannot generate updatedata(): table %r has no primary key"
            % (columndefs.tablename,)
        )
    # On update, constrained columns are not editable, so skip them.
    set_names = [
        column['column_name']
        for column in columndefs.columns_dict
        if not column['constraint_name']
    ]
    columns = ",".join(f"{name} = %s" for name in set_names)
    # The WHERE clause is derived from the primary key.
    where = " where " + " and ".join(f"{pk} = %s" for pk in pk_names)
    # Parameter order must follow placeholder order: SET values, then keys.
    params = "".join(f"query['{name}'], " for name in set_names + pk_names)
    return UPDATE_FUNC.format(
        tablename=columndefs.tablename,
        columns=columns,
        where=where,
        params=params,
    ).strip()
# Source template of the generated ``deletedata`` service function.
# The body lines are indented so that the rendered text is valid Python.
DELETE_FUNC = """
def deletedata(cur, query):
    sql = "delete from {tablename} {where}"
    cur.execute(sql, ({params}))
    # psycopg2's execute() returns None; rowcount holds the affected rows
    return cur.rowcount
"""


def get_delete(columndefs):
    """Render the source code of ``deletedata(cur, query)`` for one table.

    The WHERE clause is built from the primary-key columns. The generated
    function returns the number of deleted rows.

    Args:
        columndefs: object exposing ``tablename`` and ``pk_dict`` (an iterable
            of primary-key column names).

    Returns:
        The function source as a string without leading/trailing whitespace.

    Raises:
        ValueError: if the table has no primary key. Without one the
            statement would have no WHERE clause and delete every row.
    """
    pk_names = list(columndefs.pk_dict)
    if not pk_names:
        raise ValueError(
            "cannot generate deletedata(): table %r has no primary key"
            % (columndefs.tablename,)
        )
    where = " where " + " and ".join(f"{pk} = %s" for pk in pk_names)
    params = "".join(f"query['{pk}'], " for pk in pk_names)
    return DELETE_FUNC.format(
        tablename=columndefs.tablename,
        where=where,
        params=params,
    ).strip()
def create_service(app_path, columndefs, methods, admin, schema):
    """Write ``<app_path>/<tablename>/service.py`` with the five CRUD functions.

    Args:
        app_path: directory that holds one sub-directory per table.
        columndefs: table metadata passed on to the ``get_*`` renderers.
        methods: unused here; kept for a uniform generator signature.
        admin: unused here; kept for a uniform generator signature.
        schema: unused here; kept for a uniform generator signature.
    """
    dirname = os.path.join(app_path, columndefs.tablename)
    os.makedirs(dirname, exist_ok=True)

    # Render everything before opening the file, so that a failing renderer
    # cannot leave a truncated service.py behind.
    sources = [
        get_find_by_all(columndefs),
        get_find_by_id(columndefs),
        get_insert(columndefs),
        get_update(columndefs),
        get_delete(columndefs),
    ]

    filepath = os.path.join(dirname, "service.py")
    # Python reads source files as UTF-8, so do not rely on the locale default.
    with open(filepath, 'w', encoding="utf-8") as f:
        f.write("\n\n".join(sources))
        f.write("\n")
    print(filepath)
def write_services(prjpath, columndefs, methods, admin, schema):
    """Create ``<prjpath>/apps`` if it is missing, then generate the service module."""
    apps_dir = os.path.join(prjpath, "apps")
    apps_dir_missing = not os.path.exists(apps_dir)
    if apps_dir_missing:
        os.mkdir(apps_dir)
    create_service(apps_dir, columndefs, methods, admin, schema)
(2) packages/create_views.py
import os
def write_header(f, columndefs):
    """Write the import preamble of the generated views module to *f*."""
    service_import = (
        "from apps.%s.service import find_all, find_by_id, insertdata, updatedata, deletedata\n"
        % columndefs.tablename
    )
    preamble = (
        "from api.app import app, controller, get_tpl\n",
        "from psycopg2.extras import DictCursor\n",
        service_import,
        "\n",
    )
    for line in preamble:
        f.write(line)
def write_comon(f, columndefs, crud, admin):
    """Write the opening lines shared by every generated view function.

    Emits the ``@controller`` decorator, the ``def`` line named after *crud*,
    the login lookup, the template lookup (under ``/admin`` when *admin* is
    truthy) and the initial empty ``result``.
    """
    if admin:
        tpl_line = "\thtml=get_tpl('/admin/%s/%s.html')\n" % (columndefs.tablename, crud)
    else:
        tpl_line = "\thtml=get_tpl('/%s/%s.html')\n" % (columndefs.tablename, crud)
    opening = (
        "@controller\n",
        "def %s(request):\n" % crud,
        "\tloginuser = request.is_loggedin\n",
        tpl_line,
        "\tresult={}\n",
    )
    for line in opening:
        f.write(line)
def write_index(f, columndefs, crud):
    """Write the body of the generated ``index`` view: fetch all rows via ``find_all``."""
    body = (
        "\tconn = request.connection\n"
        "\twith conn.cursor(cursor_factory=DictCursor) as cur:\n"
        "\t\tresult=find_all(cur)\n"
    )
    f.write(body)
def write_create(f, columndefs, crud):
    """Write the body of the generated ``create`` view.

    The emitted code inserts ``request.form`` and commits, but only when the
    request method is POST.
    """
    body_lines = [
        '\tif request.method.lower()=="post":\n',
        '\t\tconn = request.connection\n',
        '\t\twith conn.cursor(cursor_factory=DictCursor) as cur:\n',
        '\t\t\tresult=insertdata(cur, request.form)\n',
        '\t\tconn.commit()\n',
    ]
    for text in body_lines:
        f.write(text)
def write_update(f, columndefs, crud):
    """Write the body of the generated ``update`` view.

    The emitted code loads the row with ``find_by_id`` for non-POST requests
    and calls ``updatedata`` with the form for POST, then commits.
    """
    body_lines = (
        '\tconn = request.connection\n',
        '\twith conn.cursor(cursor_factory=DictCursor) as cur:\n',
        '\t\tif not request.method.lower()=="post":\n',
        '\t\t\tresult=find_by_id(cur, request.query)\n',
        '\t\telse:\n',
        '\t\t\tresult=updatedata(cur, request.form)\n',
        '\t\tconn.commit()\n',
    )
    f.write("".join(body_lines))
def write_delete(f, columndefs, crud):
    """Write the body of the generated ``delete`` view.

    The emitted code loads the row with ``find_by_id`` for non-POST requests
    and calls ``deletedata`` with the form for POST, then commits.
    """
    cursor_block = (
        '\tconn = request.connection\n'
        '\twith conn.cursor(cursor_factory=DictCursor) as cur:\n'
    )
    branch_block = (
        '\t\tif not request.method.lower()=="post":\n'
        '\t\t\tresult=find_by_id(cur, request.query)\n'
        '\t\telse:\n'
        '\t\t\tresult=deletedata(cur, request.form)\n'
    )
    f.write(cursor_block)
    f.write(branch_block)
    f.write('\t\tconn.commit()\n')
def create_views(app_path, columndefs, methods, admin):
    """Write ``<app_path>/<tablename>/views.py`` with one view per method.

    For every name in *methods* a view function is emitted (shared opening,
    method-specific body, shared ``return``), followed by one
    ``app.router.add_route`` line per method. ``index`` is routed at the
    table URL itself, every other method at ``<table URL>/<method>``; with
    *admin* the URLs are prefixed with ``/admin``.

    Args:
        app_path: directory that holds one sub-directory per table.
        columndefs: table metadata; ``tablename`` is used for paths and URLs.
        methods: iterable of view names; names without a body writer get only
            the shared opening and ``return`` line.
        admin: truthy to generate admin-prefixed templates and routes.
    """
    body_writers = {
        "index": write_index,
        "create": write_create,
        "update": write_update,
        "delete": write_delete,
    }

    # Do not depend on the service generator having created the directory.
    view_dir = os.path.join(app_path, columndefs.tablename)
    os.makedirs(view_dir, exist_ok=True)
    filepath = os.path.join(view_dir, "views.py")

    url_base = ("/admin" if admin else "") + "/" + columndefs.tablename

    # Python reads source files as UTF-8, so do not rely on the locale default.
    with open(filepath, 'w', encoding="utf-8") as f:
        write_header(f, columndefs)
        for crud in methods:
            write_comon(f, columndefs, crud, admin)
            writer = body_writers.get(crud)
            if writer is not None:
                writer(f, columndefs, crud)
            f.write("\treturn request.Response(text=html.render(data=result, user=loginuser),mime_type='text/html') \n")
            f.write("\n")
        for crud in methods:
            url = url_base if crud == "index" else url_base + "/" + crud
            f.write("app.router.add_route('%s', %s)\n" % (url, crud))
    print(filepath)
def write_views(prjpath, columndefs, methods, admin):
    """Create ``<prjpath>/apps`` if it is missing, then generate the views module."""
    apps_dir = os.path.join(prjpath, "apps")
    apps_dir_missing = not os.path.exists(apps_dir)
    if apps_dir_missing:
        os.mkdir(apps_dir)
    create_views(apps_dir, columndefs, methods, admin)
(3) packages/create_templates.py
import os
# Submit button markup written into the generated create/update/delete forms.
_btnsubmit="<button class='btn btn-sm btn-success ml-2' type='submit'>submit</button>"
def get_title(crud, tablename):
    """Return the ``<h1>`` heading line: table name first, then the action label."""
    return f"<h1>{tablename} {crud}</h1>\n"
def get_url_path(tablename, crud, admin):
    """Return the URL path of a view.

    ``index`` lives at ``/<tablename>``; any other action at
    ``/<tablename>/<crud>``. A truthy *admin* prefixes the path with ``/admin``.
    """
    path = "/admin" if admin else ""
    path += "/" + tablename
    if crud != "index":
        path += "/" + crud
    return path
def get_anchor(tablename, crud, admin, params=None):
    """Return an ``<a>`` button linking to the view *crud* of *tablename*.

    *params*, when truthy, is appended verbatim to the URL (callers pass a
    ready-made query string). ``update`` and ``delete`` get their own button
    colour class; every other action uses ``btn-primary``.
    """
    href = get_url_path(tablename, crud, admin)
    if params:
        href += params
    special_classes = {"update": "btn-info", "delete": "btn-danger"}
    css_class = special_classes.get(crud, "btn-primary")
    return "<a class='btn btn-sm %s ml-2' style='width: 60px;' href='%s'>%s</a>" % (css_class, href, crud)
def output_index(f, columndefs, admin):
    """Write the list template (``index.html``) for one table to *f*.

    The page shows one table column per database column, a ``create`` button
    in the header and ``update``/``delete`` buttons per row. The row buttons
    carry the primary-key values as a query string built from template
    expressions (``?pk={{ row.pk }}&...``).

    Args:
        f: writable text file object.
        columndefs: object exposing ``tablename``, ``columns_dict`` (dicts
            with ``column_name``) and ``pk_dict`` (primary-key column names).
        admin: truthy to link to the ``/admin``-prefixed URLs.
    """
    tablename = columndefs.tablename
    column_names = [row["column_name"] for row in columndefs.columns_dict]

    # Query string that identifies a row: one pair per primary-key column.
    pk_pairs = [
        "%s={{ row.%s }}" % (name, name)
        for name in column_names
        if name in columndefs.pk_dict
    ]
    get_params = ("?" + "&".join(pk_pairs)) if pk_pairs else ""

    f.write("{% extends 'base.html' %}\n{% block content %}\n")
    f.write(get_title("list", tablename))
    # "table-bordered" is the Bootstrap class; the former "table-borderd"
    # spelling matched no rule.
    f.write("<table class='table table-sm table-striped table-bordered'>\n")
    f.write("<thead>\n<tr>\n")
    for name in column_names:
        f.write("<th>%s</th>" % name)
    anchor = get_anchor(tablename, "create", admin)
    f.write("<th style='width: 160px;'>%s</th>\n" % (anchor, ))
    f.write("</tr>\n</thead>\n")
    f.write("<tbody>\n")
    f.write("{% for row in data %}\n")
    f.write("<tr>\n")
    for name in column_names:
        f.write("<td>{{ row.%s }}</td>\n" % name)
    updateanchor = get_anchor(tablename, "update", admin, get_params)
    deleteanchor = get_anchor(tablename, "delete", admin, get_params)
    f.write("<td>%s%s</td>\n" % (updateanchor, deleteanchor))
    f.write("</tr>\n")
    f.write("{% endfor %}\n")
    f.write("</tbody>\n</table>\n")
    f.write("{% endblock %}\n")
def output_cud(f, columndefs, method, admin):
    """Write the form template for ``create``, ``update`` or ``delete`` to *f*.

    Per column one form group with a label and an ``<input>`` is emitted:

    * ``create`` skips columns that have a database default;
    * ``update``/``delete`` keep those columns but hide them, and pre-fill
      every input from ``data``;
    * ``delete`` renders every input read-only;
    * ``create``/``update`` mark an input ``required`` when its own column is
      NOT NULL. The flag is computed per column; previously it stayed set
      for every column after the first NOT NULL one.

    Args:
        f: writable text file object.
        columndefs: object exposing ``tablename`` and ``columns_dict`` (dicts
            with ``column_name``, ``column_default`` and ``is_nullable``).
        method: name of the view the form posts to.
        admin: truthy to use the ``/admin``-prefixed URLs.
    """
    tablename = columndefs.tablename
    f.write("{% extends 'base.html' %}\n{% block content %}\n")
    f.write(get_title(method, tablename))
    url = get_url_path(tablename, method, admin)
    f.write("\t<form class='needs-validation' method='post' action='%s' enctype='application/x-www-form-urlencoded'>\n" % (url,))

    readonly = "readonly" if method == "delete" else ""
    validates = method in ("create", "update")
    prefilled = method in ("update", "delete", )

    for row in columndefs.columns_dict:
        name = row["column_name"]
        has_default = bool(row['column_default'])
        # On create the database supplies defaulted columns, so no input.
        if method == "create" and has_default:
            continue

        require = "required" if validates and row['is_nullable'] == 'NO' else ""
        # Defaulted columns still have to be submitted, but are not shown.
        style = "style='display:none;'" if has_default else ""
        inputtype = "password" if name == "password" else "text"

        f.write("\t\t<div class='form-group' %s>\n" % (style, ))
        f.write("\t\t\t<label class='form-label'>%s</label>\n" % (name))
        if prefilled:
            f.write("\t\t\t<input class='form-control' type='%s' name='%s' value='{{ data.%s }}' %s %s/>\n" %
                    (inputtype, name, name, readonly, require))
        else:
            f.write("\t\t\t<input class='form-control' type='%s' name='%s' %s/>\n" %
                    (inputtype, name, require, ))
        f.write("\t\t</div>\n")

    gobackbtn = get_anchor(tablename, "index", admin)
    f.write("\t\t%s\n%s\n" % (_btnsubmit, gobackbtn))
    f.write("\t</form>\n")
    f.write("{% endblock %}\n")
def write_templates(prjpath, columndefs, methods, admin):
    """Generate one HTML template per method for a table.

    Files are written to ``<prjpath>/templates/<tablename>/<method>.html``,
    or below ``<prjpath>/templates/admin/`` when *admin* is truthy.

    Args:
        prjpath: root directory of the target project.
        columndefs: table metadata passed on to the template writers.
        methods: iterable of view names; ``index`` produces the list page,
            every other name a form page.
        admin: truthy to generate the admin variant.
    """
    templates_root_path = os.path.join(prjpath, "templates")
    if admin:
        templates_root_path = os.path.join(templates_root_path, "admin")
    templates_path = os.path.join(templates_root_path, "%s" % columndefs.tablename)
    # makedirs creates missing parents too; os.mkdir failed for
    # templates/admin/<table> when "templates" itself did not exist yet.
    os.makedirs(templates_path, exist_ok=True)

    for method in methods:
        filepath = os.path.join(templates_path, method + ".html")
        # Fixed encoding so non-ASCII table or column names do not depend on
        # the locale default.
        with open(filepath, 'w', encoding="utf-8") as f:
            if method == "index":
                output_index(f, columndefs, admin)
            else:
                output_cud(f, columndefs, method, admin)
        print("create template path=%s" % filepath)
(4) packages/tableinfo.py
プロジェクトのデータベース情報を読み込んでデータベースに接続し、テーブルの列およびプライマリキー情報を取得します。
import psycopg2
import psycopg2.extras
import configparser
# Column metadata of one table, joined with its constraints.
# A column yields one row per constraint it is used by, so callers must
# de-duplicate by column_name. key_column_usage also lists UNIQUE and FOREIGN
# KEY columns, therefore pk_ordinal is only filled when table_constraints
# says the constraint is the PRIMARY KEY.
_sql = """
select
    col.column_name
   ,col.ordinal_position
   ,col.column_default
   ,col.is_nullable
   ,col.data_type
   ,col.character_maximum_length as max_length
   ,col.character_octet_length as oct_length
   ,col.numeric_precision as num_precision
   ,col.numeric_precision_radix as num_radix
   ,col.numeric_scale as num_scale
   ,col.datetime_precision as dt_precision
   ,cu.constraint_name
   ,case when tc.constraint_type = 'PRIMARY KEY'
         then kcu.ordinal_position end as pk_ordinal
   ,tc.constraint_type
from
    information_schema.columns col
left join
    information_schema.constraint_column_usage cu
on
    col.table_catalog = cu.table_catalog
and col.table_schema = cu.table_schema
and col.table_name = cu.table_name
and col.column_name = cu.column_name
left join
    information_schema.key_column_usage kcu
on
    cu.constraint_catalog = kcu.constraint_catalog
and cu.constraint_schema = kcu.constraint_schema
and cu.constraint_name = kcu.constraint_name
and cu.table_catalog = kcu.table_catalog
and cu.table_schema = kcu.table_schema
and cu.table_name = kcu.table_name
and cu.column_name = kcu.column_name
left join
    information_schema.table_constraints tc
on
    kcu.constraint_catalog = tc.constraint_catalog
and kcu.constraint_schema = tc.constraint_schema
and kcu.constraint_name = tc.constraint_name
and kcu.table_catalog = tc.table_catalog
and kcu.table_schema = tc.table_schema
and kcu.table_name = tc.table_name
where
    col.table_catalog = %s
and col.table_schema = %s
and col.table_name = %s
order by
    col.ordinal_position
"""


class ColumnDefs:
    """Column and primary-key metadata of one database table.

    Attributes set by ``__init__``:
        tablename: the table name as given.
        pk_dict: list of primary-key column names (a list, despite the name).
        columns_dict: list with one dict per column, in ordinal order.
    """

    def __init__(self, prjpath, tablename, schema):
        """Read the project's DB settings and load the table metadata.

        Args:
            prjpath: project root; ``<prjpath>/config/database.conf`` must
                contain a ``[DB]`` section with host, port, dbname, user and
                password.
            tablename: table to inspect.
            schema: schema name; a falsy value selects ``public``.

        Raises:
            FileNotFoundError: if the configuration file cannot be read.
        """
        conf_path = prjpath + "/config/database.conf"
        config = configparser.ConfigParser()
        # read() silently skips missing files; fail with a clear message
        # instead of a later KeyError on the [DB] section.
        if not config.read(conf_path):
            raise FileNotFoundError("database config not found: %s" % conf_path)
        db = config['DB']
        dbname = db['dbname']

        self.tablename = tablename
        self.pk_dict = list()
        self.columns_dict = []

        schemaname = "public" if not schema else schema
        # Keyword arguments avoid DSN quoting problems (e.g. a password that
        # contains spaces).
        conn = psycopg2.connect(
            host=db['host'],
            port=db['port'],
            dbname=dbname,
            user=db['user'],
            password=db['password'],
        )
        try:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                cur.execute(_sql, (dbname, schemaname, tablename))
                rows = cur.fetchall()
        finally:
            # "with connection" in psycopg2 only ends the transaction; the
            # connection itself has to be closed explicitly.
            conn.close()

        if len(rows) == 0:
            print("Error ColumnDef Not Found!!")
            print(_sql)
            print((dbname, schemaname, tablename))
        self._load_rows(rows)

    def _load_rows(self, rows):
        """Fill ``columns_dict`` and ``pk_dict`` from the query result rows.

        Each column is kept once even if the query returned it several times
        (one row per constraint); the row that marks the primary key wins.
        """
        by_name = {}
        for row in rows:
            dictrow = dict(row)
            name = dictrow['column_name']
            is_pk = dictrow['pk_ordinal'] is not None
            if is_pk and name not in self.pk_dict:
                self.pk_dict.append(name)
            # Replacing a value keeps the dict position, so ordinal order
            # is preserved.
            if name not in by_name or is_pk:
                by_name[name] = dictrow
        self.columns_dict.extend(by_name.values())
(5) createcrud.py
実行用。プロジェクトのパス、テーブル名などを引数に処理を実行する、メイン処理です。
import os
import sys
import argparse
from packages.tableinfo import ColumnDefs
from packages.create_services import write_services
from packages.create_views import write_views
from packages.create_templates import write_templates
class TableNotFoundException(Exception):
    """Raised when no column metadata is found for the requested table."""


def main(argv=None):
    """Generate service, views and templates for one table.

    Args:
        argv: command-line arguments without the program name; ``None`` lets
            argparse read ``sys.argv``.

    Returns:
        Process exit status: 0 on success, 1 when generation failed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("projectpath", type=str, help="need <project_abs_path>")
    parser.add_argument("table", type=str, help="need tablename")
    parser.add_argument("--schema", type=str, help="--schema=<schemaname>")
    parser.add_argument("--admin", help="--admin", action="store_true")
    args = parser.parse_args(argv)

    if args.schema:
        print("schema=%s" % args.schema)
    if args.admin:
        print("admin controller")
    project_path = args.projectpath
    table_name = args.table
    print("tablename=" + table_name)

    methods = ["index", "create", "update", "delete"]
    try:
        columndefs = ColumnDefs(project_path, table_name, args.schema)
        if len(columndefs.columns_dict) == 0:
            errmsg = """Error!! not found table.
confirm your database. table={0} schema={1}
""".format(table_name, "public" if not args.schema else args.schema)
            raise TableNotFoundException(errmsg)
        write_services(project_path, columndefs, methods, args.admin, args.schema)
        write_views(project_path, columndefs, methods, args.admin)
        write_templates(project_path, columndefs, methods, args.admin)
    except TableNotFoundException as err:
        print(err)
        return 1
    except Exception:
        # Imported here because the file's import block does not provide it.
        import traceback
        # Any other failure (config, connection, file system) is not a
        # "table not found" problem, so report it as a generic failure.
        print("Error!! failed to generate CRUD app. table=%s" % table_name)
        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    # Non-zero status on failure so shells and scripts can detect it.
    sys.exit(main())
4. 使い方
(1). ターゲットとなるデータベース(postgreSQL)にテーブルを追加します。
create table "public".todos (
id bigserial primary key
, todo character varying(255) not null
);
(2). ターゲットとなるプロジェクトのデータベース(postgreSQL)情報を設定します。
config/database.conf
[DB]
minconn=10
maxconn=50
host=localhost
port=5432
dbname=japrontodb
user=testuser
password=************************
(3). コマンドプロンプトから、以下のコマンドを実行します。
python .\createcrud.py C:\linux_home\japronto\mysite todos
(4). プロジェクトファイルに追加したアプリを設定する。
templates/base.htmlの「header」ブロックに、追加したアプリのURLを追加します。
templates/base.html
・・・・(省略)・・・・・
<header style="background: #263238;padding-left:5px;padding-right:5px;">
<div class="navi">
<div class="navi left" style="width:70%; ">
<div class="title">My Project</div>
{% if user %}
<div>
<a href="/todos">todo</a>
</div>
{% endif %}
</div>
・・・・(省略)・・・・・
5. 出来上がり
python main.py