LoginSignup
0
0

More than 1 year has passed since last update.

Python3でThreadをキャンセル(中断)できるようにする

Last updated at Posted at 2022-04-07

前提

threading.Threadのバックエンドとしてpthreadが用いられている環境,要はPOSIX環境で動くCPythonを前提にしています。
JythonとかWindows上のCPythonとかは本記事の対象外です。

何をやってるのかわからない方は,わからないまま本記事のコードを利用するより,別の方法を探したほうがいいです。

実装

import threading
import ctypes
import errno

class CancelabelThread(threading.Thread):
  def cancel(self,timeout=0.00001):
    # thread IDを取得
    tid=self.ident

    # スレッドが未開始または終了済の場合は何もしない
    if((tid is None) or (not self.is_alive())):
      return False
    
    # libpthread.soをロード(1.)
    libpthread=ctypes.cdll.LoadLibrary('libpthread.so')
    
    # pthread_cancel()でthreadへキャンセルの送信を試みる (2.)
    ret=libpthread.pthread_cancel(ctypes.c_ulong(tid))
    if(ret==0):
      # 送信に成功したらtimeout秒待機 (2.1.)
      self._tstate_lock.acquire(timeout=timeout)
      # pthread_kill()でthreadの存在を確認 (2.2.)
      ret=libpthread.pthread_kill(
           ctypes.c_ulong(tid),
           ctypes.c_int(0)
      )
    
    if(ret==errno.ESRCH):
      # threadが見つからなくなったら死活監視用ロックを解除 (3.)
      self._tstate_lock.release()
      return True
    else:
      return False

if __name__=='__main__':
  import time
  
  def sleeper():
    print('start sleeping')
    time.sleep(60)
    print('*')  # 60秒後に'*'を表示
  
  t=CancelabelThread(target=sleeper)
  t.start()
  time.sleep(1)
  t.cancel()    # => True; 1秒後にスレッドをキャンセルするので'*'は表示されない
  t.join()      # ブロックされない
  print('!')

解説

以下,便宜上,pythonレベルのスレッドを「スレッド」,OSレベル(バックエンドのpthread)のスレッドを「thread」と表記します。

やっているのは,動的ライブラリlibpthread.soを通じてpthread操作用APIを呼び出し,threadに対してキャンセルを送信後その存在が確認できなくなったらpythonレベルのスレッドの後始末をする,ということです。

  1. libpthread.soを読み込む
  2. libpthread.soからpthread_cancel()を呼び出し,IDがtidのthreadにキャンセルの送信を試みる。対象threadが存在しており送信に成功したら(戻り値が0だったら),以下を実行
    2.1. timeout秒(デフォルト10usec)待機
    2.2. pthread_kill()でthreadにシグナル0を投げ,threadの存在を確認1
  3. threadの存在が確認できなかったらキャンセルに成功したとみなし,pythonレベルでのスレッドの死活監視に使われているロック_tstate_lockを解除

pthread_cancel(), pthread_kill()の詳細はmanしてください。

pthreadでthreadのキャンセル完了を知る唯一の方法はpthread_join()の終了ステータスを見ることとされています。
しかし,pythonが生成するthreadはデタッチされており,pthread_join()が常に失敗するため,この方法を用いることができません。

そこで本記事では,pthreadの実装依存になりますが,「デタッチされたthreadは動作停止の直後にOSに回収される(消滅する)」ことを期待2して,pthread_cancel()pthread_kill()の戻り値がESRCH (No such process) の場合に,「対象threadが消滅した」と判断してキャンセル成功の判定を行うようにしています。

ただ,threadのキャンセル処理は即時に行われるわけではなく,停止直後の回収も保証された動作ではないため,キャンセル送信からthreadの存在確認までの間には,ある程度の時間が必要となります。

ここではとりあえず,キャンセル送信からthreadの存在確認までに,指定された時間(デフォルトでは10u秒)待つようにしてみました。
スレッド内の処理内容や環境によって必要な待ち時間は変わってきますので,適時timeoutの値を調整してください。
また,待ち時間を指定するのはいささか不格好ですので,良い方法があれば是非コメントをください。

なお,本記事では余分なモジュールをロードしたくなかったので,時間待ちやシグナル処理をありもののオブジェクト,メソッドで代替しています。
time.sleep()signal.pthread_kill()を用いても問題ありません。
ただしsignal.pthread_kill()は,対象スレッドが見つからなかった場合,ESRCHを返すのではなく,ProcessLookupError例外を投げることに注意してください

制限

本記事の手法を用いても,以下のような,取り消しポイント(cancellation point)の設定できないスレッドはキャンセルすることができません。

def restlesser(i):
  while(True):
    i+=1

t=CancelabelThread(target=restlesser,args=(0,))
t.start()
t.cancel(1.) # => 1秒後にFalse

取り消しポイントについてはman pthreadsを参照してください。

I/O待ちが無くひたすら計算を繰り返すだけのようなスレッドは,キャンセルできない可能性が高いです。
CPythonのスレッドはもともとこのような処理に適していませんので,multiprocessingパッケージの使用など他の手段を検討してください。

どうしてもスレッドを用いる必要がある場合

どうしてもスレッドを用いる必要がある場合には,OSや言語,ライブラリが提供している 安全なリソース保護機能を損なうことを覚悟した上で ,次のように,スレッド内で実行するメソッド内でthreadのcancelability typeにPTHREAD_CANCEL_ASYNCHRONOUSをセットしてみてください。
(この例はそのままでは動きません。)

# スレッド内で実行されるメソッドの **外で** ロードしておく
libpthread=ctypes.cdll.LoadLibrary('libpthread.so')

# pthread.h から適切な値を見つけ書き換えること
PTHREAD_CANCEL_ASYNCHRONOUS=x

def restlesser(i):  
  # cancelability typeをPTHREAD_CANCEL_ASYNCHRONOUSに設定
  libpthread.pthread_setcanceltype(
    ctypes.c_int(PTHREAD_CANCEL_ASYNCHRONOUS),
    None
  )
  
  while(True):
    i+=1

t=CancelabelThread(target=restlesser,args(0,))
t.start()
t.cancel(1.) # => True

繰り返しますが,この方法は 安全ではありません
スレッド内でリソースの確保を行っていないことに確信が持てる場合以外,この方法は 使用してはいけません

PTHREAD_CANCEL_ASYNCHRONOUSをセットした場合に何が起きるかについてはman pthread_setcancelstaeに詳しく述べられています。

cancelability type を PTHREAD_CANCEL_ASYNCHRONOUS に設定して役に立つことはめったにない。スレッドはいつでもキャンセルすることができることになるので、スレッドが安全にリソースの確保 (例えば malloc(3) でメモリーを割り当てる) や mutex、セマフォ、ロックなどの獲得を行うことができない。アプリケーションは、スレッドがキャンセルされる際に、これらのリソースがどのような状態にあるかを知る術はないので、リソースの確保が安全ではなくなる。つまり、キャンセルが起こったのが、リソースの確保前なのか、確保中なのか、確保後なのかが分からない。さらに、関数呼び出しの最中にキャンセルが発生すると、いくつかの内部データ構造 (例えば、malloc(3) 関連の関数が管理している未使用ブロックのリンクリスト) が一貫性のない状態のままになってしまう可能性がある。その結果、クリーンアップハンドラーが役に立たないものになってしまう。

...(中略)...

非同期でのキャンセルが有効な数少ない状況としては、純粋に計算だけを行うループに入っているスレッドをキャンセルするといった場面がある。

-- JM Project訳: "Linux Programmer's Manual (3) PTHREAD_SETCANCELSTATE" 3

You shoot yourself in the foot.

-- Brad Templeton: "The Internet Joke Book" 4

Hey babe, take a walk on the wild side

-- Lou Reed: "Walk on the Wild Side"

蛇足

スレッドの中で動く(target=... で指定する)オブジェクトに手を加えることが可能な場合は,本記事のような強引なことをせず,そのオブジェクトが行う処理を外部からキャンセルできるように書き換えるべきでしょう。

たとえば「実装」節のコードの実行部(if __name__=='__main__': 以下)の場合,python3.1以降だとthreading.Eventを用いて以下のように書き換えることで,書き換え前とほぼ同等の処理を行うことができます。

import threading
import time

def sleeper(e):
  print('start sleeping')
  if(e.wait(60.)): # イベントがセットされるかタイムアウト(60秒)するのを待つ
    return # イベントがセットされたら以降の処理をキャンセル
  
  print('*') # タイムアウトした場合'*'を表示

e=threading.Event()
t=threading.Thread(target=sleeper,args=(e,))
t.start()
time.sleep(1)
e.set() # 1秒後にイベントをセットするので'*'は表示されない
t.join() # スレッドは終了しているのでブロックされない
print('!')
  1. POSIX環境ではプロセスやthreadにシグナル0を投げることで対象に影響を与えず存在確認を行うことが可能

  2. 少なくともGNU pthreadでは期待通りの動作をします

  3. https://linuxjm.osdn.jp/html/LDP_man-pages/man3/pthread_setcancelstate.3.html#lbAK

  4. Annabooks (1995), ISBN-10:1573980250

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0