1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

みなさん、こんにちは!

Pythonプログラムを動かしているとき、ネットワークエラー等で途中で落ちてしまった経験はありませんか?

リトライを入れていれば処理を再実行することはできますが、データベースへのデータ挿入を連続で行ったりしていた場合、処理を最初から行うことになり悲しい思いをします。

今回は、そのような場合に使えるPythonプログラムの実装についてご紹介します。

アプローチ

なんらかのエラーが発生しリトライさせる場合、その時点までの処理状況を記録しておく必要があります。
変数に記録しておく手もありますが、安定性を考えてファイルで進捗を記録することにします。
ファイルは進捗を記録するだけなので、読み書きでそれほど大きなオーバーヘッドは発生しません。

上記を踏まえて、以下のような形で実装します。

  • 繰り返し処理の最初に進捗を確認する
  • 繰り返し処理が1回完了するたびにファイルに進捗を記録する

実装

データをストリーミングする処理を例に実装しました。

loader.py

qiita.rb
import os
import json

class Loader:
    def __init__(self):
        self.data = [] # データ格納用リスト
        self.stream_progress_file = "stream_progress.json" # 進捗記録用ファイル

    def load(self):
        # データロード
        # 10個の値をリストに追加するだけ
        for i in range(10):
            self.data.append(i)

    def read_progress(self) -> int:
        # 進捗をファイルから読み出す
        # ファイルが存在しない場合、0を返す
        try:
            with open(self.stream_progress_file, "r") as f:
                data = json.load(f)
                return data["index"]
        except FileNotFoundError:
            return 0

    def save_progress(self, data: dict):
        # 現在の進捗をファイルに記録する
        with open(self.stream_progress_file, "w") as f:
            json.dump(data, f)

    def stream(self):
        # 進捗状況を確認する
        start_index = self.read_progress()

        # 取得した進捗時点から処理を再開
        for i in range(start_index, len(self.data)):
            self.save_progress({"index": i})         
            yield self.data[i]

    def __del__(self):
        # 進捗記録用ファイルの削除
        try:
            os.remove(self.stream_progress_file)
            print("Cleaned up stream progress file.")
        except FileNotFoundError:
            pass

上記の loader.py は、ロードしたデータをストリーミング処理するものをLoaderクラスとして実装したものです。
いくつか処理を書いていますが、大事なポイントは以下の3点です。

  • self.stream_progress_fileに記載のファイルで進捗を管理
  • stream()の最初にread_progress()で進捗を確認
  • stream()の繰り返し処理が1回完了するごとにsave_progress()で進捗を記録

resume.py

qiita.rb
import random

from tenacity import retry, stop_after_attempt

from loader import Loader

@retry(stop=stop_after_attempt(3))
def handle_data(loader: Loader):
    for data in loader.stream():
        # ランダムにエラーを発生させる
        if random.random() > 0.1:
            print(data)
        else:
            print("Error!")
            raise Exception

def main():
    loader = Loader()
    loader.load()
    handle_data(loader)

if __name__ == "__main__":
    try:
        main()
        print("Successfully completed.")
    except Exception as e:
        print(f"Maximum retries reached. The program failed: {e}")

上記の resume.py は処理を実行する本体のファイルです。

main()loaderの作成とストリーミング処理を行います。
ストリーミング処理を行うhandle_data()の部分は、tenacityライブラリで最大3回リトライさせる形にしています。

実行すると以下のようになります。

qiita.rb
$ python resume.py 
0
1
Error!
2
3
4
5
6
7
8
9
Cleaned up stream progress file.
Successfully completed.

2のところで1回エラーが発生したものの、途中から問題なく再開できていることがわかります。

まとめ

Pythonプログラムでエラーになった処理を途中から再開する方法についてご紹介しました。

データベースにデータを連続で挿入する処理があり、認証の有効期限切れ等で不定期にエラーが発生していたのですが、今回の実装にしたことで安定した処理を実現できています。
一度に1200件ほど処理していますが、ファイル読み書きの負荷もわずかで問題なく動作しています。

同じような課題に直面されている方は、ぜひ本記事の方法を試してみてください。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?