概要
長時間かかる処理やバッチ処理を実行していると、「今どこまで進んでいるのか?」「残りはどれぐらい時間がかかるのか?」が気になりますよね。
心理学的にも、終わりが見えない作業やタスクは不安や心理的負担が大きくなることが知られています。そのため、多くの場合、プログレスバーで進捗を見せたり、ETA(終了予測時間)を表示したりするのが一般的です。
しかし、進捗表示には以下のような課題があります。
進捗速度の不均一性
たとえば、初期には高速に進んでも、後半で処理が急に重くなることもあります。逆に、一部の処理をスキップできる場合は一気に進むこともあるため、表示される速度にムラが出てしまいます。
ETAの信頼性
開始時刻からの経過時間だけで速度を算出し、単純にETAを計算すると、極端に長すぎたり短すぎたりする結果になりがちです。
今回のポイント
ということで今回はローパスフィルタを使ったプログレスバーを実装してみます。
進捗速度を平滑化し、より現実的なETA表示を目標とします。
ポイントとしては2つです。
- 各ステップ間での差分速度(微分)を算出
- 上記の差分速度はバラツキ(高周波)成分が多いので、ローパスフィルターによる平均化を行う。
- 上限・下限速度でクリップ(制限)する。
ローパスフィルタ(LPF)による平滑化
ローパスフィルタ(LPF)は信号処理の分野でよく使われる手法で、新しい観測値(今回は最新の進捗速度)と、これまでに平滑化した過去の値を一定の重み(α: alpha)で組み合わせることで、急激な変化を抑えながらなめらかな値に近づけることができます。
αが大きい:最新の値を強く反映し、急激な変化にも敏感に反応
αが小さい:過去の値を重視し、進捗速度がゆるやかに変化
なお、ローパスフィルタは厳密には平均ではなく平滑となります。
プログラム実装
今回はtimeライブラリのみで実装してみます。
import time
class ProgressTracker:
"""
処理の進捗を管理し、進行状況(進捗、処理速度、ETA)を表示するクラス。
"""
def __init__(self, total_num, alpha=0.1, min_speed=0.0, max_speed=9999999):
"""
コンストラクタ
:param total_num: 処理する総数
:param alpha: LPFの平滑化係数(0~1)
:param min_speed: 速度の最小値(リミット)
:param max_speed: 速度の最大値(リミット)
alpha(平滑化係数):
alpha の範囲は 0~1。
alpha が 大きい → 最新の値(limited_speed)を重視する。
alpha が 小さい → 過去の値(smoothed_speed)を重視する。
"""
self.total_num = total_num
self.progress_count = 0
self.start_time = time.time()
self.previous_time = self.start_time # 前回の更新時刻
self.smoothed_speed = None # LPFで平滑化された速度
self.alpha = alpha # LPFの平滑化係数
self.min_speed = min_speed # 速度の最小値
self.max_speed = max_speed # 速度の最大値
self.eta_seconds = 0 # ETA 残り時間_秒
self.diff_time = 0 # 差分時間_秒
self.total_time = 0 # かかった時間_秒
def update_progress(self):
"""
進捗を1つ更新し、現在の進捗、速度、ETAを表示します。
"""
self.progress_count += 1
self.display_progress()
def display_progress(self):
"""
現在の進捗、速度、ETAを計算して表示します。
"""
current_time = time.time()
self.elapsed_time = current_time - self.start_time # 開始してからの経過時間
self.diff_time = current_time - self.previous_time # 前回の更新からの経過時間
self.previous_time = current_time # 現在の時間を記録
# 差分で速度を計算
current_speed = 1 / self.diff_time if self.diff_time > 0 else 0 # 処理1件あたりの速度
# 現在の速度にリミットを適用してからLPFに渡す
limited_speed = max(self.min_speed, min(current_speed, self.max_speed))
if self.smoothed_speed is None:
self.smoothed_speed = limited_speed
else:
# LPFを使用して速度を平滑化
self.smoothed_speed = self.alpha * limited_speed + (1 - self.alpha) * self.smoothed_speed
# 残りの処理数とETA(予測完了時間)
remaining = self.total_num - self.progress_count
self.eta_seconds = remaining / self.smoothed_speed if self.smoothed_speed > 0 else -1
formatted_eta = self.format_time(self.eta_seconds)
# 進捗率と速度のフォーマット
progress_percentage = (self.progress_count / self.total_num) * 100
formatted_speed = f"{self.smoothed_speed:.2f} ops/sec"
formatted_raw_speed = f"{current_speed:.2f} ops/sec"
#プログレスバー
bar_length = 40
filled_length = int(bar_length * self.progress_count / self.total_num)
bar = "█" * filled_length + "-" * (bar_length - filled_length)
# プログレスを1行ですべて表示
print(
f"\rProgress: {self.progress_count}/{self.total_num} ({progress_percentage:.1f}%) | "
f"speed: {formatted_raw_speed} | Avg speed: {formatted_speed} | ETA: {formatted_eta} ",
f"[{bar}]",
end='',
flush=True
)
if self.progress_count >= self.total_num:
# 処理完了後にトータル時間を表示
print(f"\n処理完了! トータル時間: {self.total_elapsed_time()}")
@staticmethod
def format_time(seconds):
"""
秒を時:分:秒の形式にフォーマットします。
:param seconds: 秒数
:return: フォーマットされた時間
"""
if seconds < 0:
return "N/A"
hours, remainder = divmod(int(seconds), 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02}:{minutes:02}:{seconds:02}"
def total_elapsed_time(self):
"""
処理開始から現在までの経過時間を計算しフォーマットする。
"""
self.total_time = time.time() - self.start_time
return self.format_time(self.total_time)
# サンプル使用例
if __name__ == "__main__":
total_operations = 100 # 処理の総数
alpha = 0.2
tracker = ProgressTracker(total_operations, alpha=alpha, min_speed=0.0, max_speed=6) # LPFと速度リミットの設定
# サンプル処理(処理ごとに時間がかかる処理を模倣)
for i in range(total_operations):
if i > 20:
time.sleep(0.1 + (i * 0.02)) # 処理時間が徐々に増加
else:
time.sleep(0.001) # 序盤は速い
tracker.update_progress()
比較
全100stepで、~20stepまで、および50,60,70stepを高速にスキップ処理しています。
逆に55stepを長時間処理としました。
これを速度ムラの再現(しようと)しています。
緑が実際の経過時間、青が表示された推定ETAです。
なお、速度のクリップはすべて以下で設定しています。
min_speed = 0
max_speed = 10
1.従来タイプ
開始時刻から速度算出する従来タイプです。
数値的には安定しているものの、結構乖離が大きいですね。
2.差分(今回手法) alpha=1.0
今回の方法ですが、まずはalpha=1.0です。1.0を指定した場合、LPFは無効化されます。(純粋な差分速度)
速度ムラの影響をもろに受けていますね...。
ただ、従来方法よりも緑線に近づいています。
3.差分+LPF(今回手法) alpha=0.5
ローパスフィルター(LPF)により、かなり速度ムラが収まりました。
4.差分+LPF(今回手法) alpha=0.3
よりローパスフィルターを強くしたものです。かなり良いのではないでしょうか。
5.差分+LPF(今回手法) alpha=0.1
ローパスフィルターのalpha=0.1としてみましたが、
全体的に平滑が効いてきたものの、逆に実際の経過時間(緑)との乖離が大きくなりました。
おわりに
今回、比較的シンプルになローパスフィルタによる進捗表示を行ってみました。
しかしいくつかの問題を含みます。
次回記事では問題点とその解決を目指します。
それではまた。