threading.Thread版
from threading import Lock, Thread
import time
from typing import Dict, List
# ユーザーIDごとに排他処理をするのに使うLockオブジェクトを保持する
locks: Dict[str, Lock] = {}
def f(
s: str, # printしたい文字列
id: str, # (例えば)ユーザーID
wait_seconds: int, # 排他処理や並列処理が動作していることを確認するために使う
):
start = time.time()
# 排他処理用のLockオブジェクトがまだなければ作成して保持
locks.update(**{id: Lock()}) if not locks.get(id) else None
# ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
with locks[id]:
time.sleep(wait_seconds)
print(f"{s}({int(time.time() - start)}秒経過) ")
if __name__ == "__main__":
threads: List[Thread] = []
threads.append(Thread(target=f, args=("A1", "A", 4)))
threads.append(Thread(target=f, args=("A2", "A", 1)))
threads.append(Thread(target=f, args=("B1", "B", 2)))
threads.append(Thread(target=f, args=("B2", "B", 3)))
# ※メモリなどのOS資源を個別に割り当てて、独立して実行させたいときは、ThreadではなくProcessを使う
# I/O待ち時間が大きい処理はThread, CPU負荷やメモリ消費が大きい処理はProcess
# (Processにすると、locksは参照できなくなるのでこのままでは排他制御は効かなくなる)
[t.start() for t in threads]
[t.join() for t in threads]
print("全部完了。")
# B1(2秒経過)
# A1(4秒経過)
# A2(5秒経過)
# B2(5秒経過)
# 全部完了。
この排他処理方法はメインプロセスが1基のとき前程ですが。
multiprocessing.Process版
import time
from multiprocessing import Lock, Process
from typing import Dict, List
def f(
s: str, # printしたい文字列
id: str, # (例えば)ユーザーID
wait_seconds: int, # 排他処理や並列処理が動作していることを確認するために使う
locks: Dict[str, Lock], # ユーザーID単位で排他処理するためのLockを格納しているDict
start: float, # 親プロセスが開始した時間
):
# ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
with locks[id]:
try:
with open(f"{id}.txt", "r") as filer:
lines = filer.readlines()
except FileNotFoundError as e:
lines = []
try:
with open(f"{id}.txt", "w") as filew:
time.sleep(wait_seconds)
lines.append(f"{s}(通算{int(time.time() - start)}秒経過) ")
print("".join(lines), file=filew) # print()でfileにテキストを追記
except Exception as e:
with open(f"{id}_error.txt", "w") as ew:
print(e, file=ew)
if __name__ == "__main__":
threads: List[Process] = []
# ユーザーIDごとに排他処理をするのに使うLockオブジェクトを保持する
locks: Dict[str, Lock] = {
"A": Lock(),
"B": Lock(),
}
start = time.time() # 処理経過時間計測開始
# 実際どれが先に実行されるかはご機嫌次第。ここに書いた順とは限らない
threads.append(Process(target=f, args=("A1", "A", 4, locks, start)))
threads.append(Process(target=f, args=("A2", "A", 1, locks, start)))
threads.append(Process(target=f, args=("B1", "B", 2, locks, start)))
threads.append(Process(target=f, args=("B2", "B", 3, locks, start)))
[t.start() for t in threads]
[t.join() for t in threads]
print("全部完了。")
# 全部完了。
Processの場合、
完全に別プロセスになってしまうのでprintしてもmainの標準出力には表示されないので、
ファイルに吐いてみると、
- A.txt
A2(通算1秒経過)
A1(通算5秒経過) # ちゃんと"A2"のtime.sleep(1)の後になっている
- B.txt
B1(通算2秒経過)
B2(通算5秒経過) # ちゃんと"B1"のtime.sleep(2)の後になっている
ProcessPoolExecutor版
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
def f(
args,
# s: str, # printしたい文字列
# id: str, # (例えば)ユーザーID
# wait_seconds: int, # 排他処理や並列処理が動作していることを確認するために使う
# locks: Dict[str, Lock], # ユーザーID単位で排他処理するためのLockを格納しているDict
# start: float, # 親プロセスが開始した時間
):
s, id, wait_seconds, locks, start = args
# ユーザー単位でそれぞれ排他処理する(守りたい処理順序があるときなど。)
with locks[id]:
try:
with open(f"{id}.txt", "r") as filer:
lines = filer.readlines()
except FileNotFoundError as e:
lines = []
try:
with open(f"{id}.txt", "w") as filew:
time.sleep(wait_seconds)
timedelta = int(time.time() - start.value)
lines.append(f"{s}(通算{timedelta}秒経過) ")
print("".join(lines), file=filew) # print()でfileにテキストを追記
return timedelta
except Exception as e:
with open(f"{id}_error.txt", "w") as ew:
print(e, file=ew)
return 0
if __name__ == "__main__":
with Manager() as manager:
locks = dict()
locks.update(
**{
"A": manager.Lock(),
"B": manager.Lock(),
}
)
start = manager.Value("q", time.time()) # https://docs.python.org/ja/3/library/array.html
params = [
("A1", "A", 4, locks, start),
("A2", "A", 1, locks, start),
("B1", "B", 2, locks, start),
("B2", "B", 3, locks, start),
]
with ProcessPoolExecutor(max_workers=2) as executor:
[
print(r)
for r in executor.map(
f,
params,
chunksize=2, # [A1,A2] と[B1,B2] の2集団に分けて処理される
)
]
print("全部完了。")
# 4
# 5
# 2
# 5
# 全部完了。
- A.txt
A2(通算1秒経過)
A1(通算5秒経過) # ちゃんと"A2"のtime.sleep(1)の後になっている
- B.txt
B1(通算2秒経過)
B2(通算5秒経過) # ちゃんと"B1"のtime.sleep(2)の後になっている
複数のサーバで構成されている分散環境での分散排他制御など
redisを使ったredlockなどがあるようですが、
https://redis.io/docs/reference/patterns/distributed-locks/
これ以上は、例えばロードバランサで同一ユーザーからのアクセスは同一インスタンスに転送されるようにしたり、そもそもロックに頼らなくても安全なRESTAPI設計を目指した方が良いみたいです。