外部から処理を止めたかった
とある、長い処理を行っていたのだが途中で止めたい事が発生しました。
問題は、処理の途中で止まると都合が悪い作りになっていました。
その為、停止タイミングをプロセス側で都合の良い時に停止できる仕組みを実装した時のメモとなります。
test.py
import time
from task import Task
def is_some_conditions(c):
"""何かの継続条件を判定"""
return c < 10
count = 0
with Task() as t:
while t.is_doing() and is_some_conditions(count):
# 長い処理A
print(f"\r{count} - doing A ", end='')
time.sleep(3)
# 長い処理B
print(f"\r{count} - doing B ", end='')
time.sleep(3)
print(f"\r{count} - finished ", end='')
count += 1
print(f"\nexit")
このコードは、処理Aと処理Bがセットで実行される事を想定している作りだと思ってください。
ここで問題になるのは、処理Aと処理Bの間で停止が発生すると困る場合です。
シグナル版は、停止シグナル(CTRL+C等)を受けるか終了条件を満たすと終了します。
ファイル版は、.task_doingファイルが削除されるか終了条件を満たすと終了します。
※シグナルを受けると停止するので注意してください。
withブロックを抜けると、通常通りの処理に戻ります。
シグナル版
task.py
import signal
class Task:
"""停止系シグナルを切りが良いところまで遅延させるクラス"""
def __init__(self):
self._is_doing = True
self.original_handlers = {}
self.stop_signals = [signal.SIGINT, signal.SIGTERM]
def _handler(self, signum, frame):
self._is_doing = False
def __enter__(self):
# 現在のハンドラーを保存し、カスタムハンドラーを設定
for sig in self.stop_signals:
self.original_handlers[sig] = signal.getsignal(sig)
signal.signal(sig, self._handler)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# 元のハンドラーに戻す
for sig in self.stop_signals:
signal.signal(sig, self.original_handlers[sig])
def is_doing(self):
"""タスクの実行中かどうか"""
return self._is_doing
def is_stopped(self):
"""タスクが停止された"""
return not self._is_doing
def end(self):
"""タスクを終了する"""
self._is_doing = False
ファイル版
task.py
import os
from pathlib import Path
class Task:
TASK_FILENAME = '.task_doing'
def __init__(self):
self.task_file = Path(self.TASK_FILENAME)
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end()
def start(self):
"""タスクを開始する"""
if self.task_file.exists():
raise FileExistsError("既にタスクを実行中です")
# タスク管理用のファイルを作成(空ファイルを作成)
self.task_file.touch()
def is_doing(self):
"""タスクの実行中かどうか"""
return self.task_file.exists()
def is_stopped(self):
"""タスクが停止された"""
return not self.is_doing()
def end(self):
"""タスクを終了する"""
if self.task_file.exists():
self.task_file.unlink()