5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

アイスタイルAdvent Calendar 2024

Day 14

Prefectでflowをリトライした時にアラート制御をしたい

Last updated at Posted at 2024-12-13

はじめに

この投稿はアイスタイル Advent Calendar 2024 の14日目の記事です。
こんにちは。 アイスタイルで、データ分析基盤を担当しているosadamです。
アイスタイルではデータ加工のパイプライン等を制御する方法としてワークフローツールであるPrefectを使用しています。

Prefect内のflowやtaskでアラート発火を制御しているのですが、その中で困ったこととその解決方法をまとめました。

現状

Prefectにはflow,taskのデコレータとしてretriesというパラメータを設定できます。
設定すると、flow,taskが失敗した場合、リトライ回数がなくなるか成功するまでそのflow,taskを繰り返し実行してくれます。
パラメータを設定するだけで、繰り返し実行してくれるので便利です。

from prefect import flow, task, get_run_logger

@task
def task1():
    logger = get_run_logger()
    logger.info(message)
    return

@task
def task2():
    logger = get_run_logger()
    if <条件文1>:
       return
    else:
       logger.error("task2が失敗しました")
       return False

@flow(retries=100,retry_delay_seconds=10)
def flow():
    task1()
    task2()

実際に実装していた内容を上記のように疑似コードにしてみました。
このコードだと何が起こるかというと、<条件文1>が100回失敗するとエラーログが100回流れてしまいます。
ただ、本当に実装したい内容としてはリトライ回数が0回になってもflowが失敗した場合のみエラーログを流したいです。

解決方法

その1 retriesを設定する箇所を変更する

今回の場合、task2で失敗する可能性があるのということがわかっていました。
そのためtask2にretriesを設定し、アラート発火はflowで実装します。

from prefect import flow, task, get_run_logger

@task
def task1():
    logger = get_run_logger()
    logger.info(message)
    return

@task(retries=100,retry_delay_seconds=10)
def task2():
    logger = get_run_logger()
    if <条件文1>:
       return
    else:
       return False

@flow
def flow():
    logger = get_run_logger()
    task1()
    try:
        task2()
    except Exception as e:
         logger.error("task2が失敗しました")
         return False

こうすることで、task2が失敗した場合にtask2が成功するかリトライ回数がなくなるまで再実行し、リトライ回数がなくなった場合のみエラーログを出すことができます。

その2 get_run_countを使用する

retriesはデコレータで定義されている値のため、直接参照することはできませんが、prefectで定義されているget_run_count()関数を用いてリトライ回数を制御することができます。

from prefect import flow, task, get_run_logger
from prefect.runtime import flow_run

@task
def task1():
    logger = get_run_logger()
    logger.info(message)
    return

@task
def task2():
    logger = get_run_logger()
    if <条件文1>:
       return
    else:
       return False

@flow(retries=100,retry_delay_seconds=10)
def flow():
    task1()
    task2()
    count = flow_run.get_run_count()
    logger = get_run_logger()
    if count == 100:
        logger.error("flowが失敗しました")

今回、100をべた書きしてしまいましたが、環境変数か何かで設定できると良いと思います。
こうすることで、リトライ回数がない時のみアラート発火ができます。
ただ、これだとアラート内容だけ見るとどこでエラーが発生したか分かりづらいです。

その3 deploymentを分ける

deploymentとは簡単にいうとflow実行するためのトリガーを設定する方法です。
今回task2を別deploymentに切り出して、task2が成功した場合元のflowを実行するdeploymentをキックするという実装です。

from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment


@task(retries=100,retry_delay_seconds=10)
def task2():
    if <条件文1>:
       return
    else:
       return False

@flow
def flow():
    logger = get_run_logger()
    try:
        task2()
    except Exception as e:
        logger.error("task2が失敗しました")
        return

    run_deployment(name=<deployment名>)

ちなみに、今回はこの方法を実装しました。
なぜこの方法を取ったかというと、task2で行っている条件分岐は本来のflow内で行いたい内容と離れており、条件分岐の部分は元のflowと分けるべきだ、と判断したためです。

まとめ

今回はPrefectのflow,taskのデコレータretriesを使用した場合でも、設計通りの条件でアラートを発火する修正方法をまとめました。
各タスクの役割を理解し、どこでアラート発火させるべきか改めて整理するべきだと改めて思いました!

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?