はじめに
この投稿はアイスタイル Advent Calendar 2024 の14日目の記事です。
こんにちは。 アイスタイルで、データ分析基盤を担当しているosadamです。
アイスタイルではデータ加工のパイプライン等を制御する方法としてワークフローツールであるPrefectを使用しています。
Prefect内のflowやtaskでアラート発火を制御しているのですが、その中で困ったこととその解決方法をまとめました。
現状
Prefectにはflow,taskのデコレータとしてretriesというパラメータを設定できます。
設定すると、flow,taskが失敗した場合、リトライ回数がなくなるか成功するまでそのflow,taskを繰り返し実行してくれます。
パラメータを設定するだけで、繰り返し実行してくれるので便利です。
from prefect import flow, task, get_run_logger
@task
def task1():
logger = get_run_logger()
logger.info(message)
return
@task
def task2():
logger = get_run_logger()
if <条件文1>:
return
else:
logger.error("task2が失敗しました")
return False
@flow(retries=100,retry_delay_seconds=10)
def flow():
task1()
task2()
実際に実装していた内容を上記のように疑似コードにしてみました。
このコードだと何が起こるかというと、<条件文1>が100回失敗するとエラーログが100回流れてしまいます。
ただ、本当に実装したい内容としてはリトライ回数が0回になってもflowが失敗した場合のみエラーログを流したいです。
解決方法
その1 retriesを設定する箇所を変更する
今回の場合、task2で失敗する可能性があるのということがわかっていました。
そのためtask2にretriesを設定し、アラート発火はflowで実装します。
from prefect import flow, task, get_run_logger
@task
def task1():
logger = get_run_logger()
logger.info(message)
return
@task(retries=100,retry_delay_seconds=10)
def task2():
logger = get_run_logger()
if <条件文1>:
return
else:
return False
@flow
def flow():
logger = get_run_logger()
task1()
try:
task2()
except Exception as e:
logger.error("task2が失敗しました")
return False
こうすることで、task2が失敗した場合にtask2が成功するかリトライ回数がなくなるまで再実行し、リトライ回数がなくなった場合のみエラーログを出すことができます。
その2 get_run_countを使用する
retriesはデコレータで定義されている値のため、直接参照することはできませんが、prefectで定義されているget_run_count()関数を用いてリトライ回数を制御することができます。
from prefect import flow, task, get_run_logger
from prefect.runtime import flow_run
@task
def task1():
logger = get_run_logger()
logger.info(message)
return
@task
def task2():
logger = get_run_logger()
if <条件文1>:
return
else:
return False
@flow(retries=100,retry_delay_seconds=10)
def flow():
task1()
task2()
count = flow_run.get_run_count()
logger = get_run_logger()
if count == 100:
logger.error("flowが失敗しました")
今回、100をべた書きしてしまいましたが、環境変数か何かで設定できると良いと思います。
こうすることで、リトライ回数がない時のみアラート発火ができます。
ただ、これだとアラート内容だけ見るとどこでエラーが発生したか分かりづらいです。
その3 deploymentを分ける
deploymentとは簡単にいうとflow実行するためのトリガーを設定する方法です。
今回task2を別deploymentに切り出して、task2が成功した場合元のflowを実行するdeploymentをキックするという実装です。
from prefect import flow, task, get_run_logger
from prefect.deployments import run_deployment
@task(retries=100,retry_delay_seconds=10)
def task2():
if <条件文1>:
return
else:
return False
@flow
def flow():
logger = get_run_logger()
try:
task2()
except Exception as e:
logger.error("task2が失敗しました")
return
run_deployment(name=<deployment名>)
ちなみに、今回はこの方法を実装しました。
なぜこの方法を取ったかというと、task2で行っている条件分岐は本来のflow内で行いたい内容と離れており、条件分岐の部分は元のflowと分けるべきだ、と判断したためです。
まとめ
今回はPrefectのflow,taskのデコレータretriesを使用した場合でも、設計通りの条件でアラートを発火する修正方法をまとめました。
各タスクの役割を理解し、どこでアラート発火させるべきか改めて整理するべきだと改めて思いました!