0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Python の threading パッケージを使いやすくするライブラリ(commonthread)を使ってみました

Last updated at Posted at 2020-09-16

Python の threading パッケージを使いやすくするライブラリ(commonthread)を使ってみました。
https://pypi.org/project/commonthread/
使ってみましたというか、作ってみましたの方が正確です。

インストール方法

commonthreadパッケージのインストール方法
pip install -U commonthread

インストールされるクラス

  • class CommonThreadLogger
  • class CommonThread(threading.Thread)
  • class WorkerThread(CommonThread)

最初のサンプル(01.py)

01.py
from commonthread import *


# source https://techacademy.jp/magazine/28155
def fibonacci_worker(th: WorkerThread, n: int):
    if n <= 2:
        return 1
    else:
        return fibonacci_worker(th, n - 2) + fibonacci_worker(th, n - 1)


thread = WorkerThread(fibonacci_worker, 36)
thread.name = 'tfib'
thread.start()
success = thread.join(timeout=0.5)
print('success={}'.format(success))
success = thread.join()
print('success={}'.format(success))
print('result={}'.format(thread.result))
print('elapsed={}'.format(thread.elapsed))
print(thread)
01.py実行結果
success=False
success=True
result=14930352
elapsed=3.688599109649658
WorkerThread(name=tfib, result=14930352, elapsed=3.688599109649658, args=(36,), kwargs={}, params={})

解説(01.py)

  • このパッケージに含まれる CommonThread クラスを継承した WorkerThread クラスから紹介します。thread = WorkerThread(fibonacci_worker, 36) とコードにあるように、WorkerThread のコンストラクタの第一引数はワーカー関数の名前となります。第二引数以降はワーカー関数に渡される引数となります。ワーカー関数の引数の数とマッチしている限りいくつでも渡せます。
  • ワーカー関数の第一引数は、def fibonacci_worker(th: WorkerThread, n: int) とコードにあるように WorkerThread オブジェクトを取ります。この引数は受け取る必要はありますが、使わなくてもOKです。
  • thread.join() した後に、thread.resultthread.elapsed を print しています。ワーカー関数の fibonacci_worker が返した値が WorkerThread オブジェクトの result メンバに自動的に格納されます。また、ワーカー関数の fibonacci_worker の実行にかかった経過時間が elapsed メンバに格納されています。
  • CommonThread およびその子クラスは print(thread) のように印字すると name, result, elapsed 等が表示されるようになっています。(デバッグ用途)

ワーカー関数から戻り値を返したいことはたまにあるので、そのような用途向けに最適化されています。

クラスベースのスレッドのサンプル(02.py)

02.py
from commonthread import *


# source https://techacademy.jp/magazine/28155
class FibonacciThread(CommonThread):

    def entry(self, n: int):
        if n <= 2:
            return 1
        else:
            return self.entry(n - 2) + self.entry(n - 1)


thread = FibonacciThread(36)
thread.name = 'tfib'
thread.start()
thread.join()
print('result={}'.format(thread.result))
print('elapsed={}'.format(thread.elapsed))
print(thread)
02.py実行結果
result=14930352
elapsed=4.399198055267334
FibonacciThread(name=tfib, result=14930352, elapsed=4.399198055267334, args=(36,), kwargs={}, params={})

解説(02.py)

  • class FibonacciThread(CommonThread) とあるように、(ワーカー関数ベースでなく)クラスベースのスレッドを定義するには、CommonThread クラスを継承します。
  • スレッドの入口は run(self) ではなく entry(self...) となります。このフィボナッチ数を計算するスレッドでは整数の引数を一つだけとりますので、def entry(self, n: int) となってます。CommonThread を継承したクラスのコンストラクタをいじることなく、entry メソッドに任意の数の引数を持たせることができます。簡単な処理であれば、わざわざコンストラクタの定義を作るまでもありませんので重宝します。
  • それ以外は 01.py とほぼ同じです。

コンストラクを定義したクラスベースのスレッドのサンプル(03.py)

03.py
from commonthread import *


class AddThread(CommonThread):

    def __init__(self, x: int, y: int):
        CommonThread.__init__(self)
        self.x = x
        self.y = y

    def entry(self):
        time.sleep(2.0)
        return self.x + self.y


thread = AddThread(11, 22)
thread.start()
thread.join()
print('result={}'.format(thread.result))
print('elapsed={}'.format(thread.elapsed))
print(thread)
03.py実行結果
result=33
elapsed=2.0015878677368164
AddThread(name=Thread-1, result=33, elapsed=2.0015878677368164, args=(), kwargs={}, params={})

解説(03.py)

  • class AddThread(CommonThread) の場合は def __init__(self, x: int, y: int) というようにコンストラクタを定義して x と y という引数を受け付けることを指示しています。このように CommonThread の子クラスにおいて __init__ を定義する際には、その最初の実行行として CommonThread.__init__(self) を記述しなければなりません。これはおまじないのようなものです。
  • そして、スレッドの実行の入口は def entry(self) となり、スレッドオブジェクト自身以外の引数は受け付けられません。そのため、コンストラクタ内で self.x、slef.y のメンバに引数を格納しておきます。

同一クラスのスレッド全てと join するサンプル(04.py)

04.py
from commonthread import *


lg = CommonThreadLogger()
lg.setup_basic()


class ShortThread(CommonThread):

    def entry(self, duration):
        lg.debug('start')
        time.sleep(duration)
        lg.debug('end')
        return 'finished'


class LongThread(CommonThread):

    def entry(self, duration):
        lg.debug('start')
        time.sleep(duration)
        lg.debug('end')
        return 'finished'


lg.debug('start')

sth1 = ShortThread(1.0); sth1.name = 'sth1'
sth2 = ShortThread(1.5); sth2.name = 'sth2'

lth1 = LongThread(5.0); lth1.name = 'lth1'
lth2 = LongThread(6.0); lth2.name = 'lth2'

sth1.start()
sth2.start()
lth1.start()
lth2.start()

lg.debug(CommonThread.list_alive())

CommonThread.join_all(type=ShortThread)

lg.debug(CommonThread.list_alive())

CommonThread.join_all()

lg.debug(CommonThread.list_alive())
04.py実行結果
MainThread: start
sth1: start
sth2: start
lth1: start
lth2: start
MainThread: [ShortThread(name=sth1, result=None, elapsed=0.0, args=(1.0,), kwargs={}, params={}), ShortThread(name=sth2, result=None, elapsed=0.0, args=(1.5,), kwargs={}, params={}), LongThread(name=lth1, result=None, elapsed=0.0, args=(5.0,), kwargs={}, params={}), LongThread(name=lth2, result=None, elapsed=0.0, args=(6.0,), kwargs={}, params={})]
sth1: end
sth2: end
MainThread: [LongThread(name=lth1, result=None, elapsed=0.0, args=(5.0,), kwargs={}, params={}), LongThread(name=lth2, result=None, elapsed=0.0, args=(6.0,), kwargs={}, params={})]
lth1: end
lth2: end
MainThread: []

解説(04.py)

  • このサンプルでは初めて(デバッグ用)ログ出力を使ってみます。lg = CommonThreadLogger() とした後に lg.setup_basic() でデバッグログが有効になります。デバッグが不要になったら lg.setup_basic() の行をコメントアウトしてください。
  • lg.setup_basic() を実行した後で、lg.debug(~) を実行するとメッセージの先頭に「スレッド名: 」がついたログが標準エラーに出力されます。
  • CommonThread クラスを継承した ShortThread と 同じく CommonThread を継承した LongThread を定義しました。定義内容はどちらも同じです。引数 duration に渡す待ち時間を調整することで ShortThread の方が早めに終了するようにしています。
  • CommonThread.join_all(type=ShortThread) は ShortThread のインスタンス全てと join して終了を待ちます。
  • CommonThread.join_all() は (LongThread も含め)すべての CommonThread のインスタンスと join して終了を待ちます。
  • CommonThread.list_alive() は、全てのアクティブな(終了していない) CommonThread オブジェクトの一覧を返します。
  • CommonThread.list_alive(type=ShortThread) は、全てのアクティブな(終了していない) ShortThread オブジェクトの一覧を返します。
  • サンプルコードに含まれていませんが、CommonThread.are_alive() で CommonThread でアクティブなものが残っているかどうかを確認できます。CommonThread.are_alive(type=ShortThread) のように指定すると ShortThread に限定してチェックできます。

最後に

今回紹介しきれなかった機能としては:

  • CommonThread および WorkerThread に備わった入出力用キュー関連機能
  • コマンドラインの引数解析用ライブラリ argparse を使った引数解析機能
  • CommonThread および WorkerThread に備わった params メンバの利用方法

などがあります。別の記事でご紹介したいと思います。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?