タイムアウトを付けるコンテキストマネージャがあるときっと便利だろうな、記事ネタにもなるし。そんな安易な考えから、これは駄目、あれも駄目、と試行錯誤した話を紹介します。結論だけ知りたい方は 通知イベント編 を見てください。
コンテキストマネージャとは
簡単に言うと、withブロックで使うオブジェクトのこと。withブロックに入るときはコンテキストマネージャのメソッド__enter__が実行され、withブロックから抜けるときはメソッド__exit__が実行されます。こうすることで特定のコードブロックの前処理と後処理を定義できます。
ちゃんとした説明は以下参照してください。3. データモデル — Python 3.7.4 ドキュメント
コンテキストマネージャを自作する場合、メソッド__enter__、__exit__が定義されたクラスを用意する必要があり少々面倒です。しかし、contextlib.contextmanagerというデコレーターを使うと簡単にコンテキストマネージャを作ることができます。試しに1つ作ってみます。
import os
import contextlib
@contextlib.contextmanager
def pushd(path):
olddir = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(olddir)
print(os.getcwd())
with pushd('../utils'):
print(os.getcwd())
print(os.getcwd())
pushdは、withブロックに入ると作業ディレクトリをpathに移動して、withブロックから抜けると元ディレクトリに戻るコンテキストマネージャです。contextlib.contextmanagerでコンテキストマネージャを作る際の注意点を1つ挙げると、yieldはtry-finally節で囲む必要があります。これは、withブロック中でExceptionが発生した場合でも、後処理を実行できるようにするためです。
実行結果は以下になります。(私の作業ディレクトリが丸見えですね...)
D:\機械学習\コード\Jacomb\reusable_python_utils\samples
D:\機械学習\コード\Jacomb\reusable_python_utils\utils
D:\機械学習\コード\Jacomb\reusable_python_utils\samples
タイムアウトのコンテキストマネージャを作ってみる
あるプロジェクトでタイムアウトが必要な処理が複数あるとします。タイムアウト処理自体を共通化できるのであればそれに越したことはないので、コンテキストマネージャで頑張って実装してみましょう。
SIGALRM編
SIGALRMとは、タイマー割り込みを行うシグナルです。シグナルとは、他プロセスに送れる信号のことで、非同期で特定処理(大抵の場合は割り込み終了)を行わせることができます。SIGALRMを自プロセスに送って割り込みを行うことでコードブロックのタイマーを実現します。コードを書くと以下のようになります。
import errno
import contextlib
import signal
class TimeoutException(IOError):
errno = errno.EINTR
@contextlib.contextmanager
def time_limit_with_sigalrm(timeout_secs):
orig_sialrm_handler = signal.getsignal(signal.SIGALRM)
def raise_excetion(signum, frame):
raise TimeoutException
signal.signal(signal.SIGALRM, raise_excetion)
signal.alarm(timeout_secs)
try:
yield
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, orig_sialrm_handler)
まず、signal.getsignalで元のシグナルハンドラを取得します。続いて、signal.signalでSIGALRM受信時にTimeoutExceptionを発生させるようにします。signal.alarmでtimeout_secs秒後にSIGALRMを発生させるようにします。withブロック実行後、signal.alarm(0)でSIGALRMが発生しないようにし、元のシグナルハンドラに戻します。
このコンテキストマネージャを実際に使ってみます。
import time
from utils.logging import logput as _logput
from utils.logging import logging_function_decorator as _logging_function_decorator
from utils.logging import DEBUG, init_logging, LOPT_NO_ARGUMENTS, LOPT_NO_RETUEN_VALUES
from utils.canceler import time_limit_with_sigalrm, TimeoutException
MYLOGGER = 'sample.time_limit'
def logput(msg, **kargs):
return _logput(msg, logger=MYLOGGER, wrapper_depth=1, **kargs)
def logging_function_decorator(**kargs):
# 引数と返値は今回確認したい情報ではないので出力しない
return _logging_function_decorator(logger=MYLOGGER, logput_options=LOPT_NO_ARGUMENTS|LOPT_NO_RETUEN_VALUES, **kargs)
@logging_function_decorator(level=DEBUG)
def main1():
"""タイムアウトしなかった場合"""
with time_limit_with_sigalrm(3):
try:
time.sleep(1)
except TimeoutException:
logput('timeout')
@logging_function_decorator(level=DEBUG)
def main2():
"""タイムアウトした場合"""
with time_limit_with_sigalrm(3):
try:
time.sleep(10)
except TimeoutException:
logput('timeout')
if __name__ == '__main__':
init_logging()
main1()
main2()
デコレータlogging_function_decoratorは関数の開始と終了をログに記録する関数です。ログ出力関係の説明は省略しますが、気になる方は先日公開した記事 【python】loggingでプログラムの動きを可視化する を見てください。
withブロックは3秒でタイムアウトするようにしています。タイムアウトするとTimeoutExceptionが発生してexcept節を通過し"timeout"とログ出力されます。関数main1はタイムアウトしなかった場合を、main2はタイムアウトした場合を確認します。このコードを実行すると、ログ出力は以下のようになりました。
|2019-09-12 22:53:33,424|sample.time_limit|DEBUG|main1 >> start.|
|2019-09-12 22:53:34,425|sample.time_limit|DEBUG|main1 >> end.|
|2019-09-12 22:53:34,426|sample.time_limit|DEBUG|main2 >> start.|
|2019-09-12 22:53:37,427|sample.time_limit|DEBUG|main2 >> timeout|
|2019-09-12 22:53:37,427|sample.time_limit|DEBUG|main2 >> end.|
main1はタイムアウトが発生しないで終了しており、main2は3秒後にタイムアウトしている様子が分かります。うん、意図した通りに動いてますね。ですが、実は次の問題があります。
- SIGALRMが利用できる環境はUNIXのみ。(上記ログ出力はUbuntuで実行したときのものです。)
- シグナルはメインスレッドで実行されるので、マルチスレッドなプログラムでは使うことができない。
限定的な環境ならば問題ないかもしれませんが、もう少しクロスプラットフォームな実装に挑戦してみます。
スレッド編
スレッドならばUNIX以外の環境でも実装されているので、クロスプラットフォームになるかも。そういう発想のもと作成したコンテキストマネージャが以下です。
import errno
import contextlib
import ctypes
import threading
class TimeoutException(IOError):
errno = errno.EINTR
@contextlib.contextmanager
def time_limit_with_thread(timeout_secs):
thread_id = ctypes.c_long(threading.get_ident())
def raise_exception():
modified_thread_state_nums = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(TimeoutException))
if modified_thread_state_nums == 0:
raise ValueError('Invalid thread id. thread_id:{}'.format(thread_id))
elif modified_thread_state_nums > 1:
# 通常このパスを通ることはないが、念のため保留中のExceptionをクリアしておく
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
raise SystemError('PyThreadState_SetAsyncExc failure.')
timer = threading.Timer(timeout_secs, raise_exception)
timer.setDaemon(True)
timer.start()
try:
yield
finally:
timer.cancel()
timer.join()
まず、現スレッドIDを取得します。続いて、timeout_secs秒後に関数raise_exceptionを実行するサブスレッドtimerを作成します。関数raise_exceptionは現スレッドに対してTimeExceptionを発生させる関数です。実はthreadingモジュールにはスレッドの割り込みを行う機能は用意されていません。
引用元: threading --- スレッドベースの並列処理 — Python 3.7.4
現状では、優先度 (priority)やスレッドグループがなく、スレッドの破壊 (destroy)、中断 (stop)、一時停止 (suspend)、復帰 (resume)、割り込み (interrupt) は行えません。
しかし、pythonapiにあるPyThreadState_SetAsyncExcを使うと、スレッドの割り込みができます。詳細は 初期化 (initialization)、終了処理 (finalization)、スレッド — Python 3.7.4 ドキュメント を参照してください。これで現スレッドに対してTimeExceptionを送ることができます。また、メインスレッドが終了したらこのサブスレッドも終了するようにtimer.setDaemon(True)とします。最後にサブスレッドを実行します。withブロック実行後、timer.cancel()でサブスレッドを止めて、timer.join()でサブスレッド終了まで待ちます。
では実際にこのコンテキストマネージャを実行してみます。 SIGALRM編 で実装したサンプルコードのうち、withブロックのコンテキストマネージャをtime_limit_with_threadに変えるだけです。
@logging_function_decorator(level=DEBUG)
def main1():
"""タイムアウトしなかった場合"""
with time_limit_with_thread(3):
try:
time.sleep(1)
except TimeoutException:
logput('timeout')
@logging_function_decorator(level=DEBUG)
def main2():
"""タイムアウトした場合"""
with time_limit_with_thread(3):
try:
time.sleep(10)
except TimeoutException:
logput('timeout')
今度はWindowsで実行します。ログ出力は以下となりました。
|2019-09-13 19:06:51,350|sample.time_limit|DEBUG|main1 >> start.|
|2019-09-13 19:06:52,352|sample.time_limit|DEBUG|main1 >> end.|
|2019-09-13 19:06:52,352|sample.time_limit|DEBUG|main2 >> start.|
|2019-09-13 19:07:02,354|sample.time_limit|DEBUG|main2 >> timeout|
|2019-09-13 19:07:02,354|sample.time_limit|DEBUG|main2 >> end.|
うん、意図した通りに動いて……ないですね(泣)
関数main2が3秒後ではなく10秒後にタイムアウトしています。この理由は分かっていないのですが、恐らくPyThreadState_SetAsyncExcはpythonのコードを実行中に割り込むのではなく、コードの合間に割り込むからと予想しています。(あるいはsleepシステムコール中は割り込みができないか…) 試しにsleepをfor文で分割してみます。
@logging_function_decorator(level=DEBUG)
def main3():
"""タイムアウトした場合 (sleepを分割)"""
with time_limit_with_thread(3):
try:
for _ in range(10):
time.sleep(1)
except TimeoutException:
logput('timeout')
関数main3のログ出力は以下です。
|2019-09-13 00:21:28,933|sample.time_limit|DEBUG|main3 >> start.|
|2019-09-13 00:21:31,939|sample.time_limit|DEBUG|main3 >> timeout|
|2019-09-13 00:21:31,939|sample.time_limit|DEBUG|main3 >> end.|
今度は3秒後にタイムアウトが発生しているので、さっきの予想はたぶん合ってると思います。とりあえず、これについてはここまでで妥協することにします。
これでSIGALRM編で挙がった問題は解決できました。しかし、実は 致命的な欠点 があります。それは withブロック内で定義されている後処理にまでタイムアウトが割り込む可能性がある ということです。コード例を挙げます。
@logging_function_decorator(level=DEBUG)
def main4():
with time_limit_with_thread(3):
try:
time.sleep(1)
except TimeoutException:
logput('timeout.')
finally:
# 後処理
logput('post process start.')
time.sleep(5)
logput('post process end.')
関数main4はfinally節で後処理が定義されていますが、withブロックにより3秒でタイムアウトします。後処理には5秒かかる想定だとすると、このコードのログ出力は以下の通りになります。
|2019-09-13 18:00:39,240|sample.time_limit|DEBUG|main4 >> start.|
|2019-09-13 18:00:40,242|sample.time_limit|DEBUG|main4 >> post process start.|
|2019-09-13 18:00:45,242|sample.time_limit|DEBUG|main4 >> error happened.|
Traceback (most recent call last):
File "D:\機械学習\コード\Jacomb\reusable_python_utils\utils\logging.py", line 156, in logging_function
rv = func(*args, **kargs)
File "D:\機械学習\コード\Jacomb\reusable_python_utils\samples\time_limit_with_thread.py", line 57, in main4
time.sleep(5)
utils.canceler.TimeoutException
後処理が完了する前にTimeExceptionが発生してしまいました。
上の例に関してだけ言えば、finally節をwithブロックの外で実行してしまえば良いですが、一番の問題は、pythonの組み込みモジュールや自作モジュールなどで後処理をfinally節で定義している場合は数多くあり、それらをこのwithブロック中で呼び出しても実行されない場合があることにあります。つまり、後処理として期待した処理が実行されない場合がある、という潜在的バグを潜ませる危険性があるということです。
通知イベント編
スレッド編 で確認した問題の原因は、タイムアウトしたらTimeoutExceptionを発生させて強制的に現処理を終了させたことです。従って、タイムアウトしたらTimeoutExceptionを発生させるのではなく通知イベントを発行し、終了しても良いタイミングで通知イベントを参照して現処理を終了させると良い、ということになります。こう書くと、なんか極当たり前のところに帰結した感じがします。
@contextlib.contextmanager
def time_limit(timeout_secs):
timeout_event = threading.Event()
timer = threading.Timer(timeout_secs, lambda : timeout_event.set())
timer.setDaemon(True)
timer.start()
try:
yield timeout_event
finally:
timer.cancel()
timer.join()
このコンテキストマネージャは、タイムアウト発生をtimeout_eventで通知します。withブロック中の任意の場所でtimeout_eventを参照しセットされていれ現処理を終了させます。例えば次のような使い方を想定しています。
timeout_secs, interval_secs = 3, 0.01
with time_limit(timeout_secs) as timeout_event:
while not timeout_event.wait(interval_secs):
# ここに本処理
pass
else:
# ここにタイムアウト処理
pass
スレッド編 で起きた問題が解決しているか確認します。
import time
from utils.logging import logput as _logput
from utils.logging import logging_function_decorator as _logging_function_decorator
from utils.logging import DEBUG, init_logging, LOPT_NO_RETUEN_VALUES
from utils.canceler import time_limit
MYLOGGER = 'sample.time_limit'
def logput(msg, **kargs):
return _logput(msg, logger=MYLOGGER, wrapper_depth=1, **kargs)
def logging_function_decorator(**kargs):
# 返値は今回確認したい情報ではないので出力しない
return _logging_function_decorator(logger=MYLOGGER, logput_options=LOPT_NO_RETUEN_VALUES, **kargs)
@logging_function_decorator(level=DEBUG)
def time_limit_example(timeout_secs=3, real_process_time_secs=0, post_process_time_secs=0, repeat_nums=1):
with time_limit(timeout_secs) as timeout_event:
i = 0
while not timeout_event.wait(0):
try:
# 以下が本処理の想定
time.sleep(real_process_time_secs)
finally:
# 以下が後処理の想定
if post_process_time_secs > 0:
logput('post process start.')
time.sleep(post_process_time_secs)
logput('post process end.')
# 一定回数繰り返したら終了
i += 1
if i > repeat_nums:
break
else:
# ここにタイムアウト処理
logput('timeout.')
if __name__ == '__main__':
init_logging()
# タイムアウトしない場合
time_limit_example(timeout_secs=3, real_process_time_secs=1, post_process_time_secs=0, repeat_nums=1)
# タイムアウトする場合
time_limit_example(timeout_secs=3, real_process_time_secs=1, post_process_time_secs=0, repeat_nums=10)
# 後処理実行中にタイムアウトが発生した場合
time_limit_example(timeout_secs=3, real_process_time_secs=1, post_process_time_secs=5, repeat_nums=10)
- タイムアウトしない場合
- タイムアウトする場合
- 後処理実行中にタイムアウトが発生した場合
に分けてコンテキストマネージャtime_limitを使ってみます。このコードのログ出力は以下になります。
|2019-09-13 17:47:31,316|sample.time_limit|DEBUG|time_limit_example >> start. args:(), kargs:{'timeout_secs': 3, 'real_process_time_secs': 1, 'post_process_time_secs': 0, 'repeat_nums': 1}|
|2019-09-13 17:47:33,317|sample.time_limit|DEBUG|time_limit_example >> end.|
|2019-09-13 17:47:33,317|sample.time_limit|DEBUG|time_limit_example >> start. args:(), kargs:{'timeout_secs': 3, 'real_process_time_secs': 1, 'post_process_time_secs': 0, 'repeat_nums': 10}|
|2019-09-13 17:47:36,321|sample.time_limit|DEBUG|time_limit_example >> timeout.|
|2019-09-13 17:47:36,321|sample.time_limit|DEBUG|time_limit_example >> end.|
|2019-09-13 17:47:36,322|sample.time_limit|DEBUG|time_limit_example >> start. args:(), kargs:{'timeout_secs': 3, 'real_process_time_secs': 1, 'post_process_time_secs': 5, 'repeat_nums': 10}|
|2019-09-13 17:47:37,324|sample.time_limit|DEBUG|time_limit_example >> post process start.|
|2019-09-13 17:47:42,324|sample.time_limit|DEBUG|time_limit_example >> post process end.|
|2019-09-13 17:47:42,324|sample.time_limit|DEBUG|time_limit_example >> timeout.|
|2019-09-13 17:47:42,324|sample.time_limit|DEBUG|time_limit_example >> end.|
それぞれちゃんと動作しているようです。3番目の「後処理実行中にタイムアウトが発生した場合」ですが、今度はちゃんと後処理が完了するようになりました。
まとめ
強制的にタイムアウト処理を発生させることは、期待した後処理を実行させない場合がある、という潜在的バグを潜ませる危険性があるため、コードブロックにタイムアウトを付けるコンテキストマネージャとしての私のベストプラクティスは、通知イベント編 で紹介した方法となりました。
長かったのにここまで読んでくれてありがとう!!より良い解決方法がある、まだ問題点がある、という方はコメントで教えて頂けると幸いです。
今回作成したコードは私の GitHub レポジトリの utils/canceler.py にまとめられています。
https://github.com/Jacomb/reusable_python_utils