結論
threading.Thread
を継承したカスタムスレッドクラスを用意し、強制終了する関数を用意させておきます。あとは、join
関数を使い、タイムアウトするまでの時間を待機しておくことで対象の関数にタイムアウトを実装できます。
はじめに
Pythonで、対象にしたいある関数が実行を開始してから一定時間経過するとタイムアウトすることができないか検討していました。
そのようなライブラリが確かにあります。ですが、私の環境だとライブラリを導入してもエラーが出続けて結局使用するのを諦めました。ということで、自前で実装できないか試した結果を記事として残します。
方針
対象となる関数を別のスレッドで実行させておきます。メインスレッドでは、別スレッドで実行された対象の関数が完了するまで待機しておきます。もし、メインスレッドが待機中に一定時間を経過するとスレッドの処理を強制終了させます。下記の図が参考になります。
一定時間経過前に対象の関数が終了したケース
メインスレッド | 別スレッド |
---|---|
開始 | |
対象の関数を実行する別スレッドを作成 | |
別スレッドをスタート | 対象の関数を実行開始 |
別スレッドが終了するまでい一定時間待機 | 対象の関数を実行中 |
別スレッドが終了したため待機終了 | 対象の関数の実行終了 |
別スレッドが生きていないことを確認 | |
終了 |
一定時間経過でタイムアウトさせるケース
メインスレッド | 別スレッド |
---|---|
開始 | |
対象の関数を実行する別スレッドを作成 | |
別スレッドをスタート | 対象の関数を実行開始 |
別スレッドが終了するまで一定時間待機 | 対象の関数を実行中 |
一定時間が経過したため、待機終了 | 対象の関数を実行中 |
別スレッドが生きているため、別スレッドを強制終了 | 対象の関数の実行終了 |
終了 |
強制終了について
threading.Thread
には直接強制終了する手段を提供していません。そこで、threading.Thread
を継承したカスタムクラスを用意することで強制終了させる関数を実装します。詳しくは以前に投稿した私の記事が参考になります。
サンプルコード
import threading
import ctypes
import time
# カスタムスレッドクラス
import threading
import ctypes
import time
# カスタムスレッドクラス
class twe(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
threading.Thread.__init__(self, group=group, target=target, name=name)
self.args = args
self.kwargs = kwargs
return
def run(self):
self._target(*self.args, **self.kwargs)
def get_id(self):
if hasattr(self, '_thread_id'):
return self._thread_id
for id, thread in threading._active.items():
if thread is self:
return id
# 強制終了させる関数
def raise_exception(self):
thread_id = self.get_id()
resu = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), ctypes.py_object(SystemExit))
if resu > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), 0)
print('Failure in raising exception')
# 対象となる関数
def sample_func(a, b, c, hoge1 = None, hoge2 = None, hoge3 = None):
try:
while True:
print((a, b, c, hoge1, hoge2, hoge3))
time.sleep(1)
finally:
print('ended')
def main():
# nameは任意
# targetは対象となる関数
# args, kwargsには関数に渡す引数
x = twe(name = 'Thread A', target=sample_func, args=(1, 2, 3), kwargs={'hoge1': 'hogehoge1', 'hoge2': 'hogehoge2', 'hoge3': 'hogehoge3'})
# 対象の関数を別スレッドで実行する
x.start()
# スレッドが終了するか、10秒間経過(タイムアウト)するか、いずれかまで待機
x.join(10)
# スレッドが終了していない場合、強制終了処理を実行
if x.is_alive(): x.raise_exception()
# 終了しているのでもう待機しないはず
x.join()
# スレッドが生きていないことを確認
print(x.is_alive())
if __name__ == '__main__':
main()
実行
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
(1, 2, 3, 'hogehoge1', 'hogehoge2', 'hogehoge3')
ended
False
確かに、無限ループの関数でも10秒後にタイムアウトして、別スレッドが強制終了していることが確認できます。
以上です。
参考