LoginSignup
0

posted at

updated at

Python: 関数を並列実行する(排他処理制御付き)

threading.Thread版

from threading import Lock, Thread
import time
from typing import Dict, List

# ユーザーIDごとに排他処理をするのに使うLockオブジェクトを保持する
locks: Dict[str, Lock] = {}

def f(
    s: str, # printしたい文字列
    id: str, # (例えば)ユーザーID
    wait_seconds: int, # 排他処理や並列処理が動作していることを確認するために使う
):
    start = time.time()
    # 排他処理用のLockオブジェクトがまだなければ作成して保持
    locks.update(**{id: Lock()}) if not locks.get(id) else None

    # ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
    with locks[id]:
        time.sleep(wait_seconds)
        print(f"{s}({int(time.time() - start)}秒経過) ")


if __name__ == "__main__":
    threads: List[Thread] = []

    threads.append(Thread(target=f, args=("A1", "A", 4)))
    threads.append(Thread(target=f, args=("A2", "A", 1)))
    threads.append(Thread(target=f, args=("B1", "B", 2)))
    threads.append(Thread(target=f, args=("B2", "B", 3)))
    # ※メモリなどのOS資源を個別に割り当てて、独立して実行させたいときは、ThreadではなくProcessを使う
    # I/O待ち時間が大きい処理はThread, CPU負荷やメモリ消費が大きい処理はProcess
    # (Processにすると、locksは参照できなくなるのでこのままでは排他制御は効かなくなる)

    [t.start() for t in threads]
    [t.join() for t in threads]

    print("全部完了。")

# B1(2秒経過) 
# A1(4秒経過) 
# A2(5秒経過) 
# B2(5秒経過)
# 全部完了。

この排他処理方法はメインプロセスが1基のとき前程ですが。

multiprocessing.Process版

import time
from multiprocessing import Lock, Process
from typing import Dict, List


def f(
    s: str,  # printしたい文字列
    id: str,  # (例えば)ユーザーID
    wait_seconds: int,  # 排他処理や並列処理が動作していることを確認するために使う
    locks: Dict[str, Lock], # ユーザーID単位で排他処理するためのLockを格納しているDict
    start: float, # 親プロセスが開始した時間
):
    # ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
    with locks[id]:
        try:
            with open(f"{id}.txt", "r") as filer:
                lines = filer.readlines()
        except FileNotFoundError as e:
            lines = []
        try:
            with open(f"{id}.txt", "w") as filew:
                time.sleep(wait_seconds)
                lines.append(f"{s}(通算{int(time.time() - start)}秒経過) ")
                print("".join(lines), file=filew) # print()でfileにテキストを追記
        except Exception as e:
            with open(f"{id}_error.txt", "w") as ew:
                print(e, file=ew)


if __name__ == "__main__":
    threads: List[Process] = []

    # ユーザーIDごとに排他処理をするのに使うLockオブジェクトを保持する
    locks: Dict[str, Lock] = {
        "A": Lock(),
        "B": Lock(),
    }
    start = time.time() # 処理経過時間計測開始

    # 実際どれが先に実行されるかはご機嫌次第。ここに書いた順とは限らない
    threads.append(Process(target=f, args=("A1", "A", 4, locks, start)))
    threads.append(Process(target=f, args=("A2", "A", 1, locks, start)))
    threads.append(Process(target=f, args=("B1", "B", 2, locks, start)))
    threads.append(Process(target=f, args=("B2", "B", 3, locks, start)))

    [t.start() for t in threads]
    [t.join() for t in threads]

    print("全部完了。")

# 全部完了。

Processの場合、
完全に別プロセスになってしまうのでprintしてもmainの標準出力には表示されないので、
ファイルに吐いてみると、

  • A.txt
A2(通算1秒経過) 
A1(通算5秒経過)  # ちゃんと"A2"のtime.sleep(1)の後になっている
  • B.txt
B1(通算2秒経過) 
B2(通算5秒経過)   # ちゃんと"B1"のtime.sleep(2)の後になっている

ProcessPoolExecutor版

import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def f(
    args,
    # s: str,  # printしたい文字列
    # id: str,  # (例えば)ユーザーID
    # wait_seconds: int,  # 排他処理や並列処理が動作していることを確認するために使う
    # locks: Dict[str, Lock],  # ユーザーID単位で排他処理するためのLockを格納しているDict
    # start: float,  # 親プロセスが開始した時間
):
    s, id, wait_seconds, locks, start = args
    # ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
    with locks[id]:
        try:
            with open(f"{id}.txt", "r") as filer:
                lines = filer.readlines()
        except FileNotFoundError as e:
            lines = []
        try:
            with open(f"{id}.txt", "w") as filew:
                time.sleep(wait_seconds)
                timedelta = int(time.time() - start.value)
                lines.append(f"{s}(通算{timedelta}秒経過) ")
                print("".join(lines), file=filew)  # print()でfileにテキストを追記
                return timedelta
        except Exception as e:
            with open(f"{id}_error.txt", "w") as ew:
                print(e, file=ew)
                return 0


if __name__ == "__main__":
    with Manager() as manager:
        locks = dict()
        locks.update(
            **{
                "A": manager.Lock(),
                "B": manager.Lock(),
            }
        )
        start = manager.Value("q", time.time())  # https://docs.python.org/ja/3/library/array.html

        params = [
            ("A1", "A", 4, locks, start),
            ("A2", "A", 1, locks, start),
            ("B1", "B", 2, locks, start),
            ("B2", "B", 3, locks, start),
        ]

        with ProcessPoolExecutor(max_workers=2) as executor:
            [
                print(r)
                for r in executor.map(
                    f,
                    params,
                    chunksize=2,  # [A1,A2] と[B1,B2] の2集団に分けて処理される
                )
            ]

        print("全部完了。")

# 4
# 5
# 2
# 5
# 全部完了。
  • A.txt
A2(通算1秒経過) 
A1(通算5秒経過)  # ちゃんと"A2"のtime.sleep(1)の後になっている
  • B.txt
B1(通算2秒経過) 
B2(通算5秒経過)   # ちゃんと"B1"のtime.sleep(2)の後になっている

複数のサーバで構成されている分散環境での分散排他制御など

redisを使ったredlockなどがあるようですが、
https://redis.io/docs/reference/patterns/distributed-locks/

これ以上は、例えばロードバランサで同一ユーザーからのアクセスは同一インスタンスに転送されるようにしたり、そもそもロックに頼らなくても安全なRESTAPI設計を目指した方が良いみたいです。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
0