追記 [2023/4/16]
ChatGPT先生に伺ったところ,fcntlが使えるとのことでした.
import fcntl
# 書き込み
with open("myfile.txt", "w") as f:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
# The file is locked, so it's safe to write to it
f.write("some data")
except IOError:
# The file is already locked by another process
# Handle the error or retry after a delay
# 読み込み
with open("myfile.txt", "r") as f:
try:
fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB)
# The file is locked, so it's safe to read it
contents = f.read()
print(contents)
except IOError:
# The file is already locked by another process
# Handle the error or retry after a delay
はじめに
まずは問題設定を記述します.
- $N$ 個の独立した実行があり,それぞれの実行に対して,Uniqueな config を渡し,それに基づいた実行を独立に処理する
- SQL alchemyを利用したデータベースに対して,configを渡す毎に対応する行の
run flagをTrueにする - ただし,SQL alchemyに対する
SELECTクエリとUPDATEクエリの間に他の実行がSELECTクエリを実行してしまうと2つの実行に対して同一の config が渡されてしまうためこれを何とか避けたい.
より具体的に言うと,以下のデータベースにある table for config にある全ての config に対して,ただ一回のみ実行を行う.
その結果を table for record に格納する.
また,ある config による実行が行われたかどうかは table for run status により管理する.
問題となるのは table for run status 確認作業と更新作業の間(つまり,その実行がDatabaseに接続を確立していない)にその他の実行が更新作業を行ってしまう可能性があること.
Database
|_ table for config
|_ config ID
|_ config param 1
|_ config param 2
:
|_ config param D
|_ table for run status
|_ config ID
|_ running/evaluated or not (bool)
|_ table for record
|_ config ID
|_ result 1
|_ result 2
:
|_ result M
解決策
以後,Tokenの生成はtemp ディレクトリ内において完結するものとする.
- $N$ 個の実行開始時にただひとつだけ
target.tokenという空ファイルを生成する. - 各実行に対してUniqueなTime hashを生成し,それぞれ
target_<time hash>.tokenとなるようなファイル名を用意しておく. - ある実行に対するconfigを生成する前に
target.tokenという名前をその実行におけるtarget_<time hash>.tokenで書き換える.(また,このときrenameできない限りは生成処理に入れないようにする) - configを生成し,データベース内の
run flagを更新した後にtarget.tokenという名前に戻しておく.
これをすることによって,config生成時において run flag を更新するためにデータベースに入ることができるのはただ一つの実行のみとなるためconfigの生成もどの実行においても重複なく行うことができます.
実装イメージ
import glob
import hashlib
import os
import time
hash = hashlib.sha1()
hash.update(str(time.time()).encode("utf-8"))
timehash = hash.hexdigest()
public_token = "temp/target.token"
private_token = f"temp/target_{timehash}.token"
token_pattern = "temp/target_*.token"
def publish_token():
while True:
try:
os.rename(public_token, private_token)
return
except FileNotFoundError:
time.sleep(1e-6)
def remove_token():
os.rename(private_token, public_token)
def query():
# 1. データベースからconfigを取り出す
# 2. 該当する行の `run flag` をTrueにする
# 3. config を返す (もし`run flag`が全てTrueならNoneを返す)
def sample():
publish_token()
config = query()
remove_token()
return config
def run():
n_tokens = len(glob.glob(token_pattern))
if n_tokens == 0:
with open(public_token, mode="w"):
pass
elif n_tokens > 1:
raise FileExistsError
while True:
config = sample()
if config is None:
# 全て評価されたのでNoneを得る ==> 終了
return
results = func(config)
# データベースに記録する
最後に
色々調べて,flock等他に使えそうな方法はいくつかありましたが,どれも試してうまくいかなかったため結局この方法に落ち着きました.
もし他に良い方法があればどなたかご教示ください.