0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pythonで独立した実行に対して重複しないようにJobを与えたときのメモ

Last updated at Posted at 2022-04-17

追記 [2023/4/16]

ChatGPT先生に伺ったところ,fcntlが使えるとのことでした.

import fcntl

# 書き込み
with open("myfile.txt", "w") as f:
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        # The file is locked, so it's safe to write to it
        f.write("some data")
    except IOError:
        # The file is already locked by another process
        # Handle the error or retry after a delay

# 読み込み
with open("myfile.txt", "r") as f:
    try:
        fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB)
        # The file is locked, so it's safe to read it
        contents = f.read()
        print(contents)
    except IOError:
        # The file is already locked by another process
        # Handle the error or retry after a delay

はじめに

まずは問題設定を記述します.

  1. $N$ 個の独立した実行があり,それぞれの実行に対して,Uniqueな config を渡し,それに基づいた実行を独立に処理する
  2. SQL alchemyを利用したデータベースに対して,configを渡す毎に対応する行の run flagTrue にする
  3. ただし,SQL alchemyに対する SELECT クエリと UPDATE クエリの間に他の実行が SELECT クエリを実行してしまうと2つの実行に対して同一の config が渡されてしまうためこれを何とか避けたい.

より具体的に言うと,以下のデータベースにある table for config にある全ての config に対して,ただ一回のみ実行を行う.
その結果を table for record に格納する.
また,ある config による実行が行われたかどうかは table for run status により管理する.
問題となるのは table for run status 確認作業と更新作業の間(つまり,その実行がDatabaseに接続を確立していない)にその他の実行が更新作業を行ってしまう可能性があること.

Database
    |_ table for config
        |_ config ID
        |_ config param 1
        |_ config param 2
        :
        |_ config param D
    |_ table for run status
        |_ config ID
        |_ running/evaluated or not (bool)
    |_ table for record
        |_ config ID
        |_ result 1
        |_ result 2
        :
        |_ result M

解決策

以後,Tokenの生成はtemp ディレクトリ内において完結するものとする.

  1. $N$ 個の実行開始時にただひとつだけ target.tokenという空ファイルを生成する.
  2. 各実行に対してUniqueなTime hashを生成し,それぞれ target_<time hash>.token となるようなファイル名を用意しておく.
  3. ある実行に対するconfigを生成する前に target.token という名前をその実行における target_<time hash>.token で書き換える.(また,このときrenameできない限りは生成処理に入れないようにする)
  4. configを生成し,データベース内の run flag を更新した後に target.token という名前に戻しておく.

これをすることによって,config生成時において run flag を更新するためにデータベースに入ることができるのはただ一つの実行のみとなるためconfigの生成もどの実行においても重複なく行うことができます.

実装イメージ

import glob
import hashlib
import os
import time


hash = hashlib.sha1()
hash.update(str(time.time()).encode("utf-8"))
timehash = hash.hexdigest()
public_token = "temp/target.token"
private_token = f"temp/target_{timehash}.token"
token_pattern = "temp/target_*.token"


def publish_token():
    while True:
        try:
            os.rename(public_token, private_token)
            return
        except FileNotFoundError:
            time.sleep(1e-6)


def remove_token():
    os.rename(private_token, public_token)


def query():
    # 1. データベースからconfigを取り出す
    # 2. 該当する行の `run flag` をTrueにする
    # 3. config を返す (もし`run flag`が全てTrueならNoneを返す)



def sample():
    publish_token()
    config = query()
    remove_token()
    return config


def run():
    n_tokens = len(glob.glob(token_pattern))
    if n_tokens == 0:
        with open(public_token, mode="w"):
            pass
    elif n_tokens > 1:
        raise FileExistsError

    while True:
        config = sample()
        if config is None:
            # 全て評価されたのでNoneを得る ==> 終了
            return
        results = func(config)
        # データベースに記録する

最後に

色々調べて,flock等他に使えそうな方法はいくつかありましたが,どれも試してうまくいかなかったため結局この方法に落ち着きました.
もし他に良い方法があればどなたかご教示ください.

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?