0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

QtPy で重たい関数を動かしたときプログレスバーを表示する (任意の関数で実装)

Last updated at Posted at 2023-10-27

アプリケーション概要

  1. QtPy を用いて GUI アプリを作成します。

  2. GUI 上で「重たい関数」を実行すると GUI がフリーズする問題があるため,
    「重たい関数」は別スレッドで動かします。

  3. 関数実行中に GUI が沈黙したままですと Bad UX なので,
    プログレスバーを表示して 0-100% まで適宜インクリメントします。

  4. 関数は任意の引数を設定でき, 任意の戻り値を取得できるようにします。

  5. 関数は GUI と独立しており途中経過を関数から GUI に進捗を伝えることができないとします (関数から「今 x %」のような Signal を送るのではなく, ただ実行されるだけです)。
    そのため, GUI上であらかじめ予測終了時刻を設定し, 適切な経過時間でプログレスバーを更新します。

  6. 関数があらかじめ設定された終了時間になっても終わっていない場合, プログレスバー 99% を維持します。
    一方, 予測終了時間より早く終わった場合はそこでプログレスバーの表示も終了します。

  7. 予測終了時間は直近 3 回の実行時間の平均とします。
    この予測時間は関数とその引数で識別され計算されます。

上のような条件を満たしたクラスを作成します。

終了時刻計算

関数の終了時間は直近 3 回分を保持し, その平均を算出します。
また時間は key (関数名と引数で決定される文字列) ごとに管理されます。

これを Queue を値とする辞書で管理しています。

  • 保持する数を QUEUE_LEN で設定できます。

  • _set_time関数はQueueに値を追記します。
    Queue が QUEUE_LEN より多くなるとき, 値をひとつ pop out します。

  • update_time で値を追加します。

  • init_time では新しい key を追加します。

  • get_time では Queue 内の平均値を取得します。

このクラスはグローバル変数で宣言します (QUEUE を保持するため)。

class PredictionTime:
    """
    A class to keep track of prediction time for different functions.
    """
    QUEUE_LEN = 3

    def __init__(self, dict_: Optional[dict[str, deque]] = None):
        """
        Initialize the PredictionTime instance.

        Parameters
        ----------
        dict_ : Optional[dict[str, deque]], optional
            A dictionary to initialize the times, by default None
        """
        if dict_ is None:
            self.times = {}
        else:
            self.times = dict_

    def _set_time(self, key: str, end_time: float):
        """
        Set the time for a given key.

        Parameters
        ----------
        key : str
            The key identifier for the function.
        end_time : float
            The end time for the function execution.
        """
        if key in self.times:
            if len(self.times[key]) >= self.QUEUE_LEN:
                self.times[key].popleft()
            self.times[key].append(end_time)
        else:
            self.times[key] = deque([end_time])

    def update_time(self, key: str, end_time: float):
        """
        Update the time for a given key.

        Parameters
        ----------
        key : str
            The key identifier for the function.
        end_time : float
            The end time for the function execution.
        """
        self._set_time(key=key, end_time=end_time)

    def init_time(self, key: str, end_time: float):
        """
        Initialize the time for a given key if it doesn't exist.

        Parameters
        ----------
        key : str
            The key identifier for the function.
        end_time : float
            The end time for the function execution.
        """
        if key not in self.times:
            self._set_time(key=key, end_time=end_time)

    def get_time(self, key: str) -> float:
        """
        Get the average time for a given key.

        Parameters
        ----------
        key : str
            The key identifier for the function.

        Returns
        -------
        float
            The average time for the function execution.
        """
        time_queue = self.times.get(key, [])
        return sum(time_queue) / len(time_queue) if time_queue else 0


prediction_time = PredictionTime()

終了時刻に対しパーセンテージを設定するタイマー

関数とGUIが独立しているため, 終了時刻に対しプログレスバーのパーセンテージを算出
適宜設定する必要があります。

そのためのクラスを定義しています。

ここでは QTimer を用いて更新タイミングを制御しています。

start, increment 関数でおよそ終了時刻のおよそ 1/100 = 1% ずつ, i をインクリメントしています。
当初はこの値でパーセンテージを更新する予定でしたが, 処理のラグなどで実際の 1% と開きがあるため get_percentage 関数で算出しています。
(なので QTimer はおおまかな更新タイミングを計測しています)

100% の時刻を超えても表示は 99 を維持します。
finished 関数が呼ばれたとき, Timer は停止し, 表示も 100% となります。

class FunctionTimer(QWidget):
    """
    A class to keep track of the progress of a running function.
    """

    progress_changed = Signal(int)

    def __init__(self, parent: Optional[QWidget] = None):
        """
        Initialize the FunctionTimer instance.

        Parameters
        ----------
        parent : Optional[QWidget], optional
            The parent widget, by default None
        """
        super().__init__(parent)

        self.timer = QTimer()

        self.timer.timeout.connect(self.increment)

    def set_timer(self, end_time: float):
        """Set the time for the function execution and reset the timer.

        Parameters
        ----------
        end_time : float
            The time for the function execution.
        """
        self.end_time = end_time
        self.millisec = int(self.end_time*10)  # the millisec of 1 percent progress
                                               # self.end_time/100*1000

        self.i = 0
        self.start_time = time.time()
        self.finish_flag = False

    def start(self):
        """
        Start the timer.
        """
        self.timer.start(self.millisec)

    def increment(self):
        """
        Increment the progress and emit the progress_changed signal.
        """
        if not self.finish_flag:
            self.i += 1
            if self.i < 100:
                self.timer.start(self.millisec)
                self.progress_changed.emit(self.get_percentage())

    def get_percentage(self, max_per: int = 99) -> float:
        """
        Calculate the percentage of progress.

        Parameters
        ----------
        max_per : int, optional
            The maximum percentage value, by default 99

        Returns
        -------
        float
            The percentage of progress.
        """
        percentage = int((time.time() - self.start_time)/self.end_time*100)
        return min(percentage, max_per)

    def finish(self):
        """
        Finish the progress and emit the progress_changed signal with 100.
        """
        self.progress_changed.emit(100)
        self.finish_flag = True
        self.timer.stop()

関数を別スレッドで実行する Worker

GUI を停止させないため, 関数は別スレッドで実行します。
また, 任意の関数, 任意の引数で実行でき, 任意の戻り値を取得できます。
エラーのハンドルも行います。

Closure (後述) を用いて関数と引数をまとめて渡します。

try-except-else-finally を用いることで

  • 正常終了:result_signal
  • 異常終了:error_signal
  • どちらでも関数の終了タイミング:finished_signal
    を発します。
class FunctionWorker(QThread):
    """
    A worker thread to execute a function.
    """

    result_signal = Signal(object)
    error_signal = Signal(object)
    finished_signal = Signal()

    def __init__(self, parent: Optional[QWidget] = None):
        """
        Initialize the FunctionWorker instance.

        Parameters
        ----------
        parent : Optional[QWidget], optional
            The parent object, by default None
        """
        super().__init__(parent)

    def set_closure(self, closure: Closure[[], R]):
        """Set the executed closure.

        Parameters
        ----------
        closure : Closure[[], R]
            The closure function to be executed in the worker thread.
        """
        self.closure = closure

    def run(self):
        """
        Run the closure function and emit the result or error signal.

        If the closure function raises an exception, the error signal is emitted.
        Otherwise, the result signal is emitted with the result values.
        """
        try:
            r = self.closure()
        except Exception as e:
            self.error_signal.emit((e, traceback.format_exc()))
        else:
            self.result_signal.emit(r)
        finally:
            self.finished_signal.emit()

プログレスバーの表示とWorker・Timer・終了時刻を管理するクラス

Closure クラス

実行関数する関数の型

make_closure 関数で作成する。

RunFunctionProgressBar

  • make_closure
    関数と引数をまとめたラッパー関数を作成する。
    このとき, args と kwargs とうメソッドを追加する。

  • __init__

    • title, parent, offset_pos:GUI の設定用。Window タイトル, 親の Widget (Font と Window 位置が継承される), Window を開く位置 (parent からの相対座標, parent がないと無効)
  • _init_ui, _init_func_thread, _init_timer
    それぞれ初期化関数

    • プログレスバー表示画面の初期化
    • 関数実行用 thread の初期化
    • タイマーの初期化
  • set_closure
    self.key_name = (closure.__name__ + repr(closure.args) + repr(closure.kwargs))
    という文字列を用意しこれを PredictionTime のキーとする

    • closure:make_closureで作られたラッパー関数
    • init_end_time:その関数と引数のペアが最初に実行されたとき, この時間に合わせてプログレスバーが更新される。2 回目以降は必要ない。
  • run
    プログレスバーのWindowsの表示して, thread, timer の実行を開始する。

  • _finished, _result, _error
    それぞれ関数終了時の Signal を受け取る

    • _finished:関数が終了したとき (正常・異常問わず) 実行され,
      今回の関数の実行時間を Queue に追加, timer の停止, Window を閉じたり
      finishe_signal を発したりする。
    • _result:関数が正常終了したとき実行され, 結果を保存する。
    • _error:関数が異常終了したとき実行され, 結果やエラー内容を保存し, error_signal を発する。
  • 変数 result_values, error_status
    関数終了時に生成される変数であり result_values で戻り値を取得できる。
    関数が正常終了したとき error_status = None であるため, error_signal を用いずに判定することも可能である。
    異常終了したとき result_values = None となるが, 戻り値のない関数も None となるためこれで判断はできない。

    • result_values;関数の戻り値
    • error_status:(Exception のクラス, traceback の文字列) のタプル
  • closeEvent
    関数が終了する前に Window が閉じられた場合, そこで関数の実行がキャンセルされる。
    (キャンセルボタンとかで応用できるかも)

class RunFunctionProgressBar(QWidget):
    """
    A class to display a progress bar for a running function.
    """

    finish_signal = Signal()
    error_signal = Signal()

    def __init__(self,
               title: Optional[str] = None,
               parent: Optional[QWidget] = None,
               offset_pos: Optional[tuple[int, int]] = None,
               ):
        """
        Initialize the RunFunctionWorker instance.

        Parameters
        ----------
        parent : Optional[QWidget], optional
            The parent widget for the progress bar window, by default None
        """
        super().__init__()

        self.setWindowFlags(Qt.Window | Qt.WindowSystemMenuHint
                            | Qt.WindowTitleHint | Qt.WindowCloseButtonHint)

        if parent is not None:
            self.setFont(parent.font())
            if offset_pos is None:
                offset_pos = (150, 0)
            self.move(
                parent.geometry().x() + offset_pos[0],
                parent.geometry().y() + offset_pos[1]
            )

        self._init_ui(title=title)
        self._init_func_thread()

        self.function_timer = FunctionTimer(parent=self)
        self.function_timer.progress_changed.connect(self._update_progressbar)

        self.function_name = None

    def _init_ui(self, title: Optional[str]):
        """
        Initialize the user interface.

        Parameters
        ----------
        title : Optional[str]
            The title of the progress bar window.
        """
        self.setWindowTitle(
            "Progress Bar" if title is None else title)
        self.v_layout = QVBoxLayout()
        self.setLayout(self.v_layout)

        self.progress_bar = QProgressBar(self)
        self.progress_bar.setRange(0, 100)
        self.progress_bar.setValue(0)

        self.v_layout.addWidget(self.progress_bar)

    def _init_func_thread(self):
        """
        Initialize the function worker thread.
        """
        self.func_thread = FunctionWorker(parent=self)

        self.func_thread.finished_signal.connect(self._finished)
        self.func_thread.result_signal.connect(self._result)
        self.func_thread.error_signal.connect(self._error)

    def set_closure(self, closure: Closure[[], R], init_end_time: float):
        """
        Set the closure function and initialize the time for the function.

        Parameters
        ----------
        closure : Closure[[], R]
            The closure function to be executed.
        init_end_time : float
            The initial end time for the function execution.
        """
        self.func_thread.set_closure(closure=closure)

        self.function_name = closure.__name__
        self.key_name = (self.function_name
                         + repr(closure.args) + repr(closure.kwargs) + repr(closure.option))
        print(self.key_name)

        prediction_time.init_time(
            key=self.key_name, end_time=init_end_time if init_end_time > 0 else 0.1)

    def _reset_timer(self):
        """
        Reset the function timer based on predicted time.
        """
        self.predicted_time = prediction_time.get_time(key=self.key_name)

        print(f'Get predicted_time: {self.predicted_time}')

        self.function_timer.set_timer(end_time=self.predicted_time)

    def run(self):
        """
        Start the execution of the closure function.

        If the function thread is already running, a message is printed.
        """
        if self.func_thread.isRunning():
            print("Working now")
        else:
            self.show()

            self.working_flag = True

            self._reset_timer()
            self.start_time = time.time()
            self.func_thread.start()
            self.function_timer.start()

    def _finished(self):
        """
        Handle the finishing of the function execution.

        This method is called when the function execution finished.
        It emits the finished_signal and updates the prediction time.
        """
        print('Finished!')
        self.finish_signal.emit()
        p_time = time.time() - self.start_time

        prediction_time.update_time(key=self.key_name, end_time=p_time)

        print('Take time:', p_time)

        self.function_timer.finish()
        self.close()

    def _result(self, values: object):
        """
        Handle the result of the function execution.

        Parameters
        ----------
        values : object
            The result values of the function execution.
        """
        print('Get result!')
        self.result_values = values
        self.error_status = None

    def _error(self, err: tuple[Exception, str]):
        """
        Handle the error during the function execution.

        Parameters
        ----------
        err : tuple[Exception, str]
            The exception raised during the function execution.
        """
        print('Error')
        self.error_status = err
        self.result_values = None
        self.error_signal.emit()

    def _update_progressbar(self, i: int):
        """
        Update the progress bar value.

        Parameters
        ----------
        i : int
            The progress bar value.
        """
        self.progress_bar.setValue(i)

    def closeEvent(self, event: QCloseEvent):
        """"
        Handle the closing of the progress bar window.

        If the function thread is running, it terminates the thread.
        """
        if self.func_thread.isRunning():
            self.result_values = None
            self.error_status =\
                (WindowClosed, f"Closed the window during the {self.function_name} working")
            self.func_thread.terminate()
            self.finish_signal.emit()
            self.error_signal.emit()
        return super().closeEvent(event)

    @staticmethod
    def make_closure(func: Callable[P, R], *args: P.args, **kwargs: P.kwargs) -> Closure[[], R]:
        """
        Create a closure function with arguments.

        Parameters
        ----------
        func : Callable
            The original function to be executed.
        *args
            The positional arguments for the function.
        **kwargs
            The keyword arguments for the function.

        Returns
        -------
        Closure[[], R]
            The closure function with arguments.
        """
        @wraps(func)
        def _func():
            return func(*args, **kwargs)

        _func.args = args
        _func.kwargs = kwargs
        _func.option = None

        return _func

テスト用の関数

引数で指定した時間sleepする関数, 途中でエラーが発生する関数, listの長さの時間sleepする関数を用意


def heavy_function(t: int) -> int:
    """
    The heavy function for test.
    """
    for i in range(t):
        time.sleep(1)
        print("Count: ", i+1)

    return t*10

def error_function(t: int) -> int:
    """
    The heavy function for test.
    """
    for i in range(t):
        time.sleep(1)
        print("Count: ", i+1)

        if i == 5:
            raise ValueError("Five!!")

    return t*10

def list_function(l : list[float]) -> float:
    sum = 0
    for f in l:
        for _ in range(10):
            time.sleep(0.1)
        sum += f
        print('Sum: ', f)
    return sum

実行用 Window

3 種類のボタンを用意

  1. heavy_function(10)
    10 秒カウントする
  2. heavy_function(t=5)
    5 秒カウントする
  3. error_function(10)
    10 秒カウントしようとするが 5 秒目に ValueError
  4. スピンボックスを用意し, その長さのsleepが実行される。引数をそのままPredictionTimeのキーにすると, リストが同じでもリストの中身が異なると、
    正確な実行時間がわからないため, args, kwargs を使わずに, option というメソッドを利用している
# Alias for RunFunctionProgressBar
RFPB = RunFunctionProgressBar


class MainWindow(QMainWindow):
    """
    The main window class of the application.
    """
    def __init__(self, parent: QWidget | None = None):
        super().__init__(parent)

        self.setWindowTitle("Main Window")
        self.setGeometry(300, 300, 400, 100)

        self.central_widget = QWidget(self)
        self.setCentralWidget(self.central_widget)

        self.v_layout = QVBoxLayout()
        self.central_widget.setLayout(self.v_layout)

        self.object_dict: dict[int, RFPB] = {}

        self.start_button1 = QPushButton("Start 1")
        self.progress_bar_window1 = self.init_progress_bar(0, self.start_button1)
        self.start_button1.clicked.connect(
            partial(self.show_progress_bar, self.progress_bar_window1,
                    closure=RFPB.make_closure(heavy_function, 10),
                    init_time=7, button=self.start_button1))
        self.v_layout.addWidget(self.start_button1)

        self.start_button2 = QPushButton("Start 2")
        self.progress_bar_window2 = self.init_progress_bar(1, self.start_button2)
        self.start_button2.clicked.connect(
            partial(self.show_progress_bar, self.progress_bar_window2,
                    closure=RFPB.make_closure(heavy_function, t=5),
                    init_time=7, button=self.start_button2))
        self.v_layout.addWidget(self.start_button2)

        self.start_button3 = QPushButton("Start 3")
        self.progress_bar_window3 = self.init_progress_bar(2, self.start_button3)
        self.start_button3.clicked.connect(
            partial(self.show_progress_bar, self.progress_bar_window3,
                    closure=RFPB.make_closure(error_function, 10),
                    init_time=7, button=self.start_button3))
        self.v_layout.addWidget(self.start_button3)

        self.h_layout = QHBoxLayout()
        self.start_button4 = QPushButton("Start 4")
        self.spin_box = QSpinBox(self)
        self.progress_bar_window4 = self.init_progress_bar(4, self.start_button4)
        self.start_button4.clicked.connect(
            partial(self.show_progressbar_w_spinbox, self.spin_box,
                    progress_bar=self.progress_bar_window4, button=self.start_button4))
        self.h_layout.addWidget(self.start_button4)
        self.h_layout.addWidget(self.spin_box)
        self.v_layout.addLayout(self.h_layout)

    def finished(self, window: RFPB, button: QPushButton):
        """
        Handle the finishing of function.
        """
        print("Returned Values: ", window.result_values)
        err = window.error_status
        if err is not None:
            print("Raise Error: \n" + err[1])
        button.setEnabled(True)

    def init_progress_bar(self, number: int, button: QPushButton) -> RFPB:
        progress_bar = RFPB(parent=self, offset_pos=(400, number*100))
        progress_bar.finish_signal.connect(partial(self.finished, progress_bar, button))
        self.object_dict[number] = progress_bar
        return progress_bar

    def show_progress_bar(self, progress_bar: RFPB, closure: Callable, init_time: float,
                          button: QPushButton, title: Optional[str] = None):
        """
        Show the progress bar window
        """
        button.setEnabled(False)
        progress_bar.set_closure(closure=closure, init_end_time=init_time)
        progress_bar.setWindowTitle(closure.__name__ if title is None else title)
        progress_bar.run()

    def show_progressbar_w_spinbox(self, spin_box: QSpinBox, *args, **kwargs):
        v = spin_box.value()
        closure = RFPB.make_closure(
            list_function, [random.random() for _ in range(v)])
        closure.args = None
        closure.kwargs = None
        closure.option = v
        title = f'List len = {v}'
        self.show_progress_bar(closure=closure, title=title, init_time=v, *args, **kwargs)

    def closeEvent(self, event: QCloseEvent):
        print(self.object_dict)
        for w in self.object_dict.values():
            w.close()
        return super().closeEvent(event)

if __name__=="__main__":
    app = QApplication([])
    window = MainWindow()
    window.show()
    app.exec_()

コード全景

記事で説明した順番などとは少し変わっています.

更新履歴

初稿: 2023-10-28
修正: 2023-11-06 コードの内容を修正

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?