python3

[Python] スレッドで実装する

スレッドとは

通常Pythonでは逐次処理により一つずつ処理が実行されます。逐次処理では一度に一つの処理しか行うことができないため、一つの処理が終了するまで次の処理を行うことができません。

そのため、ファイル操作や外部との通信といった相手側の応答を待つような処理を行った場合、待っている間は何も処理が行われません。
このような場合にでも、プログラムの処理性能を上げるための方法として並列処理があります。並列処理では複数の処理を同時に実行するため、逐次処理に比べ早く処理を行うことができます。その並列処理を行う方法の一つがスレッドの使用になります。

スレッド(threading)

スレッドの使用方法は色々ありますが、まずはthreadingを使用する方法からみてみましょう。

threadingにてスレッドを実装する

まず、逐次処理から始めてみます。次のうどんを作るサンプルを見てみましょう。

sample_thread1.py
import time

def boil_udon():
  print('  うどんを茹でます。')
  time.sleep(3)
  print('  うどんが茹であがりました。')

def make_tuyu():
  print('  ツユをつくります。')
  time.sleep(2)
  print('  ツユができました。')

print('うどんを作ります。')
boil_udon()
make_tuyu()
print('盛り付けます。')
print('うどんができました。')

うどんを茹でるboil_udonメソッドを呼び出した後に、ツユを作るmake_tuyuメソッドを呼び出してています。
これを実行すると以下のようになります。

うどんを作ります。
  うどんを茹でます。
  うどんが茹であがりました。
  ツユをつくります。
  ツユができました。
盛り付けます。
うどんができました。

逐次処理なのでうどんが茹で上がってからツユを作っています。これだと、ツユが出来上がるまでにうどんが伸びてしまいますね。うどんが伸びないようにするにはどうしますか?うどんを茹でながらツユを作れば大丈夫ですよね。

どうすれば、うどんを茹でながらツユを作ることができるでしょうか?スレッドを使用して並列処理で処理を行えばいいのです。スレッドを使用すれば複数の処理を同時に行うことができます。
スレッドを使用したサンプルを見てみましょう。

sample_thread2.py
import time
import threading

def boil_udon():
  print('  うどんを茹でます。')
  time.sleep(3)
  print('  うどんが茹であがりました。')

def make_tuyu():
  print('  ツユをつくります。')
  time.sleep(2)
  print('  ツユができました。')

print('うどんを作ります。')
thread1 = threading.Thread(target=boil_udon)
thread2 = threading.Thread(target=make_tuyu)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print('盛り付けます。')
print('うどんができました。')

boil_udonとmake_tuyuは変更ありませんが、これらのメソッドの呼び方が変わっています。スレッドを使用して両メソッドを呼び出すようにしました。

threading.Threadを使用し引数に実行するメソッドを指定してインスタンスを生成します。生成されたインスタンスのstartメソッドを呼び出すことで、スレッド上で処理が行われ、すぐに呼び出し元に処理が戻ってきます。

同時に処理を行えるのはいいのですが、それぞれの処理が終わるまで待たないと盛り付けができないので、joinメソッドを呼び出します。joinメソッドは呼び出すとスレッドの処理が終わるまで待ち状態になります。

この実行結果は次のようになります。

うどんを作ります。
  うどんを茹でます。
  ツユをつくります。
  ツユができました。
  うどんが茹であがりました。
盛り付けます。
うどんができました。

うどんを茹でるのとツユを作るのを同時に行っているのがわかります。どうやらうどんが伸びることもなく、また同時に処理を行うのでうどんを作り終えるまでの時間も短くなりました。

threadingでの実装まとめ

  • threading.Threadにてインスタンス作成、引数にスレッドに実行させたいメソッドを指定する
  • startメソッドでスレッドが開始される
  • スレッドが終わるまで待つにはjoinメソッドを呼び出す

スレッドプール

threadingの使い方を覚えたので、色々な処理を並列で実行することができるようになりました。うどんを同時に100個茹でたり、1000個茹でたりすることも可能でしょう。しかし、現実世界ではうどんの入る数は鍋の大きさを超えることはできません。また、コンロの火力にも限界があるので、うどんをたくさん入れると茹で上がるまでにかかる時間も遅くなってしまいます。

これをプログラムの世界で考えるとお鍋と火力は、メモリやCPUに相当します。スレッドをたくさん使うとメモリを消費しますし、CPUが処理できる量は決まっているため一つ一つの処理が終わるまで時間が長くなってしまいます。

さて、うどん100個を鍋から溢れないように効率よく茹でるにはどうすればよいでしょうか?一度に茹でるうどんの数を制限してやればいいのです。そのためにスレッドプールを使用してみましょう。

ThreadPoolExecutorでの実装

concurrent.futuresモジュールのThreadPoolExecutorはスレッドを使用して並列処理を行うための機能を提供するクラスです。インスタンス生成時にmax_workersを指定して同時に実行するスレッド数を指定することができます。
それではサンプルを見てみましょう。

sample_thread3.py
from concurrent.futures import ThreadPoolExecutor
import time

def boil_udon():
  print('  うどんを茹でます。')
  time.sleep(3)
  print('  うどんが茹であがりました。')

tpe = ThreadPoolExecutor(max_workers=3)

print('うどんを100個茹でます。')
for _ in range(100):
    tpe.submit(boil_udon)

tpe.shutdown()
print('うどんが100個茹で上がりました。')

ThreadPoolExecutorにmax_workersを指定してインスタンスを生成します。今回は3を指定しました。
submitメソッドに実行する処理を指定していきます。submitメソッドで登録された処理はExecutor内のキューに追加され、他の処理が終わった後に順に実行されていきます。
最後にshutdownメソッドを呼び出すと、ThreadPoolExecutorに登録した処理がすべて終わるまで待ち状態になります。

これを実行すると最大3つの処理が同時に行われているのがわかると思います。

ThreadPoolExecutorでの実装まとめ

  • ThreadPoolExecutorインスタンス生成時にmax_workersで同時処理数を設定する
  • 処理はsubmitメソッドにて登録する
  • 登録した処理が全て終わるまで待つにはshutdownメソッドを呼び出す

Future

スレッドで実行した結果を受け取りたい、ということはよくあることだと思います。その場合にはFutureを使用します。Futureはスレッドの状態を知ることや、スレッドで実行された結果を受けとることができます。

Futureを使用する

FutureはThreadPoolExecutorでsubumitメソッドを呼び出したときの戻り値で受け取ることができます。受け取ったFutureのresultメソッドを呼び出せば、スレッドで処理が終わっていれば戻り値を取得でき、スレッド処理が終わっていなければ終わるまで待った後、戻り値を取得できます(引数にtimeoutを指定すれば指定した時間まで待ち、それまでに終了しなければ例外が発生します)。ではサンプルを見てみましょう。

sample_thread4.py
from concurrent.futures import ThreadPoolExecutor
import time

def make_udon(kind):
    print('  %sうどんを作ります。' % kind)
    time.sleep(3)
    return kind + 'うどん'

kinds = ['たぬき', 'かけ', 'ざる', 'きつね', '天ぷら', '肉']
executor = ThreadPoolExecutor(max_workers=3)
futures = []

for kind in kinds:
    print('%sうどん オーダー入りました。' % kind)
    future = executor.submit(make_udon, kind)
    futures.append(future)

for future in futures:
    print('%sお待たせしました。' % future.result())

executor.shutdown()

ThreadPoolExecutorのsubmitメソッド呼び出しでFutureを受け取り、Futureのresultメソッドでmake_udonメソッドの戻り値を取得しています。実行すればmake_udonの戻り値を取得して「XXXお待たせしました。」と表示されると思います。

ただ、このサンプルはあまり良くありません。make_udonメソッドは3秒と決まった時間で処理が終わりますが、処理時間がいつ終わるか分からない場合。例えば、最初の処理が10秒かかり、二番目以降の処理が1秒しかかからない場合には、このサンプルのような処理をしていると最初の処理のFutureのresultメソッド呼び出しで処理が止まってしまい。それが終わるまで既に終わっている二番目以降の処理結果が受け取れなくなります。

このような事態を回避する方法は色々とありますが、Futureにはスレッドの処理状態を確認するメソッドのrunningやdoneなどがあるので、それらを使用して回避することもできると思います。

runningメソッドはスレッドが現在実行中ならTrueを返却します。doneメソッドはスレッドが終了していればTrueを返却します。

Futureのまとめ

  • FutureはThreadPoolExecutorでsubumitメソッドを呼び出したときの戻り値で取得できる
  • Futureのresultメソッドを呼べばスレッドの実行結果を取得できる
  • Futureのrunning、doneメソッド等でスレッドの実行状態を確認できる

最後に

threadingやThreadPoolExcecutorでのスレッドを使用した実装を見てきました。実装自体は簡単にできると思いますが、並列処理ではデータの整合性、処理順などを意識した実装をする必要がある、デバッグが困難である、最適なスレッド数を求める必要がある、など大変な面もあります。なので、その辺の見極めが必要になってくると思います。

ソース:
sample_thread1.py
sample_thread2.py
sample_thread3.py
sample_thread4.py