前提
threading.Threadのバックエンドとしてpthreadが用いられている環境,要はPOSIX環境で動くCPythonを前提にしています。
JythonとかWindows上のCPythonとかは本記事の対象外です。
何をやってるのかわからない方は,わからないまま本記事のコードを利用するより,別の方法を探したほうがいいです。
実装
import threading
import ctypes
import errno
class CancelabelThread(threading.Thread):
def cancel(self,timeout=0.00001):
# thread IDを取得
tid=self.ident
# スレッドが未開始または終了済の場合は何もしない
if((tid is None) or (not self.is_alive())):
return False
# libpthread.soをロード(1.)
libpthread=ctypes.cdll.LoadLibrary('libpthread.so')
# pthread_cancel()でthreadへキャンセルの送信を試みる (2.)
ret=libpthread.pthread_cancel(ctypes.c_ulong(tid))
if(ret==0):
# 送信に成功したらtimeout秒待機 (2.1.)
self._tstate_lock.acquire(timeout=timeout)
# pthread_kill()でthreadの存在を確認 (2.2.)
ret=libpthread.pthread_kill(
ctypes.c_ulong(tid),
ctypes.c_int(0)
)
if(ret==errno.ESRCH):
# threadが見つからなくなったら死活監視用ロックを解除 (3.)
self._tstate_lock.release()
return True
else:
return False
if __name__=='__main__':
import time
def sleeper():
print('start sleeping')
time.sleep(60)
print('*') # 60秒後に'*'を表示
t=CancelabelThread(target=sleeper)
t.start()
time.sleep(1)
t.cancel() # => True; 1秒後にスレッドをキャンセルするので'*'は表示されない
t.join() # ブロックされない
print('!')
解説
以下,便宜上,pythonレベルのスレッドを「スレッド」,OSレベル(バックエンドのpthread)のスレッドを「thread」と表記します。
やっているのは,動的ライブラリlibpthread.so
を通じてpthread操作用APIを呼び出し,threadに対してキャンセルを送信後その存在が確認できなくなったらpythonレベルのスレッドの後始末をする,ということです。
-
libpthread.so
を読み込む -
libpthread.so
からpthread_cancel()
を呼び出し,IDがtidのthreadにキャンセルの送信を試みる。対象threadが存在しており送信に成功したら(戻り値が0だったら),以下を実行
2.1. timeout秒(デフォルト10usec)待機
2.2.pthread_kill()
でthreadにシグナル0を投げ,threadの存在を確認1 - threadの存在が確認できなかったらキャンセルに成功したとみなし,pythonレベルでのスレッドの死活監視に使われているロック
_tstate_lock
を解除
pthread_cancel()
, pthread_kill()
の詳細はmanしてください。
pthreadでthreadのキャンセル完了を知る唯一の方法はpthread_join()
の終了ステータスを見ることとされています。
しかし,pythonが生成するthreadはデタッチされており,pthread_join()
が常に失敗するため,この方法を用いることができません。
そこで本記事では,pthreadの実装依存になりますが,「デタッチされたthreadは動作停止の直後にOSに回収される(消滅する)」ことを期待2して,pthread_cancel()
やpthread_kill()
の戻り値がESRCH
(No such process) の場合に,「対象threadが消滅した」と判断してキャンセル成功の判定を行うようにしています。
ただ,threadのキャンセル処理は即時に行われるわけではなく,停止直後の回収も保証された動作ではないため,キャンセル送信からthreadの存在確認までの間には,ある程度の時間が必要となります。
ここではとりあえず,キャンセル送信からthreadの存在確認までに,指定された時間(デフォルトでは10u秒)待つようにしてみました。
スレッド内の処理内容や環境によって必要な待ち時間は変わってきますので,適時timeoutの値を調整してください。
また,待ち時間を指定するのはいささか不格好ですので,良い方法があれば是非コメントをください。
なお,本記事では余分なモジュールをロードしたくなかったので,時間待ちやシグナル処理をありもののオブジェクト,メソッドで代替しています。
time.sleep()
,signal.pthread_kill()
を用いても問題ありません。
ただしsignal.pthread_kill()
は,対象スレッドが見つからなかった場合,ESRCH
を返すのではなく,ProcessLookupError
例外を投げることに注意してください
制限
本記事の手法を用いても,以下のような,取り消しポイント(cancellation point)の設定できないスレッドはキャンセルすることができません。
def restlesser(i):
while(True):
i+=1
t=CancelabelThread(target=restlesser,args=(0,))
t.start()
t.cancel(1.) # => 1秒後にFalse
取り消しポイントについてはman pthreads
を参照してください。
I/O待ちが無くひたすら計算を繰り返すだけのようなスレッドは,キャンセルできない可能性が高いです。
CPythonのスレッドはもともとこのような処理に適していませんので,multiprocessingパッケージの使用など他の手段を検討してください。
どうしてもスレッドを用いる必要がある場合
どうしてもスレッドを用いる必要がある場合には,OSや言語,ライブラリが提供している 安全なリソース保護機能を損なうことを覚悟した上で ,次のように,スレッド内で実行するメソッド内でthreadのcancelability typeにPTHREAD_CANCEL_ASYNCHRONOUS
をセットしてみてください。
(この例はそのままでは動きません。)
# スレッド内で実行されるメソッドの **外で** ロードしておく
libpthread=ctypes.cdll.LoadLibrary('libpthread.so')
# pthread.h から適切な値を見つけ書き換えること
PTHREAD_CANCEL_ASYNCHRONOUS=x
def restlesser(i):
# cancelability typeをPTHREAD_CANCEL_ASYNCHRONOUSに設定
libpthread.pthread_setcanceltype(
ctypes.c_int(PTHREAD_CANCEL_ASYNCHRONOUS),
None
)
while(True):
i+=1
t=CancelabelThread(target=restlesser,args(0,))
t.start()
t.cancel(1.) # => True
繰り返しますが,この方法は 安全ではありません 。
スレッド内でリソースの確保を行っていないことに確信が持てる場合以外,この方法は 使用してはいけません 。
PTHREAD_CANCEL_ASYNCHRONOUS
をセットした場合に何が起きるかについてはman pthread_setcancelstae
に詳しく述べられています。
cancelability type を PTHREAD_CANCEL_ASYNCHRONOUS に設定して役に立つことはめったにない。スレッドはいつでもキャンセルすることができることになるので、スレッドが安全にリソースの確保 (例えば malloc(3) でメモリーを割り当てる) や mutex、セマフォ、ロックなどの獲得を行うことができない。アプリケーションは、スレッドがキャンセルされる際に、これらのリソースがどのような状態にあるかを知る術はないので、リソースの確保が安全ではなくなる。つまり、キャンセルが起こったのが、リソースの確保前なのか、確保中なのか、確保後なのかが分からない。さらに、関数呼び出しの最中にキャンセルが発生すると、いくつかの内部データ構造 (例えば、malloc(3) 関連の関数が管理している未使用ブロックのリンクリスト) が一貫性のない状態のままになってしまう可能性がある。その結果、クリーンアップハンドラーが役に立たないものになってしまう。
...(中略)...
非同期でのキャンセルが有効な数少ない状況としては、純粋に計算だけを行うループに入っているスレッドをキャンセルするといった場面がある。
-- JM Project訳: "Linux Programmer's Manual (3) PTHREAD_SETCANCELSTATE" 3
You shoot yourself in the foot.
-- Brad Templeton: "The Internet Joke Book" 4
Hey babe, take a walk on the wild side
-- Lou Reed: "Walk on the Wild Side"
蛇足
スレッドの中で動く(target=... で指定する)オブジェクトに手を加えることが可能な場合は,本記事のような強引なことをせず,そのオブジェクトが行う処理を外部からキャンセルできるように書き換えるべきでしょう。
たとえば「実装」節のコードの実行部(if __name__=='__main__':
以下)の場合,python3.1以降だとthreading.Event
を用いて以下のように書き換えることで,書き換え前とほぼ同等の処理を行うことができます。
import threading
import time
def sleeper(e):
print('start sleeping')
if(e.wait(60.)): # イベントがセットされるかタイムアウト(60秒)するのを待つ
return # イベントがセットされたら以降の処理をキャンセル
print('*') # タイムアウトした場合'*'を表示
e=threading.Event()
t=threading.Thread(target=sleeper,args=(e,))
t.start()
time.sleep(1)
e.set() # 1秒後にイベントをセットするので'*'は表示されない
t.join() # スレッドは終了しているのでブロックされない
print('!')
-
POSIX環境ではプロセスやthreadにシグナル0を投げることで対象に影響を与えず存在確認を行うことが可能 ↩
-
少なくともGNU pthreadでは期待通りの動作をします ↩
-
https://linuxjm.osdn.jp/html/LDP_man-pages/man3/pthread_setcancelstate.3.html#lbAK ↩
-
Annabooks (1995), ISBN-10:1573980250 ↩