はじめに
本記事は、Qiita夏祭り2020のテーマ 「 〇〇(言語)のみを使って、今△△(アプリ)を作るとしたら」 に沿った内容となっております。
2020/7/5に、拙作「VMDサイジングver5.00 (exe|code)」をリリースしました。
このツールは、「VMD(MMDモーションデータ)を、指定されたモデルに適切な頭身で再生成する」ことをテーマとしており、まぁ、この記事をご覧になっている大半の方には響かないであろう、まったくの趣味アプリです。
んが!!!
MMD(MikuMikuDance)を楽しまれているそれこそ大半の方が、ごくごく普通の、Pythonとかプログラムとは無縁の方々。
「蛇?!」と反応される方がいらっしゃったら、むしろ貴重な存在ではないかとすら思えてきます。
そんな方々にどうやったら、自作アプリを使っていただけるか。
この記事は、そんな苦悩と試行錯誤と七転八倒の末に見えてきた、 「UIぽちぽちしたら、後はなんかいい感じに調整してくれるアプリ」 を Pythonで作るには、というテーマに対する、私なりの答えをまとめたものとなります。
ちなみに、「VMDサイジング」は累計DL8500を超える程度には使っていただいております。(32bit版を合わせたら約9600DL)
Python + pyinstaller = exe
というのは、それほど珍しいネタではありませんが、 実運用に耐えうるというレベルに落とし込むには、それなりに工夫が必要です。
<「Cで作ればいいんじゃね?」
Pythonが好きなんです。(だってC分かんな…)
1. 環境構築
1.1. Anaconda のインストール
まずは、開発の環境を整えましょう。
<「機械学習するワケじゃないし、生のままでいいんじゃね?」
とんでもない!!
pyinstallerの記事でよく見かけるのが、「余計なライブラリが入って、exeファイルが大きくなる」という問題です。
開発中に新しいライブラリを試してみたりするのは、よくある事です。
ただ、そのままexeを作ってしまうと、exeの中にいらないライブラリが入ってしまう可能性も高くなります。
開発環境とリリース環境はきっちり分けましょう。
Anaconda公式から、インストーラーをDLします。
今から作られるのでしたら、3系でいかれた方がいいでしょう。
DLしたら、後は手順に沿ってインストールしてください。
1.2. 開発環境の構築
まずは、開発用の仮想環境を構築しましょう。
conda create -n pytest_env pip python=3.7
開発用環境が作れたら、activate
しましょう。
ついでに、ソースコードの管理ディレクトリも作ってしまいましょう。
1.3. リリース環境の構築
同様に、リリース環境も作成します、
conda create -n pytest_release pip python=3.7
1.4. ライブラリインストール
管理ディレクトリまで出来たらそちらに移動して、必要なライブラリをインストールしていきましょう。
ここでコツがひとつ。
pyinstaller
は、リリース環境にのみインストールする
pyinstaller
をリリース環境にのみインストールする事で、開発環境でうっかりリリースしてしまうミスを防ぎます。
せっかくなので、numpyを導入してみましょう。
開発環境用インストールコマンド
pip install numpy wxPython
リリース環境用インストールコマンド
pip install numpy wxPython pypiwin32 pyinstaller
pypiwin32
は pyinstaller
をWindowsで動かす際に必要なライブラリのようです。
GUIは、WxFormBuilderを使うと作りやすいです。
ただ、自動命名規約が分かりづらいとか部品の再利用などは出来ないとか、実運用アプリを作成するにはちょっと物足りないところもありますので、ある程度形になったところで出力してしまって、後は自力がオススメです。
参考:PythonでGUI(WxFormBuilder)(mm_sys様)
https://qiita.com/mm_sys/items/716cb159ea8c9e634300
この後は逆引き形式で、進めていきますね。
気になるセクションをご覧ください。
3. pythonでexe 逆引きTIPS集
3.1. GUIスレッドを動かしつつ、ロジックスレッドを動かす方法 ~中断機能を添えて
この記事に興味を持ってくださってる方の大半が気になってる事かと思いますw
私も正解があるなら是非知りたい。
ので、色々すっ飛ばして先頭に持ってきました。
- GUIスレッドはそのままで、長時間動作するロジックスレッドを別起動する
- ロジックスレッドの中で並列処理(multiprocess)も実行できる
- ロジックスレッドはプロセスを増やさない
- GUIスレッドを終了したら、ロジックスレッドも終了する
- 中断ボタンでロジックスレッドを終了する
- ロジックスレッド用ボタンはシングルクリックのみ有効(ダブルクリック無効化)
上記要件を満たすコードが下記となります。
# -*- coding: utf-8 -*-
#
import wx
import sys
import argparse
import numpy as np
import multiprocessing
from pathlib import Path
from form.MainFrame import MainFrame
from utils.MLogger import MLogger
VERSION_NAME = "ver1.00"
# 指数表記なし、有効小数点桁数6、30を超えると省略あり、一行の文字数200
np.set_printoptions(suppress=True, precision=6, threshold=30, linewidth=200)
# Windowsマルチプロセス対策
multiprocessing.freeze_support()
if __name__ == '__main__':
# 引数解釈
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", default=20, type=int)
args = parser.parse_args()
# ロガー初期化
MLogger.initialize(level=args.verbose, is_file=False)
# GUI起動
app = wx.App(False)
frame = MainFrame(None, VERSION_NAME, args.verbose)
frame.Show(True)
app.MainLoop()
まずは呼び出し口の executor.py
。
ここからGUIを起動します。
# -*- coding: utf-8 -*-
#
from time import sleep
from worker.LongLogicWorker import LongLogicWorker
from form.ConsoleCtrl import ConsoleCtrl
from utils.MLogger import MLogger
import os
import sys
import wx
import wx.lib.newevent
logger = MLogger(__name__)
TIMER_ID = wx.NewId()
(LongThreadEvent, EVT_LONG_THREAD) = wx.lib.newevent.NewEvent()
# メインGUI
class MainFrame(wx.Frame):
def __init__(self, parent, version_name: str, logging_level: int):
self.version_name = version_name
self.logging_level = logging_level
self.elapsed_time = 0
self.worker = None
# 初期化
wx.Frame.__init__(self, parent, id=wx.ID_ANY, title=u"c01 Long Logic {0}".format(self.version_name), \
pos=wx.DefaultPosition, size=wx.Size(600, 650), style=wx.DEFAULT_FRAME_STYLE)
self.sizer = wx.BoxSizer(wx.VERTICAL)
# 処理回数
self.loop_cnt_ctrl = wx.SpinCtrl(self, id=wx.ID_ANY, size=wx.Size(100, -1), value="2", min=1, max=999, initial=2)
self.loop_cnt_ctrl.SetToolTip(u"処理回数")
self.sizer.Add(self.loop_cnt_ctrl, 0, wx.ALL, 5)
# 並列処理有無チェックボックス
self.multi_process_ctrl = wx.CheckBox(self, id=wx.ID_ANY, label="並列処理を実行したい場合、チェックを入れて下さい")
self.sizer.Add(self.multi_process_ctrl, 0, wx.ALL, 5)
# ボタン用Sizer
self.btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
# 実行ボタン
self.exec_btn_ctrl = wx.Button(self, wx.ID_ANY, u"長いロジック処理開始", wx.DefaultPosition, wx.Size(200, 50), 0)
# マウス左クリックイベントとのバインド 【Point.01】
self.exec_btn_ctrl.Bind(wx.EVT_LEFT_DOWN, self.on_exec_click)
# マウス左ダブルクリックイベントとのバインド 【Point.03】
self.exec_btn_ctrl.Bind(wx.EVT_LEFT_DCLICK, self.on_doubleclick)
self.btn_sizer.Add(self.exec_btn_ctrl, 0, wx.ALIGN_CENTER, 5)
# 中断ボタン
self.kill_btn_ctrl = wx.Button(self, wx.ID_ANY, u"長いロジック処理中断", wx.DefaultPosition, wx.Size(200, 50), 0)
# マウス左クリックイベントとのバインド
self.kill_btn_ctrl.Bind(wx.EVT_LEFT_DOWN, self.on_kill_click)
# マウス左ダブルクリックイベントとのバインド
self.kill_btn_ctrl.Bind(wx.EVT_LEFT_DCLICK, self.on_doubleclick)
# 初期状態は非活性
self.kill_btn_ctrl.Disable()
self.btn_sizer.Add(self.kill_btn_ctrl, 0, wx.ALIGN_CENTER, 5)
self.sizer.Add(self.btn_sizer, 0, wx.ALIGN_CENTER | wx.SHAPED, 0)
# コンソール【Point.06】
self.console_ctrl = ConsoleCtrl(self)
self.sizer.Add(self.console_ctrl, 1, wx.ALL | wx.EXPAND, 5)
# print出力先はコンソール【Point.05】
sys.stdout = self.console_ctrl
# 進捗ゲージ
self.gauge_ctrl = wx.Gauge(self, wx.ID_ANY, 100, wx.DefaultPosition, wx.DefaultSize, wx.GA_HORIZONTAL)
self.gauge_ctrl.SetValue(0)
self.sizer.Add(self.gauge_ctrl, 0, wx.ALL | wx.EXPAND, 5)
# イベントバインド【Point.05】
self.Bind(EVT_LONG_THREAD, self.on_exec_result)
self.SetSizer(self.sizer)
self.Layout()
# 画面中央に表示する
self.Centre(wx.BOTH)
# ダブルクリック無効化処理
def on_doubleclick(self, event: wx.Event):
self.timer.Stop()
logger.warning("ダブルクリックされました。", decoration=MLogger.DECORATION_BOX)
event.Skip(False)
return False
# 実行1クリックした時の処理
def on_exec_click(self, event: wx.Event):
# タイマーで若干遅らせて起動する(ダブルクリックとのバッティング回避)【Point.04】
self.timer = wx.Timer(self, TIMER_ID)
self.timer.StartOnce(200)
self.Bind(wx.EVT_TIMER, self.on_exec, id=TIMER_ID)
# 中断1クリックした時の処理
def on_kill_click(self, event: wx.Event):
self.timer = wx.Timer(self, TIMER_ID)
self.timer.StartOnce(200)
self.Bind(wx.EVT_TIMER, self.on_kill, id=TIMER_ID)
# 処理実行
def on_exec(self, event: wx.Event):
self.timer.Stop()
if not self.worker:
# コンソールクリア
self.console_ctrl.Clear()
# 実行ボタン無効化
self.exec_btn_ctrl.Disable()
# 中断ボタン有効化
self.kill_btn_ctrl.Enable()
# 別スレッドで実行【Point.09】
self.worker = LongLogicWorker(self, LongThreadEvent, self.loop_cnt_ctrl.GetValue(), self.multi_process_ctrl.GetValue())
self.worker.start()
event.Skip(False)
# 中断処理実行
def on_kill(self, event: wx.Event):
self.timer.Stop()
if self.worker:
# 停止状態でボタン押下時、停止
self.worker.stop()
logger.warning("長いロジック処理を中断します。", decoration=MLogger.DECORATION_BOX)
# ワーカー終了
self.worker = None
# 実行ボタン有効化
self.exec_btn_ctrl.Enable()
# 中断ボタン無効化
self.kill_btn_ctrl.Disable()
# プログレス非表示
self.gauge_ctrl.SetValue(0)
event.Skip(False)
# 長いロジックが終わった後の処理
def on_exec_result(self, event: wx.Event):
# 【Point.12】ロジック終了が明示的に分かるようにする
self.sound_finish()
# 実行ボタン有効化
self.exec_btn_ctrl.Enable()
# 中断ボタン無効化
self.kill_btn_ctrl.Disable()
if not event.result:
event.Skip(False)
return False
self.elapsed_time += event.elapsed_time
logger.info("\n処理時間: %s", self.show_worked_time())
# ワーカー終了
self.worker = None
# プログレス非表示
self.gauge_ctrl.SetValue(0)
def sound_finish(self):
# 終了音を鳴らす
if os.name == "nt":
# Windows
try:
import winsound
winsound.PlaySound("SystemAsterisk", winsound.SND_ALIAS)
except Exception:
pass
def show_worked_time(self):
# 経過秒数を時分秒に変換
td_m, td_s = divmod(self.elapsed_time, 60)
if td_m == 0:
worked_time = "{0:02d}秒".format(int(td_s))
else:
worked_time = "{0:02d}分{1:02d}秒".format(int(td_m), int(td_s))
return worked_time
Point.01:マウス左クリックイベントと実行メソッドをバインド
何はともあれ、まずはボタン上でのマウス左クリックイベントと、その実行対象メソッドをバインドしましょう。
バインドの方法はいくつかありますが、個人的にはGUIパーツとイベントのバインドならば、パーツ.Bind(イベント種別, 発火メソッド)
という書き方が分かりやすくて好きです。
Point.02:左クリックイベントを、タイマーで若干遅らせて起動する
左クリックイベントを拾ってそのまま実行してしまうと、ダブルクリックイベントと同時発火してしまい、結果的にダブルクリックイベントが走ってしまいます。
(処理的には、ダブルクリックイベントはシングルクリックイベントの一部分なので、2つのイベントが同時発火します)
そのため、シングルクリックから発火された on_exec_click
、 on_kill_click
内でタイマーを設置して、若干遅らせて実行させることで、ダブルクリックイベントとバインドさせた実行メソッドが先に実行されるようにします。
Point.03:マウス左ダブルクリックイベントと実行メソッドをバインド
Point①と同じ手順で、左ダブルクリックイベントをバインドします。
ここで、ダブルクリックを拾う事で、二重処理を防ぐ事ができます。
Point.04:マウス左ダブルクリック実行メソッドでタイマーイベントを止める
ダブルクリックイベントでは、Point②で実行されたタイマーイベントを止めてしまいます。
これで、ダブルクリック無効化ができます。
シングルクリックイベントのみ発火時
… 若干遅れて対応イベントが実行される
ダブルクリックイベント発火時
… タイマーを止めてしまうので、シングルクリックイベントが実行されない
Point.05:print
の出力先をコンソールコントロールにする
print
は、sys.stdout.write
のラッパーなので、出力先をコンソールコントロールにしてしまえば、print
の出力先がコントロール内になります。
Point.06:コンソールコントロールをサブクラスで定義する
で、そのコンソールコントロールって何よ、って話ですが、wx.TextCtrl
のサブクラスです。
# -*- coding: utf-8 -*-
#
import wx
from utils.MLogger import MLogger # noqa
logger = MLogger(__name__)
class ConsoleCtrl(wx.TextCtrl):
def __init__(self, parent):
# 複数行可、読み取り専用、枠線ナシ、縦スクロールあり、横スクロールあり、キーイベント取得あり
super().__init__(parent, wx.ID_ANY, wx.EmptyString, wx.DefaultPosition, wx.Size(-1, -1), \
wx.TE_MULTILINE | wx.TE_READONLY | wx.BORDER_NONE | wx.HSCROLL | wx.VSCROLL | wx.WANTS_CHARS)
self.SetBackgroundColour(wx.SystemSettings.GetColour(wx.SYS_COLOUR_3DLIGHT))
# キーボードイベントバインド
self.Bind(wx.EVT_CHAR, lambda event: self.on_select_all(event, self.console_ctrl))
# コンソール部分の出力処理【Point.07】
def write(self, text):
try:
wx.CallAfter(self.AppendText, text)
except: # noqa
pass
# コンソール部分の全選択処理【Point.08】
def on_select_all(event, target_ctrl):
keyInput = event.GetKeyCode()
if keyInput == 1: # 1 stands for 'ctrl+a'
target_ctrl.SelectAll()
event.Skip()
Point.07:write
メソッド内で追記処理を行う
GUIスレッドとは異なるロジックスレッドから呼び出す事を想定して、CallAfter
で AppendText
メソッドを呼び出します。
これで、ロジックスレッドからの print
出力も安定して行えます。
Point.08:コンソールコントロール内の全選択イベント用メソッドを追加する
文字が出ていたら、それをコピーしたくなるのが人のサガ。
というわけで、全選択イベント(キーボードイベントの組み合わせ)で全選択処理を実行します。
Point.09:ロジックスレッドを別スレッドで実行する
やっと本題に入ってきました。
LongLogicWorker
内で、ロジック処理を実行します。
参考元:https://doloopwhile.hatenablog.com/entry/20090627/1275175850
# -*- coding: utf-8 -*-
#
import os
import wx
import time
from worker.BaseWorker import BaseWorker, task_takes_time
from service.MOptions import MOptions
from service.LongLogicService import LongLogicService
class LongLogicWorker(BaseWorker):
def __init__(self, frame: wx.Frame, result_event: wx.Event, loop_cnt: int, is_multi_process: bool):
# 処理回数
self.loop_cnt = loop_cnt
# マルチプロセスで実行するか
self.is_multi_process = is_multi_process
super().__init__(frame, result_event)
@task_takes_time
def thread_event(self):
start = time.time()
# パラとかオプションとか詰め込み
# max_workersの最大値は、Python3.8のデフォルト値に基づく
options = MOptions(self.frame.version_name, self.frame.logging_level, self.loop_cnt, max_workers=(1 if not self.is_multi_process else min(32, os.cpu_count() + 4)))
# ロジックサービス実行
LongLogicService(options).execute()
# 経過時間
self.elapsed_time = time.time() - start
def post_event(self):
# ロジック処理が終わった後のイベントを呼び出して実行する【Point.11】
wx.PostEvent(self.frame, self.result_event(result=self.result and not self.is_killed, elapsed_time=self.elapsed_time))
LongLogicWorker
は、BaseWorker
を継承しています。
# -*- coding: utf-8 -*-
#
import wx
import wx.xrc
from abc import ABCMeta, abstractmethod
from threading import Thread
from functools import wraps
import time
import threading
from utils.MLogger import MLogger # noqa
logger = MLogger(__name__)
# https://wiki.wxpython.org/LongRunningTasks
# https://teratail.com/questions/158458
# http://nobunaga.hatenablog.jp/entry/2016/06/03/204450
class BaseWorker(metaclass=ABCMeta):
"""Worker Thread Class."""
def __init__(self, frame, result_event):
"""Init Worker Thread Class."""
# 親GUI
self.frame = frame
# 経過時間
self.elapsed_time = 0
# スレッドが終わった後の呼び出しイベント
self.result_event = result_event
# 進捗ゲージ
self.gauge_ctrl = frame.gauge_ctrl
# 処理成功可否
self.result = True
# 停止命令有無
self.is_killed = False
# スレッド開始
def start(self):
self.run()
# スレッド停止
def stop(self):
# 中断FLGをONにする
self.is_killed = True
def run(self):
# スレッド実行
self.thread_event()
# 後処理実行
self.post_event()
def post_event(self):
wx.PostEvent(self.frame, self.result_event(result=self.result))
@abstractmethod
def thread_event(self):
pass
# https://doloopwhile.hatenablog.com/entry/20090627/1275175850
class SimpleThread(Thread):
""" 呼び出し可能オブジェクト(関数など)を実行するだけのスレッド """
def __init__(self, base_worker, acallable):
# 別スレッド内の処理
self.base_worker = base_worker
# 関数デコレータ内で動かすメソッド
self.acallable = acallable
# 関数デコレータの結果
self._result = None
# 中断FLG=OFFの状態で初期化する
super(SimpleThread, self).__init__(name="simple_thread", kwargs={"is_killed": False})
def run(self):
self._result = self.acallable(self.base_worker)
def result(self):
return self._result
def task_takes_time(acallable):
"""
関数デコレータ【Point.10】
acallable本来の処理は別スレッドで実行しながら、
ウィンドウを更新するwx.YieldIfNeededを呼び出し続けるようにする
"""
@wraps(acallable)
def f(base_worker):
t = SimpleThread(base_worker, acallable)
# デーモンで、親が死んだら子も殺す
t.daemon = True
t.start()
# スレッドが生きている間中、ウィンドウ描画を更新し続ける
while t.is_alive():
# 進捗ゲージをクルクル回す
base_worker.gauge_ctrl.Pulse()
# 必要ならばウィンドウを更新する
wx.YieldIfNeeded()
# 少しだけ待機
time.sleep(0.01)
if base_worker.is_killed:
# 【Point.23】呼び出し元から停止命令が出ている場合、自分(GUI)以外の全部のスレッドに終了命令
for th in threading.enumerate():
if th.ident != threading.current_thread().ident and "_kwargs" in dir(th):
th._kwargs["is_killed"] = True
break
return t.result()
return f
Point.10:関数デコレータで、GUIスレッドと別スレッドを動かす
これはもう参考元のサイト様からいただいてきたネタなのですが、関数デコレータで別スレッドを動かしつつ、GUIスレッドの描画を更新し続けます。
SimpleThread
の中で、acallable
を実行します。
この時に、BaseWorker
を保持しているのは、中断フラグを受渡すためです。
SimpleThread
が生きている間は、GUIスレッドは描画の更新と、中断の受付のみを行います。
(この辺は後ほど図示します)
Point.11:ロジック処理が終わった後のイベントを呼び出して実行する
あらかじめ、workerを初期化した際に、処理後の呼び出しイベントを渡していたので、それを wx.PostEvent
で実行します。
これでGUI内の処理に戻ってきます。
Point.12:ロジック終了が明示的に分かるようにする
ロジックが終わるまで、ずっとPCに貼り付いてらっしゃるユーザーさんばかりではないので、終わった時に分かりやすいよう、サイジングでは、INFO音を鳴らすようにしています。
Windowsの環境やLinux等他環境によってはエラーになってしまうので、try-exceptで鳴らせる場合のみ鳴らす、という風にした方がいいっぽいです。
ついでに経過時間を出してあげると、どのくらいかかったかの体感が数値化されるので、納得してもらいやすい気がします。
# -*- coding: utf-8 -*-
#
import logging
from time import sleep
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from service.MOptions import MOptions
from utils.MException import MLogicException, MKilledException
from utils.MLogger import MLogger # noqa
logger = MLogger(__name__)
class LongLogicService():
def __init__(self, options: MOptions):
self.options = options
def execute(self):
logging.basicConfig(level=self.options.logging_level, format="%(message)s [%(module_name)s]")
# 【Point.13】全体をtry-exceptで囲み、エラー内容を出力する
try:
# 普通にロジックを載せてもOK
self.execute_inner(-1)
logger.info("--------------")
# 並列タスクで分散させてもOK
futures = []
# 【Point.14】並列タスクには名前を付けておく
with ThreadPoolExecutor(thread_name_prefix="long_logic", max_workers=self.options.max_workers) as executor:
for n in range(self.options.loop_cnt):
futures.append(executor.submit(self.execute_inner, n))
#【Point.15】並列タスクは、一括発行後、終了を待つ
concurrent.futures.wait(futures, timeout=None, return_when=concurrent.futures.FIRST_EXCEPTION)
for f in futures:
if not f.result():
return False
logger.info("長いロジック処理終了", decoration=MLogger.DECORATION_BOX, title="ロジック終了")
return True
except MKilledException:
# 中断オプションによる終了の場合、そのまま結果だけ返す
return False
except MLogicException as se:
# データ不備エラー
logger.error("処理が処理できないデータで終了しました。\n\n%s", se.message, decoration=MLogger.DECORATION_BOX)
return False
except Exception as e:
# それ以外のエラー
logger.critical("処理が意図せぬエラーで終了しました。", e, decoration=MLogger.DECORATION_BOX)
return False
finally:
logging.shutdown()
def execute_inner(self, n: int):
for m in range(5):
logger.info("n: %s - m: %s", n, m)
sleep(1)
return True
Point.13:ロジックスレッドの中では、全体をtry-exceptで囲む
スレッドを分けているので、エラーをちゃんとexceptしないと、何だか分からないけどいきなり終わった、という事になりかねません。
後で追いかけるのも大変ですから、exceptしてログに出しておきましょう。
Point.14:並列タスクには名前を付けておく
並列タスクを実行する際には、どの処理が問題になっているのかが分かりやすいよう、prefixを付けておくとデバッグしやすいです。
Point.15:並列タスクは、一括発行後、終了を待つ
並列タスクは executor.submit
でまず発行した後、concurrent.futures.wait
で全処理が終了するまで待ちます。
その際に、何かしらExceptionが発生した時には処理を中断させるよう、concurrent.futures.FIRST_EXCEPTION
オプションを付けています。
# -*- coding: utf-8 -*-
#
from datetime import datetime
import logging
import traceback
import threading
from utils.MException import MKilledException
# 【Point.16】ロガーを自前実装する
class MLogger():
DECORATION_IN_BOX = "in_box"
DECORATION_BOX = "box"
DECORATION_LINE = "line"
DEFAULT_FORMAT = "%(message)s [%(funcName)s][P-%(process)s](%(asctime)s)"
DEBUG_FULL = 2
TEST = 5
TIMER = 12
FULL = 15
INFO_DEBUG = 22
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
total_level = logging.INFO
is_file = False
outout_datetime = ""
logger = None
# 初期化
# 【Point.17】モジュール単位の出力最低レベルを定義出来るようにしておく
def __init__(self, module_name, level=logging.INFO):
self.module_name = module_name
self.default_level = level
# ロガー
self.logger = logging.getLogger("PyLogicSample").getChild(self.module_name)
# 標準出力ハンドラ
sh = logging.StreamHandler()
sh.setLevel(level)
self.logger.addHandler(sh)
# 【Point.18】debugよりレベルの低いログメソッドを用意する
def test(self, msg, *args, **kwargs):
if not kwargs:
kwargs = {}
kwargs["level"] = self.TEST
kwargs["time"] = True
self.print_logger(msg, *args, **kwargs)
def debug(self, msg, *args, **kwargs):
if not kwargs:
kwargs = {}
kwargs["level"] = logging.DEBUG
kwargs["time"] = True
self.print_logger(msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
if not kwargs:
kwargs = {}
kwargs["level"] = logging.INFO
self.print_logger(msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
if not kwargs:
kwargs = {}
kwargs["level"] = logging.WARNING
self.print_logger(msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
if not kwargs:
kwargs = {}
kwargs["level"] = logging.ERROR
self.print_logger(msg, *args, **kwargs)
def critical(self, msg, *args, **kwargs):
if not kwargs:
kwargs = {}
kwargs["level"] = logging.CRITICAL
self.print_logger(msg, *args, **kwargs)
# 実際に出力する実態
def print_logger(self, msg, *args, **kwargs):
#【Point.22】現在実行しているスレッドに、中断FLG=ONが設定されていたら中断エラーとする
if "is_killed" in threading.current_thread()._kwargs and threading.current_thread()._kwargs["is_killed"]:
# 停止命令が出ている場合、エラー
raise MKilledException()
target_level = kwargs.pop("level", logging.INFO)
# アプリ単位およびモジュール単位のログレベルを両方満たしている場合のみ出力
if self.total_level <= target_level and self.default_level <= target_level:
if self.is_file:
for f in self.logger.handlers:
if isinstance(f, logging.FileHandler):
# 既存のファイルハンドラはすべて削除
self.logger.removeHandler(f)
# ファイル出力ありの場合、ハンドラ紐付け
# ファイル出力ハンドラ
fh = logging.FileHandler("log/PyLogic_{0}.log".format(self.outout_datetime))
fh.setLevel(self.default_level)
fh.setFormatter(logging.Formatter(self.DEFAULT_FORMAT))
self.logger.addHandler(fh)
# モジュール名を出力するよう追加
extra_args = {}
extra_args["module_name"] = self.module_name
# ログレコード生成
if args and isinstance(args[0], Exception):
# 【Point.19】Exceptionを受け取った時には、スタックトレースを出力する
log_record = self.logger.makeRecord('name', target_level, "(unknown file)", 0, "{0}\n\n{1}".format(msg, traceback.format_exc()), None, None, self.module_name)
else:
log_record = self.logger.makeRecord('name', target_level, "(unknown file)", 0, msg, args, None, self.module_name)
target_decoration = kwargs.pop("decoration", None)
title = kwargs.pop("title", None)
print_msg = "{message}".format(message=log_record.getMessage())
# 【Point.20】ログメッセージの装飾はパラメーターで行う
if target_decoration:
if target_decoration == MLogger.DECORATION_BOX:
output_msg = self.create_box_message(print_msg, target_level, title)
elif target_decoration == MLogger.DECORATION_LINE:
output_msg = self.create_line_message(print_msg, target_level, title)
elif target_decoration == MLogger.DECORATION_IN_BOX:
output_msg = self.create_in_box_message(print_msg, target_level, title)
else:
output_msg = self.create_simple_message(print_msg, target_level, title)
else:
output_msg = self.create_simple_message(print_msg, target_level, title)
# 出力
try:
if self.is_file:
# ファイル出力ありの場合はレコードを再生成してコンソールとGUI両方出力
log_record = self.logger.makeRecord('name', target_level, "(unknown file)", 0, output_msg, None, None, self.module_name)
self.logger.handle(log_record)
else:
# 【Point.21】ロジックスレッドは、printとloggerで分けて出力
print(output_msg)
self.logger.handle(log_record)
except Exception as e:
raise e
def create_box_message(self, msg, level, title=None):
msg_block = []
msg_block.append("■■■■■■■■■■■■■■■■■")
if level == logging.CRITICAL:
msg_block.append("■ **CRITICAL** ")
if level == logging.ERROR:
msg_block.append("■ **ERROR** ")
if level == logging.WARNING:
msg_block.append("■ **WARNING** ")
if level <= logging.INFO and title:
msg_block.append("■ **{0}** ".format(title))
for msg_line in msg.split("\n"):
msg_block.append("■ {0}".format(msg_line))
msg_block.append("■■■■■■■■■■■■■■■■■")
return "\n".join(msg_block)
def create_line_message(self, msg, level, title=None):
msg_block = []
for msg_line in msg.split("\n"):
msg_block.append("■■ {0} --------------------".format(msg_line))
return "\n".join(msg_block)
def create_in_box_message(self, msg, level, title=None):
msg_block = []
for msg_line in msg.split("\n"):
msg_block.append("■ {0}".format(msg_line))
return "\n".join(msg_block)
def create_simple_message(self, msg, level, title=None):
msg_block = []
for msg_line in msg.split("\n"):
# msg_block.append("[{0}] {1}".format(logging.getLevelName(level)[0], msg_line))
msg_block.append(msg_line)
return "\n".join(msg_block)
@classmethod
def initialize(cls, level=logging.INFO, is_file=False):
# logging.basicConfig(level=level)
logging.basicConfig(level=level, format=cls.DEFAULT_FORMAT)
cls.total_level = level
cls.is_file = is_file
cls.outout_datetime = "{0:%Y%m%d_%H%M%S}".format(datetime.now())
Point.16:ロガーを自前実装する
多分、一番自分的に工夫があるのはロガー部分だと思います。
ログは一定のフォーマットに従って出力した方が、ユーザーにとっても分かりやすいのですが、それをひとつひとつ定義していくのはとても面倒です。
メッセージの囲みや罫線などのデコレーションをフラグひとつで行えるようにしています。
そして何より中断フラグの判定をロガーで行っています。(詳しくは後述)
Point.17:モジュール単位の出力最低レベルを定義出来るようにしておく
モジュール単位で出力最低レベルを定義しておけば、特にユーティリティメソッドのデバッグログを抑制することができます。
デバッグログを物理的に消してしまったり、コメントアウトするのは問題があったときに、確認に手間取ります。
モジュール単位最低レベルを上げ下げする事で、出力されるログレベルをコントロールできるので、デバッグのしやすさに繋がると思います。
Point.18:debugよりレベルの低いログメソッドを用意する
17とペアなのですが、レベルの低いメソッドを用意する事で、出力の抑制がよりしやすくなります。
Point.19:Exceptionを受け取った時には、スタックトレースを出力する
主にハンドリングしていない例外が発生した場合に役に立ちます。
また、ロガーは、GUIスレッドとロジックスレッドの両方で扱ってますので、ロジックスレッドの源流で出力を調整する、といった手間も不要です。
Point.20:ログメッセージの装飾はパラメーターで行う
コンソールコントロールを普通のテキストコントロールにしたため、分かりやすいようにメッセージのブロッキングを多用しています。
メッセージ量が可変であるため、一概に固定文字列を付与するわけにもいかなかったので、与えられたパラメーター別にブロッキングを呼び出しています。
呼び出し元でメソッドを指定する方法もあるかとは思いますが、意味合い的にもこちらの方が管理しやすいかな、と思います。
テキストの装飾を行う場合も、呼び出し元でバラバラにするよりは、このような形で一箇所で扱った方がコード量は少ないでしょう。
Point.21:出力処理時に、print
と logger.handle
を別々に実行する
コンソールコントロールに出力するためには、print
での出力が必要ですし、ストリームに出力するには、logger.handle
からの出力が必要です。
メッセージ自体はどちらも同じ情報を出力しており、ストリームへの出力には出力モジュールや出力時間等の情報を追加し、より追いかけやすくしています。
Point.22:現在実行しているスレッドに、中断FLG=ONが設定されていたら中断エラーとする
ここが一番悩んだところでした…
Pythonのスレッド的お作法として、以下のようなものがあります。
- 外側からスレッドを強制終了させない
- 中断パラメーターを保持し、内部で処理するごとにパラメーターを参照する
- 中断パラメーターがONになっている場合、内部から終了する
参考:https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread
内部で中断パラメーターを見るといっても、各ロジック処理ごとに見るのは嫌だし、パラメーターを持ち回すのも嫌だなぁ…
で、どのロジックも必ず通るのが、ロガーじゃない?で、ロガーで見れて、かつGUIスレッドから変更できるとしたら「今のスレッド」くらいかなぁ…
と言うことで、こんな形になりました。
よい方法かどうかは分からないのですが、ロジック処理の方では中断の有無を一切関知しない、という点で気に入ってます。
Point.23:GUI側で中断を指示された場合、スレッドの中断FLGをGUI以外すべてONにする
中断FLGを設定しているのは、BaseWorker
の関数デコレータ内です。
GUIスレッド以外の生きている全スレッドの中断FLGをONにすることで、どのスレッドを見ても中断が分かるようにしています。
これで、ログを出力しようとした段階で、エラーとなってGUIまで戻ってくる、というワケです。
正常終了ルート
中断終了ルート
3.2. VSCode からデバッグ実行する
せっかく環境をつくったので、VSCodeから実行できるようにもしてみましょう。
ワークスペースの Python Path
欄に、Anaconda > envs > 開発環境 > python.exe
のフルパスを指定します。
launch
で、exeの実行を指定します。
{
"folders": [
{
"path": "src"
}
],
"settings": {
"python.pythonPath": "C:\\Development\\Anaconda3\\envs\\pytest_env\\python.exe"
},
"launch": {
"version": "0.2.0",
"configurations": [
{
"name": "Python: debug",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/executor.py",
"console": "integratedTerminal",
"pythonPath": "${command:python.interpreterPath}",
"stopOnEntry": false,
"args": [
// "--verbose", "1", // 最小
// "--verbose", "2", // DEBUG_FULL
// "--verbose", "15", // FULL
"--verbose", "10", // TEST
// "--verbose", "20", // INFO
]
}
]
}
}
これでVSCodeからGUIを起動することができるようになりました。
3.3. exeを作成する
色々コードを組んできましたが、最終的にexeにしなくてはなりませんね。
というわけで、PythonExeを作成するバッチと設定ファイルをご紹介。
@echo off
rem ---
rem --- exeを生成
rem ---
rem --- カレントディレクトリを実行先に変更
cd /d %~dp0
cls
rem --- リリース環境に切り替えてから、pyinstaller実行
rem --- 終わったら開発環境に戻す
activate pytest_release && pyinstaller --clean pytest64.spec && activate pytest_env
# -*- coding: utf-8 -*-
# -*- mode: python -*-
# PythonExeサンプル 64bit版
block_cipher = None
a = Analysis(['src\\executor.py'],
pathex=[],
binaries=[],
datas=[],
# 隠蔽されたライブラリインポート
hiddenimports=['wx._adv', 'wx._html', 'pkg_resources.py2_warn'],
hookspath=[],
runtime_hooks=[],
# 除外するライブラリ
excludes=['mkl','libopenblas'],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
# アプリ名
name='PythonExeSample.exe',
# exeにする際のデバッグログ表示有無
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
# コンソールの表示有無
console=False )
Exeに固めたい場合には、バッチを実行するだけで、リリース環境に切り替えてから pyinstaller
を実行し、終わったら開発環境に戻します。
これでうっかりリリース環境に余計なライブラリを入れる心配もありません。
specファイルは、pyinstaller
の設定ファイルですが、コメント行が追加している設定となります。
hiddenimports
pyinstaller
は基本的にはコード内部で呼ばれているライブラリを自動的に同梱しますが、一部そのままでは入らないライブラリがあります。
それを明示的にインポートするのが、hiddenimports
です。
これを探すには、下部の debug=False
を True
に変えて、エラーとなっている箇所を探す、というとても地味な作業をするしかない…と思います。
excludes
逆に、一緒に同梱してしまうと、ファイルサイズが大きくなる等で除外したい場合は、 excludes
で指定します。
今回の場合、https://www.reddit.com/r/pygame/comments/aelypb/why_is_my_pyinstaller_executable_180_mb_large/ を参考にmkl
とlibopenblas
を除外しています。
出来上がったexeは約30Mです。(除外してもこのサイズ…
3.4. 追記したい内容
- アイコンについて
- 外部記憶(json)について
気力が持ち直したら追記するかもです。
VMDサイジングでここはどうやってるの?的な質問も歓迎します。
4. ソースコード
上記のコードは全て、https://github.com/miu200521358/PythonExeSample にあがっています。
興味を持っていただけましたら、ぜひForkして中身をご覧になってください。