LoginSignup
0
0

More than 3 years have passed since last update.

japronto備忘録(2) CRUDアプリケーションを自動的に生成する

Posted at

japronto備忘録(1)で、プロジェクトテンプレートを作成したが、Djangoのadmin機能のようなものがないと、開発効率が上がらないので、elixirのphoenixや、phpのphalconのdev-toolsようなCRUDのアプリを自動的に生成する仕組み(なんちゃってですが)を作成してみたのでそのアプリを投稿します。

1. 仕様

データベースの列情報、プライマリキー情報を取得して、以下の3つの処理を呼び出し、プロジェクトにテーブルのCRUDアプリを自動的に追加します。具体的には、以下の処理を行います。

(1). サービス層(project/apps/[テーブル名]/service.py)の作成

  1. 関数find_all()を出力する。
  2. 関数find_by_id()を出力する。
  3. 関数insertdata()を出力する。
  4. 関数updatedata()を出力する。
  5. 関数deletedata()を出力する。

(2). View層(project/apps/[テーブル名]/views.py)の作成

  1. 関数index()を出力する。
  2. 関数create()を出力する。
  3. 関数update()を出力する。
  4. 関数delete()を出力する。
  5. 上記を、japrontoのルータに追加する。

(3). HTMLテンプレートの作成

  1. project/templates/[テーブル名]/index.htmlを出力します。
  2. project/templates/[テーブル名]/create.htmlを出力します。
  3. project/templates/[テーブル名]/update.htmlを出力します。
  4. project/templates/[テーブル名]/delete.htmlを出力します。

※内容については、japronto備忘録(1)にサンプルファイルを掲載しています。

2. フォルダ/ファイル構成

│  createcrud.py
│  
└─packages
    │  create_services.py
    │  create_views.py
    │  create_templates.py
    │  tableinfo.py

3. ソース

以下、ソースです。ツールなので、あまりきれいにかけてません、ご容赦ください。

(1) packages/create_services.py

import os

FIND_ALL_FUNC ="""
def find_all(cur):
    sql = \"select * from {tablename}\"
    dict_result = []
    cur.execute(sql)
    rows = cur.fetchall()
    for row in rows:
        dict_result.append(dict(row))
    return dict_result
"""

def get_find_by_all(columndefs):
    return FIND_ALL_FUNC.format(tablename=columndefs.tablename).strip()

FIND_BY_ID_FUNC ="""
def find_by_id(cur, query):
    sql = \"select * from {tablename} {where} \"
    cur.execute(sql, ({params}))
    row = cur.fetchone()
    return dict(row)
"""

def get_find_by_id(columndefs):
    where = ""
    joinstr =" where"
    for pk in columndefs.pk_dict:
        where+= f"{joinstr} {pk} = %s"
        joinstr=" and"
    params =""
    for pk in columndefs.pk_dict:
        params+= f"query['{pk}'],"
    return FIND_BY_ID_FUNC.format(
        tablename=columndefs.tablename, where=where, params=params ).strip()


INSERT_FUNC ="""
def insertdata(cur, query):
    print(query)
    sql = \"insert into {tablename} ({columns}) values ({values})\"
    cnt = cur.execute(sql, ({params}))
    return cnt
"""

def get_insert(columndefs):
    columns = ""
    params = ""
    values = ""
    comma =""
    for column in columndefs.columns_dict:
        #新規の場合、デフォルト値の設定がある場合は入力不要なのでinsertの項目を出力しない
        if not (column['column_default']):
            columns+=f"{comma}{column['column_name']}"
            params+= f"query['{column['column_name']}'], "
            values+=f"{comma}%s"
            comma=","
    return INSERT_FUNC.format(
        tablename=columndefs.tablename, 
        columns=columns, 
        values=values,
        params=params,
        ).strip()

UPDATE_FUNC ="""
def updatedata(cur, query):
    print(query)
    sql = \"update {tablename} set {columns} {where}\"
    cnt = cur.execute(sql, ({params}))
    return cnt
"""

def get_update(columndefs):
    columns = ""
    params = ""
    comma = ""
    for column in columndefs.columns_dict:
        #変更の場合、制約の設定がある場合は入力不要なのでinsertの項目を出力しない
        if not column['constraint_name']:
            columns+=f"{comma}{column['column_name']} = %s"
            params+= f"query['{column['column_name']}'], "
            comma=","
    where = ""
    joinstr =" where"
    #プライマリキーから、where句を出力する
    for pk in columndefs.pk_dict:
        where+= f"{joinstr} {pk} = %s"
        joinstr=" and"
    for pk in columndefs.pk_dict:
        params+= f"query['{pk}'], "
    return UPDATE_FUNC.format(
        tablename=columndefs.tablename, 
        columns=columns, 
        where=where, 
        params=params,
        ).strip()

DELETE_FUNC ="""
def deletedata(cur, query):
    print(query)
    sql = \"delete from {tablename} {where}\"
    cnt = cur.execute(sql, ({params}))
    return cnt
"""

def get_delete(columndefs):
    where = ""
    joinstr =" where"
    for pk in columndefs.pk_dict:
        where+= f"{joinstr} {pk} = %s"
        joinstr=" and"
    params =""
    for pk in columndefs.pk_dict:
        params+= f"query['{pk}'], "
    return DELETE_FUNC.format(
        tablename=columndefs.tablename, 
        where=where, 
        params=params,
        ).strip()

def create_service(app_path, columndefs, methods, admin, schema):

    dirname = os.path.join(app_path, columndefs.tablename)
    if (not os.path.exists(dirname)):
        os.mkdir(dirname)

    filepath = os.path.join(dirname, "service.py")
    with open(filepath, 'w')as f:
        f.write(get_find_by_all(columndefs))
        f.write("\n")
        f.write("\n")
        f.write(get_find_by_id(columndefs))
        f.write("\n")
        f.write("\n")
        f.write(get_insert(columndefs))
        f.write("\n")
        f.write("\n")
        f.write(get_update(columndefs))
        f.write("\n")
        f.write("\n")
        f.write(get_delete(columndefs))
    print(filepath)

def write_services(prjpath, columndefs, methods, admin, schema):

    app_path = os.path.join(prjpath, "apps")

    if (not os.path.exists(app_path)):
        os.mkdir(app_path)

    create_service(app_path, columndefs, methods, admin, schema)

(2) packages/create_views.py

import os

def write_header(f, columndefs):
    f.write("from api.app import app, controller, get_tpl\n")
    f.write("from psycopg2.extras import DictCursor\n")
    f.write("from apps.%s.service import find_all, find_by_id, insertdata, updatedata, deletedata\n" % columndefs.tablename)
    f.write("\n")

def write_comon(f, columndefs, crud, admin):
    f.write("@controller\n")
    f.write("def %s(request):\n" % crud)
    f.write("\tloginuser = request.is_loggedin\n")
    if admin:
        f.write("\thtml=get_tpl('/admin/%s/%s.html')\n" % (columndefs.tablename, crud))
    else:
        f.write("\thtml=get_tpl('/%s/%s.html')\n" % (columndefs.tablename, crud))
    f.write("\tresult={}\n")

def write_index(f, columndefs, crud):
    f.write("\tconn = request.connection\n")
    f.write("\twith conn.cursor(cursor_factory=DictCursor) as cur:\n")
    f.write("\t\tresult=find_all(cur)\n")

def write_create(f, columndefs, crud):
    f.write("\tif request.method.lower()==\"post\":\n")
    f.write("\t\tconn = request.connection\n")
    f.write("\t\twith conn.cursor(cursor_factory=DictCursor) as cur:\n")
    f.write("\t\t\tresult=insertdata(cur, request.form)\n")
    f.write("\t\tconn.commit()\n")

def write_update(f, columndefs, crud):
    f.write("\tconn = request.connection\n")
    f.write("\twith conn.cursor(cursor_factory=DictCursor) as cur:\n")
    f.write("\t\tif not request.method.lower()==\"post\":\n")
    f.write("\t\t\tresult=find_by_id(cur, request.query)\n")
    f.write("\t\telse:\n")
    f.write("\t\t\tresult=updatedata(cur, request.form)\n")
    f.write("\t\tconn.commit()\n")

def write_delete(f, columndefs, crud):
    f.write("\tconn = request.connection\n")
    f.write("\twith conn.cursor(cursor_factory=DictCursor) as cur:\n")
    f.write("\t\tif not request.method.lower()==\"post\":\n")
    f.write("\t\t\tresult=find_by_id(cur, request.query)\n")
    f.write("\t\telse:\n")
    f.write("\t\t\tresult=deletedata(cur, request.form)\n")
    f.write("\t\tconn.commit()\n")

def create_views(app_path, columndefs, methods, admin):
    filepath=os.path.join(app_path, columndefs.tablename+"/views.py")
    with open(filepath, 'w')as f:
        write_header(f, columndefs)
        for crud in methods:
            write_comon(f, columndefs, crud, admin)
            if crud=="index":
                write_index(f, columndefs, crud)
            elif crud=="create":
                write_create(f, columndefs, crud)
            elif crud=="update":
                write_update(f, columndefs, crud)
            elif crud=="delete":
                write_delete(f, columndefs, crud)
            f.write("\treturn request.Response(text=html.render(data=result, user=loginuser),mime_type='text/html') \n")
            f.write("\n")
        for crud in methods:
            url=""
            if admin:
                url="/admin"
            url+="/"+columndefs.tablename

            if not crud=="index":
                url+="/"+crud

            f.write("app.router.add_route('%s', %s)\n" % (url, crud))

    print(filepath)

def write_views(prjpath, columndefs, methods, admin):

    app_path = os.path.join(prjpath, "apps")

    if (not os.path.exists(app_path)):
        os.mkdir(app_path)

    create_views(app_path, columndefs, methods, admin)

(3) packages/create_templates.py

import os

_btnsubmit="<button class='btn btn-sm btn-success ml-2' type='submit'>submit</button>"

def get_title(crud, tablename):
  return "<h1>%s %s</h1>\n" % (tablename, crud, )

def get_url_path(tablename, crud, admin):
  urladmin = "/admin" if admin else ""
  url = "/"+tablename if crud=="index" else "/"+tablename+"/"+crud
  return "%s%s" %(urladmin, url)

def get_anchor(tablename, crud, admin, params=None):
  url = get_url_path(tablename, crud, admin)
  if params:
    url+=params
  btnclass="btn-primary"
  if crud=="update":
    btnclass="btn-info"
  elif crud=="delete":
    btnclass="btn-danger"

  return "<a class='btn btn-sm %s ml-2' style='width: 60px;' href='%s'>%s</a>" %(btnclass, url, crud)

def output_index(f,columndefs, admin):
  get_params=""
  joinstr="?"
  f.write("{% extends 'base.html' %}\n{% block content %}\n")
  f.write(get_title("list", columndefs.tablename))
  f.write("<table class='table table-sm table-striped table-borderd'>\n")
  f.write("<thead>\n<tr>\n")
  for row in columndefs.columns_dict:
    f.write("<th>%s</th>" % row["column_name"])
    if row["column_name"] in columndefs.pk_dict:
      get_params+="%s%s={{ row.%s }}" % (joinstr, row["column_name"], row["column_name"])
      joinstr="&"
  anchor = get_anchor(columndefs.tablename, "create", admin)
  f.write("<th style='width: 160px;'>%s</th>\n" % (anchor, ))
  f.write("</tr>\n</thead>\n")
  f.write("<tbody>\n")
  f.write("{% for row in data %}\n")
  f.write("<tr>\n")
  for row in columndefs.columns_dict:
    f.write("")
    f.write("<td>{{ row.%s }}</td>\n" % row["column_name"])
  updateanchor = get_anchor(columndefs.tablename, "update", admin, get_params)
  deleteanchor = get_anchor(columndefs.tablename, "delete", admin, get_params)
  f.write("<td>%s%s</td>\n" % (updateanchor, deleteanchor ))
  f.write("</tr>\n")
  f.write("{% endfor %}\n")
  f.write("</tbody>\n</table>\n")
  f.write("{% endblock %}\n")

def output_cud(f,columndefs, method, admin):
  f.write("{% extends 'base.html' %}\n{% block content %}\n")
  f.write(get_title(method, columndefs.tablename))
  url=get_url_path(columndefs.tablename, method, admin)
  f.write("\t<form class='needs-validation' method='post' action='%s' enctype='application/x-www-form-urlencoded'>\n" % (url,))
  require=""
  readonly=""
  style=""
  inputtype="text"
  if method=="delete":
    readonly="readonly"
  for row in columndefs.columns_dict:
    # print("%s is_null=%s" % (row['column_name'], row['is_nullable']))
    if (method in ("create", "update") and row['is_nullable']=='NO'):
      require='required'
      # print("require=%s" % require)
    if not (method=="create" and row['column_default']):
      if row['column_default']:
        style="style='display:none;'"
      else:
        style=""
      f.write("\t\t<div class='form-group' %s>\n" % (style, ))
      f.write("\t\t\t<label class='form-label'>%s</label>\n" % (row["column_name"]))
      if row["column_name"]=="password":
        inputtype="password"
      else:
        inputtype="text"
      if method in ("update", "delete", ):
        # value="value='{{ data.%s }}'"
        f.write("\t\t\t<input class='form-control' type='%s' name='%s' value='{{ data.%s }}' %s %s/>\n" % 
          (inputtype, row["column_name"], row["column_name"], readonly, require))
      else:
        f.write("\t\t\t<input class='form-control' type='%s' name='%s' %s/>\n" % 
          (inputtype, row["column_name"], require, ))
      f.write("\t\t</div>\n")
  gobackbtn = get_anchor(columndefs.tablename, "index", admin)
  f.write("\t\t%s\n%s\n" % (_btnsubmit, gobackbtn))
  f.write("\t</form>\n")
  f.write("{% endblock %}\n")

def write_templates(prjpath, columndefs, methods, admin):

  templates_root_path = os.path.join(prjpath, "templates")

  if admin:
    adminroot = os.path.join(templates_root_path, "admin")
    templates_root_path = adminroot

  if (not os.path.exists(templates_root_path)):
      os.mkdir(templates_root_path)

  templates_path = os.path.join(templates_root_path, "%s" % columndefs.tablename)

  if (not os.path.exists(templates_path)):
      os.mkdir(templates_path)

  for method in methods:
    filepath = os.path.join(templates_path, method + ".html")
    with open(filepath, 'w') as f:
      if method == "index":
        output_index(f, columndefs, admin)
      else:
        output_cud(f, columndefs, method, admin)
    print("create template path=%s" % filepath)

(4) packages/tableinfo.py

プロジェクトのデータベース情報を読み込んでデータベースに接続し、テーブルの列およびプライマリキー情報を取得します。

import psycopg2
import psycopg2.extras
import configparser

_sql = "select " \
"  col.column_name " \
" ,col.ordinal_position " \
" ,col.column_default " \
" ,col.is_nullable " \
" ,col.data_type " \
" ,col.character_maximum_length as max_length " \
" ,col.character_octet_length   as oct_length " \
" ,col.numeric_precision        as num_precision " \
" ,col.numeric_precision_radix  as num_radix " \
" ,col.numeric_scale            as num_scale " \
" ,col.datetime_precision       as dt_precision " \
" ,cu.constraint_name " \
" ,kcu.ordinal_position         as pk_ordinal " \
"from " \
"  information_schema.columns col " \
"left join " \
"  information_schema.constraint_column_usage cu " \
"on " \
"  col.table_catalog=cu.table_catalog " \
"and " \
"  col.table_schema=cu.table_schema " \
"and " \
"  col.table_name=cu.table_name " \
"and " \
"  col.column_name=cu.column_name " \
"left join " \
"  information_schema.key_column_usage kcu " \
"on " \
"  cu.constraint_catalog =  kcu.constraint_catalog " \
"and " \
"  cu.constraint_schema  =  kcu.constraint_schema " \
"and " \
"  cu.constraint_name    =  kcu.constraint_name " \
"and " \
"  cu.table_catalog      =  kcu.table_catalog " \
"and " \
"  cu.table_schema       =  kcu.table_schema " \
"and " \
"  cu.table_name         =  kcu.table_name " \
"and " \
"  cu.column_name        =  kcu.column_name " \
"where " \
"  col.table_catalog=%s " \
"and " \
"  col.table_schema=%s " \
"and " \
"  col.table_name=%s " \
"order by " \
"  col.ordinal_position "

class ColumnDefs:
    def __init__(self, prjpath, tablename, schema):
        config = configparser.ConfigParser()
        config.read(prjpath+"/config/database.conf")
        host=config['DB']['host']
        port=config['DB']['port']
        dbname=config['DB']['dbname']
        user=config['DB']['user']
        password=config['DB']['password']
        dsn = f"host={host} port={port} dbname={dbname} user={user} password={password}" 

        #
        self.tablename = tablename
        self.pk_dict = list()
        self.columns_dict = []
        with psycopg2.connect(dsn) as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
                schemaname = "public" if not schema else schema
                cur.execute(_sql, (dbname, schemaname, tablename))
                rows = cur.fetchall()
                if len(rows)==0:
                    print("Error ColumnDef Not Found!!")
                    print(_sql)
                    print((dbname, schemaname, tablename))
                for row in rows:
                    dictrow = dict(row)
                    # print(dictrow)
                    if dictrow['pk_ordinal'] is not None:
                        self.pk_dict.append(dictrow['column_name'])
                    self.columns_dict.append(dictrow)
        # print(self.columns_dict)

(5) createcrud.py

実行用。プロジェクトのパス、テーブル名などを引数に処理を実行する、メイン処理です。

import os
import sys
import argparse
from packages.tableinfo import ColumnDefs
from packages.create_services import write_services
from packages.create_views import write_views
from packages.create_templates import write_templates

parser = argparse.ArgumentParser()
parser.add_argument("projectpath", type=str, help="need <project_abs_path>")
parser.add_argument("table", type=str, help="need tablename")
parser.add_argument("--schema", type=str, help="--schema=<schemaname>")
parser.add_argument("--admin", help="--admin", action="store_true")
args = parser.parse_args()

try:

  if args.schema:
      print("schema=%s" % args.schema)

  if args.admin:
      print("admin controller")

  PROJECT_PATH = args.projectpath
  TABLE_NAME = args.table

  print("tablename="+TABLE_NAME)

  columndefs = ColumnDefs(PROJECT_PATH, TABLE_NAME, args.schema )

  if len(columndefs.columns_dict)==0:
    errmsg = """Error!! not found table. 
        confirm your database. table={0} schema={1}
      """.format(TABLE_NAME,"public" if not args.schema else args.schema)
    raise TableNotFoundException( errmsg )

  METHODS = ["index","create","update","delete"]

  write_services(PROJECT_PATH, columndefs, METHODS, args.admin, args.schema)

  write_views(PROJECT_PATH, columndefs, METHODS, args.admin)

  write_templates(PROJECT_PATH, columndefs, METHODS, args.admin)

except:
  print("Error!! not found table. confirm your database. table=%s" % TABLE_NAME)
  traceback.print_exc()
  sys.exit()

4. 使い方

(1). ターゲットとなるデータベース(postgreSQL)にテーブルを追加します。

create table "public".todos (
    id bigserial primary key
  , todo character varying(255) not null
);

(2). ターゲットとなるプロジェクトのデータベース(postgreSQL)情報を設定します。

conf/database.conf

[DB]
minconn=10
maxconn=50 
host=localhost
port=5432 
dbname=japrontodb
user=testuser
password=************************

(3). コマンドプロンプトから、以下のコマンドを実行します。

python .\createcrud.py C:\linux_home\japronto\mysite todos

(4). プロジェクトファイルに追加したアプリを設定する。

templates/base.htmlの「header」ブロックに、追加したアプリのURLを追加します。

templates/base.html
・・・・(省略)・・・・・
    <header style="background: #263238;padding-left:5px;padding-right:5px;">
        <div class="navi">
          <div class="navi left" style="width:70%; ">
            <div class="title">My Project</div>
            {% if user  %}
            <div>
              <a href="/todos">todo</a>
            </div>
            {% endif %}
        </div>
・・・・(省略)・・・・・

5. 出来上がり

python main.py

image.png

http://127.0.0.1:7777/todos/update?id=1
image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0