LoginSignup
2
0

More than 1 year has passed since last update.

Python: 関数を並列実行する(排他処理制御付き)

Last updated at Posted at 2022-03-18

threading.Thread版

from threading import Lock, Thread
import time
from typing import Dict, List

# ユーザーIDごとに排他処理をするのに使うLockオブジェクトを保持する
locks: Dict[str, Lock] = {}

def f(
    s: str, # printしたい文字列
    id: str, # (例えば)ユーザーID
    wait_seconds: int, # 排他処理や並列処理が動作していることを確認するために使う
):
    start = time.time()
    # 排他処理用のLockオブジェクトがまだなければ作成して保持
    locks.update(**{id: Lock()}) if not locks.get(id) else None

    # ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
    with locks[id]:
        time.sleep(wait_seconds)
        print(f"{s}({int(time.time() - start)}秒経過) ")


if __name__ == "__main__":
    threads: List[Thread] = []

    threads.append(Thread(target=f, args=("A1", "A", 4)))
    threads.append(Thread(target=f, args=("A2", "A", 1)))
    threads.append(Thread(target=f, args=("B1", "B", 2)))
    threads.append(Thread(target=f, args=("B2", "B", 3)))
    # ※メモリなどのOS資源を個別に割り当てて、独立して実行させたいときは、ThreadではなくProcessを使う
    # I/O待ち時間が大きい処理はThread, CPU負荷やメモリ消費が大きい処理はProcess
    # (Processにすると、locksは参照できなくなるのでこのままでは排他制御は効かなくなる)

    [t.start() for t in threads]
    [t.join() for t in threads]

    print("全部完了。")

# B1(2秒経過) 
# A1(4秒経過) 
# A2(5秒経過) 
# B2(5秒経過)
# 全部完了。

この排他処理方法はメインプロセスが1基のとき前程ですが。

multiprocessing.Process版

import time
from multiprocessing import Lock, Process
from typing import Dict, List


def f(
    s: str,  # printしたい文字列
    id: str,  # (例えば)ユーザーID
    wait_seconds: int,  # 排他処理や並列処理が動作していることを確認するために使う
    locks: Dict[str, Lock], # ユーザーID単位で排他処理するためのLockを格納しているDict
    start: float, # 親プロセスが開始した時間
):
    # ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
    with locks[id]:
        try:
            with open(f"{id}.txt", "r") as filer:
                lines = filer.readlines()
        except FileNotFoundError as e:
            lines = []
        try:
            with open(f"{id}.txt", "w") as filew:
                time.sleep(wait_seconds)
                lines.append(f"{s}(通算{int(time.time() - start)}秒経過) ")
                print("".join(lines), file=filew) # print()でfileにテキストを追記
        except Exception as e:
            with open(f"{id}_error.txt", "w") as ew:
                print(e, file=ew)


if __name__ == "__main__":
    threads: List[Process] = []

    # ユーザーIDごとに排他処理をするのに使うLockオブジェクトを保持する
    locks: Dict[str, Lock] = {
        "A": Lock(),
        "B": Lock(),
    }
    start = time.time() # 処理経過時間計測開始

    # 実際どれが先に実行されるかはご機嫌次第。ここに書いた順とは限らない
    threads.append(Process(target=f, args=("A1", "A", 4, locks, start)))
    threads.append(Process(target=f, args=("A2", "A", 1, locks, start)))
    threads.append(Process(target=f, args=("B1", "B", 2, locks, start)))
    threads.append(Process(target=f, args=("B2", "B", 3, locks, start)))

    [t.start() for t in threads]
    [t.join() for t in threads]

    print("全部完了。")

# 全部完了。

Processの場合、
完全に別プロセスになってしまうのでprintしてもmainの標準出力には表示されないので、
ファイルに吐いてみると、

  • A.txt
A2(通算1秒経過) 
A1(通算5秒経過)  # ちゃんと"A2"のtime.sleep(1)の後になっている
  • B.txt
B1(通算2秒経過) 
B2(通算5秒経過)   # ちゃんと"B1"のtime.sleep(2)の後になっている

ProcessPoolExecutor版

import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def f(
    args,
    # s: str,  # printしたい文字列
    # id: str,  # (例えば)ユーザーID
    # wait_seconds: int,  # 排他処理や並列処理が動作していることを確認するために使う
    # locks: Dict[str, Lock],  # ユーザーID単位で排他処理するためのLockを格納しているDict
    # start: float,  # 親プロセスが開始した時間
):
    s, id, wait_seconds, locks, start = args
    # ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
    with locks[id]:
        try:
            with open(f"{id}.txt", "r") as filer:
                lines = filer.readlines()
        except FileNotFoundError as e:
            lines = []
        try:
            with open(f"{id}.txt", "w") as filew:
                time.sleep(wait_seconds)
                timedelta = int(time.time() - start.value)
                lines.append(f"{s}(通算{timedelta}秒経過) ")
                print("".join(lines), file=filew)  # print()でfileにテキストを追記
                return timedelta
        except Exception as e:
            with open(f"{id}_error.txt", "w") as ew:
                print(e, file=ew)
                return 0


if __name__ == "__main__":
    with Manager() as manager:
        locks = dict()
        locks.update(
            **{
                "A": manager.Lock(),
                "B": manager.Lock(),
            }
        )
        start = manager.Value("q", time.time())  # https://docs.python.org/ja/3/library/array.html

        params = [
            ("A1", "A", 4, locks, start),
            ("A2", "A", 1, locks, start),
            ("B1", "B", 2, locks, start),
            ("B2", "B", 3, locks, start),
        ]

        with ProcessPoolExecutor(max_workers=2) as executor:
            [
                print(r)
                for r in executor.map(
                    f,
                    params,
                    chunksize=2,  # [A1,A2] と[B1,B2] の2集団に分けて処理される
                )
            ]

        print("全部完了。")

# 4
# 5
# 2
# 5
# 全部完了。
  • A.txt
A2(通算1秒経過) 
A1(通算5秒経過)  # ちゃんと"A2"のtime.sleep(1)の後になっている
  • B.txt
B1(通算2秒経過) 
B2(通算5秒経過)   # ちゃんと"B1"のtime.sleep(2)の後になっている

複数のサーバで構成されている分散環境での分散排他制御など

redisを使ったredlockなどがあるようですが、
https://redis.io/docs/reference/patterns/distributed-locks/

これ以上は、例えばロードバランサで同一ユーザーからのアクセスは同一インスタンスに転送されるようにしたり、そもそもロックに頼らなくても安全なRESTAPI設計を目指した方が良いみたいです。

2
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0