はじめに
- 環境
- windows7
- python3.5.4
- wxPython4.0.0b1
- 利用シーン
- Excelでのデータ管理に限界を感じて、MySQLを使用している AND
- 自身はMySQL Workbenchを使用できる AND
- 上司はデータの閲覧のみ出来ればよく、MySQL Workbenchの使い方を覚える気がない
そんな方には活用していただけるかもしれません。
説明
コード全文
test.py
import csv
import datetime
import decimal
import json
import os.path
import re
from operator import itemgetter
import mysql.connector
import wx
import wx.adv
import wx.lib.mixins.listctrl as listmix
# Colour values (not referenced in the visible code; presumably field
# background colours per editing state -- TODO confirm).
COLOR_INPUT_REQUIRED = "#f9cda9"
COLOR_ENABLE_MODIFY = "#ffdead"
COLOR_READ_ONLY = "white"

# Locate the settings file: it is expected to sit next to this script.
settings_dir = "settings.json"
_script_path = os.path.realpath(__file__)
dir_path = os.path.dirname(_script_path)
settings_json_dir = os.path.join(dir_path, settings_dir)
class FrameRoot(wx.Frame):
    """Main window: a filter box, a notebook of query-result lists and a button bar.

    Tab definitions (tab name, SQL, saved column widths) and the window
    geometry are read from, and written back to, the module-level
    ``json_dict`` settings dictionary.
    """

    def __init__(self, my_width, my_height, my_x, my_y):
        """Build the static layout (header, notebook area, button bar).

        Args:
            my_width: Initial window width.
            my_height: Initial window height.
            my_x: Initial x position of the window.
            my_y: Initial y position of the window.
        """
        wx.Frame.__init__(self, None, id=wx.ID_ANY, title="test",
                          size=(my_width, my_height), pos=(my_x, my_y))
        # Prevent a second instance of the application from running.
        self.instance = wx.SingleInstanceChecker(self.GetTitle())
        if self.instance.IsAnotherRunning():
            wx.Exit()
        self.my_notebook = None
        self.my_notebook_tabs = None
        self.my_sql = None
        self.target_list_ctrl = None
        # Menu bar.
        self.SetMenuBar(MyMenu())
        panel_header = wx.Panel(self, wx.ID_ANY, size=(0, 23))
        panel_header.SetBackgroundColour("#59A6E5")
        self.panel_top = wx.Panel(self, wx.ID_ANY)
        panel_bottom = wx.Panel(self, wx.ID_ANY)
        panel_bottom.SetBackgroundColour("#4682b4")
        wx.StaticText(panel_header, wx.ID_ANY, "filter rows", pos=(10, 3))
        self.txtbox_search = wx.TextCtrl(panel_header, wx.ID_ANY, size=(100, 20),
                                         pos=(80, 2), style=wx.TE_PROCESS_ENTER)
        self.Bind(wx.EVT_TEXT_ENTER, self.OnEnterTextBoxSearch, self.txtbox_search)
        # Tab names, in tab order.
        self.tab_name = [tab["tab_name"] for tab in self._tab_configs()]
        sizer_root = wx.FlexGridSizer(3, 1, 1, 1)
        sizer_root.Add(panel_header, flag=wx.GROW)
        sizer_root.Add(self.panel_top, flag=wx.GROW)
        sizer_root.Add(panel_bottom, flag=wx.GROW)
        sizer_root.AddGrowableRow(1)
        sizer_root.AddGrowableCol(0)
        self.SetSizer(sizer_root)
        # Buttons.
        btn_update = wx.Button(panel_bottom, wx.ID_ANY, "更新")
        btn_export_csv = wx.Button(panel_bottom, wx.ID_ANY, "Export-Csv")
        btn_close = wx.Button(panel_bottom, wx.ID_ANY, "閉じる")
        sizer_bottom = wx.BoxSizer(wx.HORIZONTAL)
        for button in (btn_update, btn_export_csv, btn_close):
            sizer_bottom.Add(button, flag=wx.GROW)
        panel_bottom.SetSizer(sizer_bottom)
        # Events. The exit menu item is bound through its id (wx.ID_EXIT)
        # so this class does not rely on a global set by the menu class.
        self.Bind(wx.EVT_MENU, self.ExitHandler, id=wx.ID_EXIT)
        self.Bind(wx.EVT_CLOSE, self.ExitHandler)  # window close button
        self.Bind(wx.EVT_BUTTON, self.OnClickBtnUpdate, btn_update)
        self.Bind(wx.EVT_BUTTON, self.OnClickBtnExportCsv, btn_export_csv)
        self.Bind(wx.EVT_BUTTON, self.ExitHandler, btn_close)

    @staticmethod
    def _tab_configs():
        """Return the settings of "tab1", "tab2", ... in tab order."""
        tabs = json_dict["root_frame"]["tabs"]
        return [tabs["tab" + str(n + 1)] for n in range(len(tabs))]

    def CreateMyFrameNotebook(self):
        """Create the notebook with one page per configured tab."""
        self.my_notebook = MyNotebookPanel(self.panel_top, len(self.tab_name), self.tab_name)

    def CreateMyFrameListCtrl(self):
        """Run each tab's SQL and place a populated list control on its page."""
        self.my_notebook_tabs = self.my_notebook.GetTabs()
        my_connect_db = MyConnectDb()
        tab_configs = self._tab_configs()
        self.my_sql = [tab["sql"] for tab in tab_configs]
        # Saved column widths. Every tab gets an entry even when its saved
        # widths are unusable; an empty list makes MyListCtrl use defaults.
        my_list_width = []
        for tab in tab_configs:
            try:
                saved = tab["col_width"]
                widths = [saved["col" + str(v) + "_width"] for v in range(len(saved))]
            except (KeyError, TypeError) as err:
                wx.MessageBox(str(err), 'Info')
                widths = []
            my_list_width.append(widths)
        # Put one list control on each tab page.
        my_listctrl = []
        for i, tab_panel in enumerate(self.my_notebook_tabs):
            result = my_connect_db.ExecuteSelect(self.my_sql[i])
            # ExecuteSelect returns None after reporting a database error;
            # show an empty list for that tab instead of crashing.
            record, col_name = result if result is not None else ([], ())
            list_ctrl = MyListCtrl(tab_panel, my_list_width[i], col_name)
            list_ctrl.AddColumn(record)
            list_ctrl.AddRecord(record)
            list_ctrl.Bind(wx.EVT_LIST_COL_CLICK, self.OnClickCol)
            my_listctrl.append(list_ctrl)
        # Kept for the update and exit handlers.
        self.target_list_ctrl = my_listctrl

    def UpdateMyFrameListCtrl(self):
        """Re-run each tab's SQL and replace the rows of its list control."""
        my_connect_db = MyConnectDb()
        for i, list_ctrl in enumerate(self.target_list_ctrl):
            result = my_connect_db.ExecuteSelect(self.my_sql[i])
            if result is None:
                # The error was already shown; keep the current rows.
                continue
            record, _col_name = result
            list_ctrl.DeleteAllItems()
            list_ctrl.AddRecord(record)
            list_ctrl.filter_flag = False

    def OnClickBtnUpdate(self, event):
        """Handle the update button."""
        self.UpdateMyFrameListCtrl()

    def OnClickCol(self, event):
        """Sort the list whose column header was clicked."""
        event.GetEventObject().SortListCtrl(event)

    def OnEnterTextBoxSearch(self, event):
        """Filter the list on the active tab by the text entered in the search box."""
        target = self.my_notebook_tabs[self.my_notebook.GetSelection()].GetChildren()
        target[0].FilterRows(event, event.GetEventObject().GetLineText(0))

    def OnClickBtnExportCsv(self, event):
        """Export the rows currently shown on the active tab to a CSV file."""
        active_tab_index = self.my_notebook.GetSelection()
        # wx wildcards use the "description|pattern" form.
        dialog = wx.FileDialog(None, '名前を付けて保存', style=wx.FD_SAVE,
                               wildcard="CSV files (*.csv)|*.csv")
        try:
            dialog.SetFilename(self.my_notebook.GetPageText(active_tab_index))
            if dialog.ShowModal() == wx.ID_CANCEL:
                return None
            save_file_path = dialog.GetPath()
        finally:
            # Destroy the dialog on every path, including cancel.
            dialog.Destroy()
        list_ctrl = self.my_notebook_tabs[active_tab_index].GetChildren()[0]
        col_range = range(list_ctrl.GetColumnCount())
        # Header row first, then the displayed cell texts.
        csv_list = [[list_ctrl.GetColumn(col).GetText() for col in col_range]]
        for row in range(list_ctrl.GetItemCount()):
            csv_list.append([list_ctrl.GetItemText(row, col) for col in col_range])
        try:
            # newline='' is required on Windows to avoid blank lines.
            with open(save_file_path, 'w', newline='') as f:
                csv.writer(f).writerows(csv_list)
            wx.MessageBox("csvに保存しました。", 'info')
        except (OSError, UnicodeError, csv.Error) as err:
            wx.MessageBox(str(err), 'error')

    def ExitHandler(self, event):
        """Save window geometry and column widths to the settings file, then exit.

        A failure while saving is reported but does not prevent the
        application from closing.
        """
        try:
            window_rect = self.GetRect()
            rect_settings = json_dict["root_frame"]["window_rect"]
            rect_settings["x"] = window_rect.x
            rect_settings["y"] = window_rect.y
            rect_settings["width"] = window_rect.width
            rect_settings["height"] = window_rect.height
            # Replace the saved widths of every tab that has a list control.
            for tab, list_ctrl in zip(self._tab_configs(), self.target_list_ctrl or []):
                tab["col_width"] = {}
                for v in range(list_ctrl.GetColumnCount()):
                    tab["col_width"]["col" + str(v) + "_width"] = list_ctrl.GetColumnWidth(v)
            # Serialize before opening the file so that a serialization
            # error cannot leave a truncated settings file behind.
            serialized = json.dumps(json_dict, indent=4, separators=(',', ': '),
                                    ensure_ascii=False)
            with open(settings_json_dir, 'w', encoding='utf-8') as f2:
                f2.write(serialized)
        except Exception as err:
            wx.MessageBox(str(err), 'Info')
        wx.Exit()
class MyMenu(wx.MenuBar):
    """Menu bar holding a single "ファイル" menu with a "終了" item."""

    def __init__(self):
        """Create the menu and publish the exit item as the global ``menu_Exit``."""
        super().__init__()
        # The exit item is stored in a module-level name so that code
        # outside this class can bind a handler to it.
        global menu_Exit
        file_menu = wx.Menu()
        menu_Exit = file_menu.Append(wx.ID_EXIT, "終了")
        self.Append(file_menu, "ファイル")
class MyNotebookPanel(wx.Notebook):
    """Notebook that fills its parent and owns one empty panel per tab."""

    def __init__(self, parent, cnt_tab, tab_name):
        """Create ``cnt_tab`` pages labelled with the entries of ``tab_name``.

        Args:
            parent: Window that hosts the notebook; its sizer is replaced.
            cnt_tab: Number of pages to create.
            tab_name: Sequence of page labels, indexed by page number.
        """
        super().__init__(parent, wx.ID_ANY)
        self.SetBackgroundColour("#b0c4de")
        self.panel_tab = []
        for page_index in range(cnt_tab):
            page = wx.Panel(self, wx.ID_ANY)
            self.panel_tab.append(page)
            self.InsertPage(page_index, page, tab_name[page_index])
        # Let the notebook take all the space of its parent.
        self.sizer_notebook = wx.BoxSizer(wx.VERTICAL)
        self.sizer_notebook.Add(self, proportion=1, flag=wx.GROW)
        parent.SetSizer(self.sizer_notebook)

    def GetTabs(self):
        """Return the list of page panels in page order."""
        return self.panel_tab
class MyListCtrl(wx.ListCtrl):
    """Report-style list showing query rows, with sorting and row filtering.

    Integer columns are displayed with thousands separators and decimal
    columns with a fixed number of decimal places; both are right-aligned.
    Column types are detected from the first record passed to ``AddColumn``.
    """

    def __init__(self, parent, col_width, col_name):
        """Create the control and make it fill ``parent``.

        Args:
            parent: Window that hosts the list; its sizer is replaced.
            col_width: Sequence of column widths, one per column.
            col_name: Sequence of column names (also the record keys).
        """
        wx.ListCtrl.__init__(self, parent, wx.ID_ANY, style=wx.LC_REPORT | wx.LC_HRULES)
        self.col_width = col_width
        self.col_name = col_name
        # Indexes of the columns whose values are int / decimal.
        self.col_int_type = []
        self.col_decimal_type = []
        # Decimal places of the most recently detected decimal column
        # (kept for compatibility) and the per-column decimal places.
        self.col_decimal_type_after_point = None
        self.col_decimal_places = {}
        # Sort state: False = ascending, True = descending.
        self.sort_flag = False
        # Data used by the row filter.
        self.filter_data = None
        self.filter_data_sorted = None
        self.filtered_data = None
        # Whether the list has been filtered since the last reload.
        self.filter_flag = False
        self.my_listctrl_data_sorted = None
        self.initial_my_listctrl_data = None
        # Fall back to width 100 when the number of saved widths does not
        # match the number of columns.
        if len(self.col_width) != len(self.col_name):
            self.col_width = [100] * len(self.col_name)
        sizer_mylistctrl = wx.BoxSizer(wx.VERTICAL)
        sizer_mylistctrl.Add(self, proportion=1, flag=wx.GROW)
        parent.SetSizer(sizer_mylistctrl)

    def _format_cell(self, col, value):
        """Return the display text of ``value`` for column index ``col``."""
        if col in self.col_int_type and isinstance(value, int):
            return '{:,}'.format(value)
        if col in self.col_decimal_type and isinstance(value, (decimal.Decimal, int)):
            places = self.col_decimal_places.get(col, self.col_decimal_type_after_point)
            return ('{:.' + str(places) + 'f}').format(value)
        # Text columns, and NULL (None) values in numeric columns.
        return str(value)

    def _format_rows(self, rows):
        """Return ``rows`` with every cell converted to its display text."""
        return [[self._format_cell(col, value) for col, value in enumerate(row)]
                for row in rows]

    def _parse_cell(self, col, text):
        """Convert display text back to int / Decimal for numeric columns.

        Text that cannot be converted (e.g. a displayed NULL) becomes None.
        """
        try:
            if col in self.col_int_type:
                return int(text.replace(",", ""))
            if col in self.col_decimal_type:
                return decimal.Decimal(text)
        except (ValueError, decimal.InvalidOperation):
            return None
        return text

    @staticmethod
    def _sort_key(col):
        """Return a sort key on column ``col`` that orders None after all values."""
        def key(row):
            value = row[col]
            return (value is None, value)
        return key

    def AddColumn(self, record):
        """Create the columns, detecting numeric columns from ``record[0]``.

        Args:
            record: List of row dicts keyed by column name. When empty,
                every column is created left-aligned.
        """
        for i, name in enumerate(self.col_name):
            value = record[0][name] if record else None
            if isinstance(value, decimal.Decimal):
                self.AppendColumn(name, wx.LIST_FORMAT_RIGHT, self.col_width[i])
                self.col_decimal_type.append(i)
                # Number of digits after the decimal point; 0 when the
                # value has no fractional part.
                exponent = value.as_tuple().exponent
                places = max(0, -exponent) if isinstance(exponent, int) else 0
                self.col_decimal_places[i] = places
                self.col_decimal_type_after_point = places
            elif isinstance(value, int):
                self.AppendColumn(name, wx.LIST_FORMAT_RIGHT, self.col_width[i])
                self.col_int_type.append(i)
            else:
                self.AppendColumn(name, wx.LIST_FORMAT_LEFT, self.col_width[i])

    def AddRecord(self, record):
        """Insert database rows (dicts keyed by column name) into the list."""
        if not record:
            return None
        self.AddList([[self._format_cell(col, row[name])
                       for col, name in enumerate(self.col_name)]
                      for row in record])

    def AddList(self, list_data):
        """Insert rows given as sequences of already formatted cell values."""
        if not list_data:
            return None
        for row_i, row in enumerate(list_data):
            # InsertItem creates the row and sets its first column.
            self.InsertItem(row_i, str(row[0]))
            for col_i in range(1, len(self.col_name)):
                self.SetItem(row_i, col_i, str(row[col_i]))

    def GetListItem(self):
        """Return the displayed rows as lists, numeric columns converted back to numbers."""
        col_count = self.GetColumnCount()
        return [[self._parse_cell(col, self.GetItemText(row, col))
                 for col in range(col_count)]
                for row in range(self.GetItemCount())]

    def SetInitialData(self):
        """Remember the currently displayed rows in ``initial_my_listctrl_data``."""
        self.initial_my_listctrl_data = self.GetListItem()

    def SortList(self, my_listctrl_data, event):
        """Sort ``my_listctrl_data`` in place by the clicked column.

        The formatted result is stored in ``my_listctrl_data_sorted``. When
        filter data exists it is sorted the same way and its formatted form
        is stored in ``filter_data_sorted``. The sort direction is flipped
        for the next call.

        Args:
            my_listctrl_data: Rows as returned by ``GetListItem``.
            event: Column-click event; ``event.GetColumn()`` selects the key.
        """
        key = self._sort_key(event.GetColumn())
        my_listctrl_data.sort(key=key, reverse=self.sort_flag)
        self.my_listctrl_data_sorted = self._format_rows(my_listctrl_data)
        if self.filter_data is not None:
            self.filter_data.sort(key=key, reverse=self.sort_flag)
            # Iterate over filter_data itself: its length can differ from
            # the number of rows currently displayed.
            self.filter_data_sorted = self._format_rows(self.filter_data)
        self.sort_flag = not self.sort_flag

    def SortListCtrl(self, event):
        """Sort the displayed rows by the clicked column and redraw them."""
        self.SortList(self.GetListItem(), event)
        self.DeleteAllItems()
        self.AddList(self.my_listctrl_data_sorted)

    def FilterRows(self, event, filter_txt):
        """Show only the rows in which some cell matches ``filter_txt``.

        ``filter_txt`` is used as a regular expression; if it is not a valid
        pattern it is matched literally. An empty text matches every row.

        Args:
            event: The triggering event (unused).
            filter_txt: Search text entered by the user.
        """
        if not self.filter_flag:
            # First filter since the last reload: snapshot the full data.
            self.filter_data = self.GetListItem()
        try:
            pattern = re.compile(filter_txt)
        except re.error:
            pattern = re.compile(re.escape(filter_txt))
        filtered_data = [row for row in self.filter_data
                         if any(pattern.search(str(value)) for value in row)]
        self.DeleteAllItems()
        self.AddList(self._format_rows(filtered_data))
        self.filter_flag = True
class MyConnectDb:
    """Thin MySQL helper that opens a new connection for every statement.

    Connection parameters are read from ``json_dict['mysql_config']``.
    Database errors are shown in a message box and reported to the caller
    as a ``None`` return value.
    """

    def __init__(self):
        """Read user, password, host and database from the settings."""
        config = json_dict['mysql_config']
        self.user = config['user']
        self.password = config['password']
        self.host = config['host']
        self.database = config['database']

    def _connect(self):
        """Open a new connection with the configured credentials."""
        return mysql.connector.connect(
            user=self.user,
            password=self.password,
            host=self.host,
            database=self.database
        )

    def _select(self, sql, *params):
        """Run a SELECT and return ``(records, column_names)``, or None on error."""
        try:
            conn = self._connect()
            try:
                cur = conn.cursor(dictionary=True)
                try:
                    cur.execute(sql, *params)
                    result_record = cur.fetchall()
                    result_col_name = cur.column_names
                finally:
                    cur.close()
            finally:
                # Close the connection even when the query failed.
                conn.close()
        except mysql.connector.Error as err:
            wx.MessageBox(str(err), 'Info')
            return None
        return (result_record, result_col_name)

    def _modify(self, sql, param, many):
        """Run a data-changing statement and commit; return True, or None on error."""
        try:
            conn = self._connect()
            try:
                cur = conn.cursor()
                try:
                    if many:
                        cur.executemany(sql, param)
                    else:
                        cur.execute(sql, param)
                    conn.commit()
                finally:
                    cur.close()
            finally:
                conn.close()
        except mysql.connector.Error as err:
            wx.MessageBox(str(err), 'Info')
            return None
        return True

    def ExecuteSelect(self, sql):
        """Run ``sql`` and return ``(records, column_names)``.

        Records are dicts keyed by column name. Returns None after showing
        the error when the database reports one.
        """
        return self._select(sql)

    def ExecuteSelectParam(self, sql, param):
        """Like ``ExecuteSelect`` but with statement parameters ``param``."""
        return self._select(sql, param)

    def ExecuteInsert(self, sql, param):
        """Execute ``sql`` once with ``param`` and commit; True on success, else None."""
        return self._modify(sql, param, False)

    def ExecuteInsertMulti(self, sql, param):
        """Execute ``sql`` for every parameter set in ``param`` and commit."""
        return self._modify(sql, param, True)

    def ExecuteDeleteMulti(self, sql, param):
        """Like ``ExecuteInsertMulti``; does nothing and returns None for an empty ``param``."""
        if param == []:
            return None
        return self._modify(sql, param, True)
class OpenSettingsJson:
    """Namespace for the loader of the settings file."""

    @staticmethod
    def OpenSettingsJson():
        """Load the settings file into the module-level ``json_dict``.

        The file at ``settings_json_dir`` is read with the ``utf-8_sig``
        codec, so a leading byte-order mark is accepted.

        Returns:
            The parsed settings, or None when the file cannot be read or
            parsed (the error is printed).
        """
        global json_dict
        try:
            with open(settings_json_dir, 'r', encoding="utf-8_sig") as f:
                json_dict = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers JSON syntax errors and decoding errors.
            print(e)
            return None
        return json_dict
if __name__ == "__main__":
    # Load the settings; the loader prints the error and returns None on failure.
    json_dict = OpenSettingsJson.OpenSettingsJson()
    if json_dict is None:
        # Without settings there is no window geometry, tab list or database
        # configuration, so stop with a clear message instead of a TypeError.
        raise SystemExit("could not load settings file: " + settings_json_dir)
    # Restore the saved window size and position.
    window_rect = json_dict["root_frame"]["window_rect"]
    my_width = window_rect["width"]
    my_height = window_rect["height"]
    my_x = window_rect["x"]
    my_y = window_rect["y"]
    application = wx.App()
    root_frame = FrameRoot(my_width, my_height, my_x, my_y)
    # The notebook must exist before the list controls are placed on its pages.
    root_frame.CreateMyFrameNotebook()
    root_frame.CreateMyFrameListCtrl()
    root_frame.Show()
    application.MainLoop()
settings.json
{
"mysql_config": {
"password": "password",
"database": "database",
"user": "user",
"host": "host"
},
"root_frame": {
"tabs": {
"tab1": {
"col_width": {
"col1_width": 64,
"col0_width": 47,
"col2_width": 70,
"col3_width": 93
},
"tab_name": "tab1",
"sql": "SELECT * FROM base;"
},
"tab2": {
"col_width": {
"col1_width": 217,
"col0_width": 48
},
"tab_name": "tab2",
"sql": "SELECT id, CAST(sum(count) AS SIGNED) FROM base;"
},
"tab3": {
"col_width": {
"col1_width": 276,
"col0_width": 39
},
"tab_name": "tab3",
"sql": "SELECT id, CAST(amount/count AS DECIMAL(3,2)) FROM base;"
}
},
"window_rect": {
"height": 575,
"x": 282,
"width": 923,
"y": 100
}
}
}
mysqlのデータ構造
下記のようなテーブルを作成して、テストデータを挿入します。
-- Test table read by the tab queries in the sample settings.json.
CREATE TABLE `base` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`year` varchar(45) DEFAULT NULL,
`count` int(11) DEFAULT NULL,
`amount` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
)
実行
test.pyとsettings.jsonを同じフォルダに設置して、コマンドプロンプトで下記のコマンドを実行します。
python test.py
tab3はCASTしたとおり小数点を表示しているのがわかります。
注意点
- mysqlのデータ型について
本プログラムは、数値は右揃え、整数値は3桁ごとにカンマを付けるように作成してあります。
ただし、テーブル作成時に設定した列のデータ型と、SELECTした結果のデータ型が異なることがあります(例: SUMや除算の結果はDECIMALになる)。その場合は、settings.jsonのSQLのように適切な型にCASTしてください。
改善点
MySQLに接続するための情報(特にパスワード)をsettings.jsonに平文で記載しているので、暗号化などの対策が必要だと思います。