概要
前回記事でローパスフィルターを使用した進捗予測を行いました。
しかし実際にはいくつか問題があります。
不均一な更新時間
前回のローパスフィルター計算では、均一な更新間隔を前提としたパラメータalphaを設定しました。
更新間隔を考慮しない式なので、更新回数が多いものほど強く影響します。
実際には処理時間が不均一なので、処理時間が短いものほど更新回数が多く強く影響し、処理時間が長いものほど更新回数が少なくなるので、弱い影響力となります。
早い話が、処理時間の長いものは反映されずらく、処理時間が短く見積もられがちということです。
解決策
処理時間に応じて重み(という表現が正しいかわかりませんが)を変える必要があります。
係数alphaを処理時間に応じて変えてみましょう。
alphaの換算
処理時間の間隔dtに応じて、alphaの値を変形(換算)してあげましょう。
\alpha(dt) = 1 - \left(1 - \alpha_{\mathrm{static}}\right)^{\,dt}
pythonで計算すると
alpha_dt = 1 - (1 - alpha_static) ** dt
となります。
ローパスフィルターの計算式
y[k] = \alpha\, x[k] + (1-\alpha)\, y[k-1]
pythonで計算すると
smoothed_speed = alpha * speed + (1 - alpha) * smoothed_speed
ですね。これは前回記事と同じになります。
さきほどのalpha換算式を組み合わせると、以下のようになります。
ここでalpha_staticが元のalpha値、alphaが換算後のalpha値です。
alpha = 1 - (1 - alpha_static) ** dt
smoothed_speed = alpha * speed + (1 - alpha) * smoothed_speed
時定数
ついでにalphaだと感覚的に設定ができないので、今回から時定数τを導入してみます。
alphaからの換算は以下の式です。
\tau = -\frac{\Delta t}{\ln(1-\alpha)}
pythonで計算すると
tau = -delta_t / math.log(1 - alpha)
ですね。
今回は時定数τを秒単位で扱いたいので、delta_t=1
とします。
下図は時定数τ=1
でのグラフです。
あるステップ入力を与えたとき、1秒後に63.2%に到達します。
さらにその2.3倍の時間(=2.3秒)で約90%に到達します。
時定数を30秒に設定すれば、90%に到達するのは69秒後になります。
これはalpha=0.032に相当します。
時定数のほうが直感的でわかりやすいですね。
話をまとめると、
「時定数はalphaと相互に換算可能なパラメータで、時間単位で指定できるもの」
と考えてください。
時定数でのローパスフィルターの計算式
さきほどの時定数→alpha換算と、処理時間dtに応じた換算を一気に計算します。
\alpha(dt) = 1 - \exp\left(-\frac{dt}{\tau_{\mathrm{static}}}\right)
pythonだとこうですね。
alpha = 1 - math.exp(-dt / tau_static)
tau_staticは時定数になります。
alpha = 1 - math.exp(-dt / tau_static)
smoothed_speed = alpha * speed + (1 - alpha) * smoothed_speed
あるいはワンライナーで以下のようにも書けます。
smoothed_speed = (1 - math.exp(-dt/tau)) * speed + math.exp(-dt/tau) * smoothed_speed
プログラム実装
import time
import math
class ProgressTracker:
"""
処理の進捗を管理し、進行状況(進捗、処理速度、ETA)を表示するクラス。
"""
def __init__(self, total_num, tau=5):
"""
コンストラクタ
:param total_num: 処理する総数
:param tau: LPFの時定数 秒
alpha(平滑化係数):
alpha が 大きい → 最新の値(limited_speed)を重視する。
alpha が 小さい → 過去の値(smoothed_speed)を重視する。
"""
self.total_num = total_num
self.progress_count = 0
self.start_time = time.time()
self.previous_time = self.start_time # 前回の更新時刻
self.smoothed_speed = None # LPFで平滑化された速度
self.current_speed = 0
self.average_speed = 0
self.task_speeds = [] # 各タスクの処理速度
self.weights = [] # 各タスク処理時間
self.eta_seconds = 0 # ETA 残り時間_秒
self.diff_time = 0 # 差分時間_秒
self.total_time = 0 # かかった時間_秒
self.last_value = 0.0
self.tau = tau #時定数_秒
def update_smooth_value(self,dt,speed):
"""
時間差 dt と時定数 τ を使って新たなフィルタ値を計算する。
:param dt: 最終計測時刻と現在時刻の差(秒)
:param speed: 現在の速度などの値
:return: 更新されたフィルタ値
"""
alpha = 1 - math.exp(-dt / self.tau)
new_value = alpha * speed + (1 - alpha) * self.last_value
self.last_value = new_value
return new_value
def update_progress(self):
"""
進捗を1つ更新し、現在の進捗、速度、ETAを表示します。
"""
self.progress_count += 1
current_time = time.time()
self.elapsed_time = current_time - self.start_time # 開始してからの経過時間
self.diff_time = current_time - self.previous_time # 前回の更新からの経過時間
self.previous_time = current_time # 現在の時間を記録
# 差分で速度を計算
self.current_speed = 1 / self.diff_time if self.diff_time > 0 else 0 # 処理1件あたりの速度
# LPFを使用して速度を平滑化
if self.smoothed_speed is None:
self.smoothed_speed = self.current_speed
else:
self.smoothed_speed = self.update_smooth_value(self.diff_time,self.current_speed)
self.task_speeds.append(self.diff_time)
self.weights.append(self.smoothed_speed)
self.average_speed = self.smoothed_speed
# 残りの処理数とETA(予測完了時間)
remaining = self.total_num - self.progress_count
self.eta_seconds = remaining / self.average_speed if self.average_speed > 0 else -1
self.display_progress()
def display_progress(self):
"""
現在の進捗、速度、ETAを計算して表示します。
"""
formatted_eta = self.format_time(self.eta_seconds)
# 進捗率と速度のフォーマット
progress_percentage = (self.progress_count / self.total_num) * 100
formatted_speed = f"{self.average_speed:.2f} ops/sec"
formatted_raw_speed = f"{self.current_speed:.2f} ops/sec"
#プログレスバー
bar_length = 40
filled_length = int(bar_length * self.progress_count / self.total_num)
bar = "█" * filled_length + "-" * (bar_length - filled_length)
# プログレスを1行ですべて表示
print(
f"\rProgress: {self.progress_count}/{self.total_num} ({progress_percentage:.1f}%) | "
f"speed: {formatted_raw_speed} | Avg speed: {formatted_speed} | ETA: {formatted_eta} ",
f"[{bar}]",
end='',
flush=True
)
if self.progress_count >= self.total_num:
# 処理完了後にトータル時間を表示
print(f"\n処理完了! トータル時間: {self.total_elapsed_time()}")
@staticmethod
def format_time(seconds):
"""
秒を時:分:秒の形式にフォーマットします。
:param seconds: 秒数
:return: フォーマットされた時間
"""
if seconds < 0:
return "N/A"
hours, remainder = divmod(int(seconds), 3600)
minutes, seconds = divmod(remainder, 60)
return f"{hours:02}:{minutes:02}:{seconds:02}"
def total_elapsed_time(self):
"""
処理開始から現在までの経過時間を計算しフォーマットする。
"""
self.total_time = time.time() - self.start_time
return self.format_time(self.total_time)
# サンプル使用例
if __name__ == "__main__":
total_operations = 100 # 処理の総数
tracker = ProgressTracker(total_operations, tau=3) # LPFと速度リミットの設定
# サンプル処理(処理ごとに時間がかかる処理を模倣)
for i in range(total_operations):
if i > 20:
time.sleep(0.1 + (i * 0.02)) # 処理時間が徐々に増加
else:
time.sleep(0.001) # 序盤は速い
tracker.update_progress()
比較
前回記事と比較してみます。
緑が実際の経過時間、青が表示された推定ETAです。
1.差分+LPF(前回手法) alpha=0.3
2.差分+LPF(今回手法) 時定数2.8秒(alpha=0.3相当)
あまり変わったようには見えませんが、60step,70stepの高速処理の影響がほとんど出ていませんね。
また最初の20stepまではフィルタの立ち上がりの遅さが原因で、非常に長い時間が表示されています。
おわりに
数時間、数千stepの処理を行う場合は時定数tauを100秒などの極端な値にしたほうが予測が安定すると思います。
しかしあまりtauを大きくしすぎると、予測の立ち上がりが遅くなります。
全体の10%を処理するまでは時定数を1/10にする。みたいに可変させても良さそうですね。
これでひとまずはローパスフィルターを使った進捗表示の解説は終了です。
それではまた。