学習履歴

■はじめに

チャットアプリを作成する上で並列化処理について勉強したので、備忘として残しておく。

並列処理には、マルチスレッドとマルチプロセスの 2 種類あり、ここではマルチスレッドについて記載する。

■環境
python3

■参考
17.1. threading — スレッドベースの並列処理

■スレッド

スレッドを作成してみる。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')

def worker1():
    # thread の名前を取得
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')

def worker2():
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')

if __name__ == '__main__':
    # スレッドに workder1 関数を渡す
    t1 = threading.Thread(target=worker1)
    t2 = threading.Thread(target=worker2)
    # スレッドスタート
    t1.start()
    t2.start()
    print('started')
thread.実行結果
started
Thread-1: start
Thread-2: start
Thread-2: end
Thread-1: end

■スレッドに渡す引数

スレッドは引き数を渡して処理を実行させることもできる。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1():
    # thread の名前を取得
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')


def worker2(x, y=1):
    logging.debug('start')
    logging.debug(x)
    logging.debug(y)
    time.sleep(5)
    logging.debug('end')

if __name__ == '__main__':
    # スレッドに workder1 関数を渡す
    t1 = threading.Thread(name='rename worker1', target=worker1)
    t2 = threading.Thread(target=worker2, args=(100, ), kwargs={'y': 200})
    # スレッドスタート
    t1.start()
    t2.start()
    print('started')
thread.実行結果
rename worker1: start
Thread-1: start
Thread-1: 100
Thread-1: 200
Thread-1: end
rename worker1: end

■デーモンスレッド

スレッドは、setDaemon 関数で、デーモン化することができる。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')

def worker1():
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')


def worker2():
    logging.debug('start')
    time.sleep(2)
    logging.debug('end')

if __name__ == '__main__':
    t1 = threading.Thread(target=worker1)
    # スレッドをデーモン化する
    t1.setDaemon(True)
    t2 = threading.Thread(target=worker2)
    t1.start()
    t2.start()
    print('started')

ただし、デーモン化したスレッドは、起動状態であってもプログラムが終了した時点で、強制終了してしまう。

実行結果を確認すると、デーモン化していないスレッド 2 が終了した時点で、プログラムが終了している。

thread.実行結果
Thread-1: start
started
Thread-2: start
Thread-2: end

デーモン化したスレッドの処理の完了を待って、プログラムを終了させたいときは、join 関数を使用する必要がある。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1():
    # thread の名前を取得
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')


def worker2():
    logging.debug('start')
    time.sleep(2)
    logging.debug('end')

if __name__ == '__main__':
    t1 = threading.Thread(target=worker1)
    # t1 スレッドをデーモン化する
    t1.setDaemon(True)
    t2 = threading.Thread(target=worker2)
    t1.start()
    t2.start()
    print('started')
  # t1 スレッドの完了を待つ
    t1.join()
thread.実行結果
Thread-1: start
Thread-2: start
started
Thread-2: end
Thread-1: end

■スレッドの一覧

プログラム実行中に起動しているスレッドを複数作成して、一覧を出力してみよう。

<方法1>

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1():
    # thread の名前を取得
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')

if __name__ == '__main__':
    threads = []
    # スレッドを 5 個つくる
    for _ in range(5):
        t = threading.Thread(target=worker1)
        t.setDaemon(True)
        t.start()
        threads.append(t)
    for thread in threads:
        thread.join()
thread.実行結果
Thread-1: start
Thread-2: start
Thread-3: start
Thread-4: start
Thread-5: start
Thread-3: end
Thread-2: end
Thread-1: end
Thread-5: end
Thread-4: end

enumerate を使う方法もある。

<方法2>

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1():
    # thread の名前を取得
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')

if __name__ == '__main__':
    # スレッドを 5 個つくる
    for _ in range(5):
        t = threading.Thread(target=worker1)
        t.setDaemon(True)
        t.start()
    # 現在起動しているスレッドを enumerate でキャッチ
    print(threading.enumerate())
    for thread in threading.enumerate():
        # メインのスレッドは、一覧に加えない
        if thread is threading.current_thread():
            print(thread)
            continue
        thread.join()
thread.実行結果
Thread-1: start
[<_MainThread(MainThread, started 4040)>, <Thread(Thread-1, started daemon 14460)>, <Thread(Thread-2, started daemon 11096)>, <Thread(Thread-3, started daemon 4580)>, <Thread(Thread-4, started daemon 4064)>, <Thread(Thread-5, started daemon 1444)>]
Thread-2: start
<_MainThread(MainThread, started 4040)>
Thread-3: start
Thread-4: start
Thread-5: start
Thread-5: end
Thread-2: end
Thread-1: end
Thread-4: end
Thread-3: end

■タイマー

Timer 関数で、スレッドの起動時間を設定することができる。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(x, y=1):
    # thread の名前を取得
    logging.debug('start')
    logging.debug(x)
    logging.debug(y)
    time.sleep(5)
    logging.debug('end')

if __name__ == '__main__':
    # スレッドを 3 秒後にスタート
    t = threading.Timer(3, worker1, args=(100,), kwargs={'y': 200})
    t.start()

ここではわかりにくいが、3 秒経ってからプログラムがスタートしている。

thread.実行結果
Thread-1: start
Thread-1: 100
Thread-1: 200
Thread-1: end

■スレッドのロック

スレッドが同時に走ると不具合が生じることがあるので、ロックを書けることができる。

以下のプログラムは、ディクショナリの x の値をカウントしていくものだ。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(d):
    logging.debug('start')
    i = d['x']
    d['x'] = i + 1
    logging.debug(d)
    logging.debug('end')


def worker2(d):
    logging.debug('start')
    i = d['x']
    d['x'] = i + 1
    logging.debug(d)
    logging.debug('end')

if __name__ == '__main__':
    d = {'x': 0}
    t1 = threading.Thread(target=worker1, args=(d,))
    t2 = threading.Thread(target=worker2, args=(d,))
    t1.start()
    t2.start()

問題なく、カウントできていることがわかる。

thread.実行結果
Thread-1: start
Thread-1: {'x': 1}
Thread-1: end
Thread-2: start
Thread-2: {'x': 2}
Thread-2: end

しかし、スレッド内のカウントする処理の前に何らかの処理が入る場合、正常に動作しない。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(d):
    logging.debug('start')
    i = d['x']
    time.sleep(5)
    d['x'] = i + 1
    logging.debug(d)
    logging.debug('end')


def worker2(d):
    logging.debug('start')
    i = d['x']
    d['x'] = i + 1
    logging.debug(d)
    logging.debug('end')

if __name__ == '__main__':
    d = {'x': 0}
    t1 = threading.Thread(target=worker1, args=(d,))
    t2 = threading.Thread(target=worker2, args=(d,))
    t1.start()
    t2.start()
thread.実行結果
Thread-1: start
Thread-2: start
Thread-2: {'x': 1}
Thread-2: end
Thread-1: {'x': 1} #{カウントされない}
Thread-1: end

d[x] は、スレッド間で共有されているが、スレッド1が「x = 1」の値を保持した状態で
5 秒間停止してしまうので、その間にスレッド2が 「x = 1」の値を d[x] に格納する。
その後、スレッド1によって「x = 1」 の値で上書きしてしまうので、カウントされない。

これを防ぐために、スレッドにロックをかける。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(d, lock):
    logging.debug('start')
    # ②ロック実行
    lock.acquire()
    i = d['x']
    time.sleep(5)
    d['x'] = i + 1
    logging.debug(d)
    # ③アンロック
    lock.release()
    logging.debug('end')


def worker2(d, lock):
    logging.debug('start')
    # ②ロック実行(worker1の実行が完了するまで、処理を待つ)
    lock.acquire()
    i = d['x']
    d['x'] = i + 1
    logging.debug(d)
    # ③アンロック
    lock.release()
    logging.debug('end')

if __name__ == '__main__':
    d = {'x': 0}
    # ①ロックを作成
    lock = threading.Lock()
    t1 = threading.Thread(target=worker1, args=(d, lock))
    t2 = threading.Thread(target=worker2, args=(d, lock))
    t1.start()
    t2.start()
thread.実行結果
Thread-1: start
Thread-2: start
Thread-1: {'x': 1}
Thread-1: end
Thread-2: {'x': 2}
Thread-2: end

今回は、Lock 関数を使ったが、RLock というものもある。

■セマフォ

セマフォを使うとロックをかけるスレッド数をコントロールできる。

まずは、RLock を使った場合を見てみよう。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(lock):
    with lock:
        logging.debug('start')
        time.sleep(5)
        logging.debug('end')

def worker2(lock):
    with lock:
        logging.debug('start')
        time.sleep(5)
        logging.debug('end')

def worker3(lock):
    with lock:
        logging.debug('start')
        time.sleep(5)
        logging.debug('end')


if __name__ == '__main__':
    lock = threading.RLock()
    t1 = threading.Thread(target=worker1, args=(lock,))
    t2 = threading.Thread(target=worker2, args=(lock,))
    t3 = threading.Thread(target=worker3, args=(lock,))
    t1.start()
    t2.start()
    t3.start()

このプログラムを実行すると、スレッド1 ~ スレッド3 まで 5 秒間隔で実行されることがわかる。

thread.実行結果
Thread-1: start
Thread-1: end
Thread-2: start
Thread-2: end
Thread-3: start
Thread-3: end

次は、セマフォを使ってみる。

threadtest.py
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(semaphore):
    with semaphore:
        logging.debug('start')
        time.sleep(5)
        logging.debug('end')

def worker2(semaphore):
    with semaphore:
        logging.debug('start')
        time.sleep(5)
        logging.debug('end')

def worker3(semaphore):
    with semaphore:
        logging.debug('start')
        time.sleep(5)
        logging.debug('end')


if __name__ == '__main__':
    # セマフォを使用(引数には実行スレッド数を指定)
    semaphore = threading.Semaphore(2)
    t1 = threading.Thread(target=worker1, args=(semaphore,))
    t2 = threading.Thread(target=worker2, args=(semaphore,))
    t3 = threading.Thread(target=worker3, args=(semaphore,))
    t1.start()
    t2.start()
    t3.start()

セマフォを使った場合は、スレッド1とスレッド2が最初に実行されて、最後にスレッド3が走っていることが確認できる。

thread.実行結果
Thread-1: start
Thread-2: start
Thread-1: end
Thread-2: end
Thread-3: start
Thread-3: end

■キュー

キューを使うとスレッド間でデータのやり取りができるようになる。

threadtest.py
import logging
import threading
import time
import queue

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(queue):
    logging.debug('start')
    # キューに値を格納
    queue.put(100) #[100]
    time.sleep(5)
    queue.put(200) #[100, 200]
    logging.debug('end')

def worker2(queue):
    logging.debug('start')
    # キューを取得
    logging.debug(queue.get())
    logging.debug(queue.get()) #キューはロックの性質も持っており、put(200)が実行されるまで待ち続ける
    logging.debug('end')

if __name__ == '__main__':
    # キューを作成(キューはファーストインファーストアウト)
    queue = queue.Queue()
    t1 = threading.Thread(target=worker1, args=(queue,))
    t2 = threading.Thread(target=worker2, args=(queue,))
    t1.start()
    t2.start()

thread.実行結果
Thread-1: start
Thread-2: start
Thread-2: 100
Thread-1: end
Thread-2: 200
Thread-2: end

スレッドの数がいっぱいあって処理も長いとき、全てのスレッドが完了してからプログラムを終了したい場合は、
以下の様に処理する。

threadtest.py
import logging
import threading
import time
import queue

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(queue):
    logging.debug('start')
    while True:
        item = queue.get()
        if item is None:
            break
        logging.debug(item)
        queue.task_done()
    logging.debug('--------------すごい長い処理-------------------')
    logging.debug('end')


if __name__ == '__main__':
    # キューを作成(キューはファーストインファーストアウト)
    queue = queue.Queue()
    for i in range(10):
        queue.put(i)
    t1 = threading.Thread(target=worker1, args=(queue,))
    t1.start()
    logging.debug('tasks are not done')
    queue.join()
    logging.debug('tasks are done')
    queue.put(None)
    t1.join()
thread.実行結果
Thread-1: start
MainThread: tasks are not done
Thread-1: 0
Thread-1: 1
Thread-1: 2
Thread-1: 3
Thread-1: 4
Thread-1: 5
Thread-1: 6
Thread-1: 7
Thread-1: 8
Thread-1: 9
MainThread: tasks are done
Thread-1: --------------すごい長い処理-------------------
Thread-1: end

もう少し長いバージョン。

threadtest.py
import logging
import threading
import time
import queue

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(queue):
    logging.debug('start')
    while True:
        item = queue.get()
        if item is None:
            break
        logging.debug(item)
        queue.task_done()
    logging.debug('--------------すごい長い処理-------------------')
    logging.debug('end')


if __name__ == '__main__':
    # キューを作成(キューはファーストインファーストアウト)
    queue = queue.Queue()
    for i in range(10000):
        queue.put(i)
    ts = []
    for _ in range(3):
        t = threading.Thread(target=worker1, args=(queue,))
        t.start()
        ts.append(t)
    logging.debug('tasks are not done')
    queue.join()
    logging.debug('tasks are done')
    for _ in range(len(ts)):
        queue.put(None)

    [t.join() for t in ts]
thread.実行結果
...省略

Thread-3: 9999
Thread-1: 9996
Thread-2: 9900
MainThread: tasks are done
Thread-3: --------------すごい長い処理-------------------
Thread-3: end
Thread-1: --------------すごい長い処理-------------------
Thread-1: end
Thread-2: --------------すごい長い処理-------------------
Thread-2: end

■イベント

あるスレッドでイベントを発生させ、イベントをトリガーに他のスレッドを実行させることができる。

以下は、スレッド3の処理が完了後にスレッド1~2が起動するプログラムだ。

threadtest.py
import logging
import threading
import time
import queue

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(event):
    # event.set が実行されるまで待機
    event.wait()
    logging.debug('start')
    time.sleep(3)
    logging.debug('end')

def worker2(event):
    # event.set が実行されるまで待機
    event.wait()
    logging.debug('start')
    time.sleep(3)
    logging.debug('end')

def worker3(event):
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')
    # event.waitにしたスレッドを実行
    event.set()

if __name__ == '__main__':
    # イベントを作成
    event = threading.Event()
    t1 = threading.Thread(target=worker1, args=(event,))
    t2 = threading.Thread(target=worker2, args=(event,))
    t3 = threading.Thread(target=worker3, args=(event,))
    t1.start()
    t2.start()
    t3.start()

スレッド3の実行が完了後、スレッド1~2が実行されていることがわかる。

thread.実行結果
Thread-3: start
Thread-3: end
Thread-1: start
Thread-2: start
Thread-2: end
Thread-1: end

スレッド3で特定の処理をさせ、その実行結果をスレッド1~2で使用する、みたいな使い方もできそうだ。

■コンディション

イベントは、スレッドの完了をトリガーに他のスレッドを起動させられるので便利だが、複数のスレッドを
一斉に動作させることが望ましくない場合がある。(ファイルへの書き込み等)

コンディションはイベントと似たような機能だが、スレッドにロックがかけられる。

以下は、スレッド3の実行が完了後、スレッド1を先にロックし、最後にスレッド2を動作させるプログラムだ。

threadtest.py
import logging
import threading
import time
import queue

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(condition):
    # condition.notifyAll が実行されるまで待機
    # ロックを取得
    with condition:
        condition.wait()
        logging.debug('start')
        time.sleep(3)
        logging.debug('end')

def worker2(condition):
    # condition.notifyAll が実行されるまで待機
    # ロックを取得
    with condition:
        condition.wait()
        logging.debug('start')
        time.sleep(3)
        logging.debug('end')

def worker3(condition):
    with condition:
        logging.debug('start')
        time.sleep(5)
        logging.debug('end')
        # condition.waitにしたスレッドを実行
        condition.notifyAll()

if __name__ == '__main__':
    # コンディションを作成
    condition = threading.Condition()
    t1 = threading.Thread(target=worker1, args=(condition,))
    t2 = threading.Thread(target=worker2, args=(condition,))
    t3 = threading.Thread(target=worker3, args=(condition,))
    t1.start()
    t2.start()
    t3.start()

スレッド3→スレッド1→スレッド2の順番に処理が実行されている。

thread.実行結果
Thread-3: start
Thread-3: end
Thread-1: start
Thread-1: end
Thread-2: start
Thread-2: end

■バリア

指定した数のスレッドが立ち上がるまで、処理を待機させておくことができる。

threadtest.py
import logging
import threading
import time
import queue

logging.basicConfig(level=logging.DEBUG, format='%(threadName)s: %(message)s')


def worker1(barrier):
    r = barrier.wait()
    logging.debug('num={}'.format(r))
    while True:
        logging.debug('start')
        time.sleep(2)
        logging.debug('end')

def worker2(barrier):
    r = barrier.wait()
    logging.debug('num={}'.format(r))
    while True:
        logging.debug('start')
        time.sleep(2)
        logging.debug('end')


if __name__ == '__main__':
    # バリアを作成(2個のスレッドが立ち上がるまで、処理を待機)
    barrier = threading.Barrier(2)
    t1 = threading.Thread(target=worker1, args=(barrier,))
    t2 = threading.Thread(target=worker2, args=(barrier,))
    t1.start()
    t2.start()

スレッドが 2 個立ち上がった後、while ループが回り始めたことが確認できる。

thread.実行結果
Thread-2: num=1
Thread-2: start
Thread-1: num=0
Thread-1: start
Thread-1: end
Thread-1: start
Thread-2: end
Thread-2: start
Thread-2: end

チャットプログラムなんかは、ソケットサーバ及びクライアントのプログラムが必要だが、
両方立ち上がっていないと処理をしない、みたいな実装ができそうだ。

■まとめ

マルチスレッドの機能を使えば処理の並列化をすることが可能なのは理解できたが、使いこなすのは難しそうだ。

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.