Python の threading パッケージを使いやすくするライブラリ(commonthread)を使ってみました。
https://pypi.org/project/commonthread/
使ってみましたというか、作ってみましたの方が正確です。
インストール方法
commonthreadパッケージのインストール方法
pip install -U commonthread
インストールされるクラス
- class CommonThreadLogger
- class CommonThread(threading.Thread)
- class WorkerThread(CommonThread)
最初のサンプル(01.py)
01.py
from commonthread import *
# source https://techacademy.jp/magazine/28155
def fibonacci_worker(th: WorkerThread, n: int):
if n <= 2:
return 1
else:
return fibonacci_worker(th, n - 2) + fibonacci_worker(th, n - 1)
thread = WorkerThread(fibonacci_worker, 36)
thread.name = 'tfib'
thread.start()
success = thread.join(timeout=0.5)
print('success={}'.format(success))
success = thread.join()
print('success={}'.format(success))
print('result={}'.format(thread.result))
print('elapsed={}'.format(thread.elapsed))
print(thread)
01.py実行結果
success=False
success=True
result=14930352
elapsed=3.688599109649658
WorkerThread(name=tfib, result=14930352, elapsed=3.688599109649658, args=(36,), kwargs={}, params={})
解説(01.py)
- このパッケージに含まれる CommonThread クラスを継承した WorkerThread クラスから紹介します。
thread = WorkerThread(fibonacci_worker, 36)
とコードにあるように、WorkerThread のコンストラクタの第一引数はワーカー関数の名前となります。第二引数以降はワーカー関数に渡される引数となります。ワーカー関数の引数の数とマッチしている限りいくつでも渡せます。 - ワーカー関数の第一引数は、
def fibonacci_worker(th: WorkerThread, n: int)
とコードにあるように WorkerThread オブジェクトを取ります。この引数は受け取る必要はありますが、使わなくてもOKです。 -
thread.join()
した後に、thread.result
とthread.elapsed
を print しています。ワーカー関数の fibonacci_worker が返した値が WorkerThread オブジェクトの result メンバに自動的に格納されます。また、ワーカー関数の fibonacci_worker の実行にかかった経過時間が elapsed メンバに格納されています。 - CommonThread およびその子クラスは
print(thread)
のように印字すると name, result, elapsed 等が表示されるようになっています。(デバッグ用途)
ワーカー関数から戻り値を返したいことはたまにあるので、そのような用途向けに最適化されています。
クラスベースのスレッドのサンプル(02.py)
02.py
from commonthread import *
# source https://techacademy.jp/magazine/28155
class FibonacciThread(CommonThread):
def entry(self, n: int):
if n <= 2:
return 1
else:
return self.entry(n - 2) + self.entry(n - 1)
thread = FibonacciThread(36)
thread.name = 'tfib'
thread.start()
thread.join()
print('result={}'.format(thread.result))
print('elapsed={}'.format(thread.elapsed))
print(thread)
02.py実行結果
result=14930352
elapsed=4.399198055267334
FibonacciThread(name=tfib, result=14930352, elapsed=4.399198055267334, args=(36,), kwargs={}, params={})
解説(02.py)
-
class FibonacciThread(CommonThread)
とあるように、(ワーカー関数ベースでなく)クラスベースのスレッドを定義するには、CommonThread クラスを継承します。 - スレッドの入口は
run(self)
ではなくentry(self...)
となります。このフィボナッチ数を計算するスレッドでは整数の引数を一つだけとりますので、def entry(self, n: int)
となってます。CommonThread を継承したクラスのコンストラクタをいじることなく、entry メソッドに任意の数の引数を持たせることができます。簡単な処理であれば、わざわざコンストラクタの定義を作るまでもありませんので重宝します。 - それ以外は 01.py とほぼ同じです。
コンストラクを定義したクラスベースのスレッドのサンプル(03.py)
03.py
from commonthread import *
class AddThread(CommonThread):
def __init__(self, x: int, y: int):
CommonThread.__init__(self)
self.x = x
self.y = y
def entry(self):
time.sleep(2.0)
return self.x + self.y
thread = AddThread(11, 22)
thread.start()
thread.join()
print('result={}'.format(thread.result))
print('elapsed={}'.format(thread.elapsed))
print(thread)
03.py実行結果
result=33
elapsed=2.0015878677368164
AddThread(name=Thread-1, result=33, elapsed=2.0015878677368164, args=(), kwargs={}, params={})
解説(03.py)
-
class AddThread(CommonThread)
の場合はdef __init__(self, x: int, y: int)
というようにコンストラクタを定義して x と y という引数を受け付けることを指示しています。このように CommonThread の子クラスにおいて__init__
を定義する際には、その最初の実行行としてCommonThread.__init__(self)
を記述しなければなりません。これはおまじないのようなものです。 - そして、スレッドの実行の入口は
def entry(self)
となり、スレッドオブジェクト自身以外の引数は受け付けられません。そのため、コンストラクタ内で self.x、slef.y のメンバに引数を格納しておきます。
同一クラスのスレッド全てと join するサンプル(04.py)
04.py
from commonthread import *
lg = CommonThreadLogger()
lg.setup_basic()
class ShortThread(CommonThread):
def entry(self, duration):
lg.debug('start')
time.sleep(duration)
lg.debug('end')
return 'finished'
class LongThread(CommonThread):
def entry(self, duration):
lg.debug('start')
time.sleep(duration)
lg.debug('end')
return 'finished'
lg.debug('start')
sth1 = ShortThread(1.0); sth1.name = 'sth1'
sth2 = ShortThread(1.5); sth2.name = 'sth2'
lth1 = LongThread(5.0); lth1.name = 'lth1'
lth2 = LongThread(6.0); lth2.name = 'lth2'
sth1.start()
sth2.start()
lth1.start()
lth2.start()
lg.debug(CommonThread.list_alive())
CommonThread.join_all(type=ShortThread)
lg.debug(CommonThread.list_alive())
CommonThread.join_all()
lg.debug(CommonThread.list_alive())
04.py実行結果
MainThread: start
sth1: start
sth2: start
lth1: start
lth2: start
MainThread: [ShortThread(name=sth1, result=None, elapsed=0.0, args=(1.0,), kwargs={}, params={}), ShortThread(name=sth2, result=None, elapsed=0.0, args=(1.5,), kwargs={}, params={}), LongThread(name=lth1, result=None, elapsed=0.0, args=(5.0,), kwargs={}, params={}), LongThread(name=lth2, result=None, elapsed=0.0, args=(6.0,), kwargs={}, params={})]
sth1: end
sth2: end
MainThread: [LongThread(name=lth1, result=None, elapsed=0.0, args=(5.0,), kwargs={}, params={}), LongThread(name=lth2, result=None, elapsed=0.0, args=(6.0,), kwargs={}, params={})]
lth1: end
lth2: end
MainThread: []
解説(04.py)
- このサンプルでは初めて(デバッグ用)ログ出力を使ってみます。
lg = CommonThreadLogger()
とした後にlg.setup_basic()
でデバッグログが有効になります。デバッグが不要になったらlg.setup_basic()
の行をコメントアウトしてください。 -
lg.setup_basic()
を実行した後で、lg.debug(~)
を実行するとメッセージの先頭に「スレッド名: 」がついたログが標準エラーに出力されます。 - CommonThread クラスを継承した ShortThread と 同じく CommonThread を継承した LongThread を定義しました。定義内容はどちらも同じです。引数 duration に渡す待ち時間を調整することで ShortThread の方が早めに終了するようにしています。
-
CommonThread.join_all(type=ShortThread)
は ShortThread のインスタンス全てと join して終了を待ちます。 -
CommonThread.join_all()
は (LongThread も含め)すべての CommonThread のインスタンスと join して終了を待ちます。 -
CommonThread.list_alive()
は、全てのアクティブな(終了していない) CommonThread オブジェクトの一覧を返します。 -
CommonThread.list_alive(type=ShortThread)
は、全てのアクティブな(終了していない) ShortThread オブジェクトの一覧を返します。 - サンプルコードに含まれていませんが、
CommonThread.are_alive()
で CommonThread でアクティブなものが残っているかどうかを確認できます。CommonThread.are_alive(type=ShortThread)
のように指定すると ShortThread に限定してチェックできます。
最後に
今回紹介しきれなかった機能としては:
- CommonThread および WorkerThread に備わった入出力用キュー関連機能
- コマンドラインの引数解析用ライブラリ argparse を使った引数解析機能
- CommonThread および WorkerThread に備わった params メンバの利用方法
などがあります。別の記事でご紹介したいと思います。