追記 [2023/4/16]
ChatGPT先生に伺ったところ,fcntl
が使えるとのことでした.
import fcntl
# 書き込み
with open("myfile.txt", "w") as f:
try:
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
# The file is locked, so it's safe to write to it
f.write("some data")
except IOError:
# The file is already locked by another process
# Handle the error or retry after a delay
# 読み込み
with open("myfile.txt", "r") as f:
try:
fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB)
# The file is locked, so it's safe to read it
contents = f.read()
print(contents)
except IOError:
# The file is already locked by another process
# Handle the error or retry after a delay
はじめに
まずは問題設定を記述します.
- $N$ 個の独立した実行があり,それぞれの実行に対して,Uniqueな config を渡し,それに基づいた実行を独立に処理する
- SQL alchemyを利用したデータベースに対して,configを渡す毎に対応する行の
run flag
をTrue
にする - ただし,SQL alchemyに対する
SELECT
クエリとUPDATE
クエリの間に他の実行がSELECT
クエリを実行してしまうと2つの実行に対して同一の config が渡されてしまうためこれを何とか避けたい.
より具体的に言うと,以下のデータベースにある table for config
にある全ての config に対して,ただ一回のみ実行を行う.
その結果を table for record
に格納する.
また,ある config による実行が行われたかどうかは table for run status
により管理する.
問題となるのは table for run status
確認作業と更新作業の間(つまり,その実行がDatabaseに接続を確立していない)にその他の実行が更新作業を行ってしまう可能性があること.
Database
|_ table for config
|_ config ID
|_ config param 1
|_ config param 2
:
|_ config param D
|_ table for run status
|_ config ID
|_ running/evaluated or not (bool)
|_ table for record
|_ config ID
|_ result 1
|_ result 2
:
|_ result M
解決策
以後,Tokenの生成はtemp
ディレクトリ内において完結するものとする.
- $N$ 個の実行開始時にただひとつだけ
target.token
という空ファイルを生成する. - 各実行に対してUniqueなTime hashを生成し,それぞれ
target_<time hash>.token
となるようなファイル名を用意しておく. - ある実行に対するconfigを生成する前に
target.token
という名前をその実行におけるtarget_<time hash>.token
で書き換える.(また,このときrenameできない限りは生成処理に入れないようにする) - configを生成し,データベース内の
run flag
を更新した後にtarget.token
という名前に戻しておく.
これをすることによって,config生成時において run flag
を更新するためにデータベースに入ることができるのはただ一つの実行のみとなるためconfigの生成もどの実行においても重複なく行うことができます.
実装イメージ
import glob
import hashlib
import os
import time
hash = hashlib.sha1()
hash.update(str(time.time()).encode("utf-8"))
timehash = hash.hexdigest()
public_token = "temp/target.token"
private_token = f"temp/target_{timehash}.token"
token_pattern = "temp/target_*.token"
def publish_token():
while True:
try:
os.rename(public_token, private_token)
return
except FileNotFoundError:
time.sleep(1e-6)
def remove_token():
os.rename(private_token, public_token)
def query():
# 1. データベースからconfigを取り出す
# 2. 該当する行の `run flag` をTrueにする
# 3. config を返す (もし`run flag`が全てTrueならNoneを返す)
def sample():
publish_token()
config = query()
remove_token()
return config
def run():
n_tokens = len(glob.glob(token_pattern))
if n_tokens == 0:
with open(public_token, mode="w"):
pass
elif n_tokens > 1:
raise FileExistsError
while True:
config = sample()
if config is None:
# 全て評価されたのでNoneを得る ==> 終了
return
results = func(config)
# データベースに記録する
最後に
色々調べて,flock
等他に使えそうな方法はいくつかありましたが,どれも試してうまくいかなかったため結局この方法に落ち着きました.
もし他に良い方法があればどなたかご教示ください.