本記事は,「株式会社RetailAI X Advent Calendar」の18日目の記事です.昨日の記事は,@utamoriさんの「Hankoを使ってAndroidでPassKeyログインするやつをやりたかった」でした.
端書
以降の記載内容は次の資料を参考にして書かれております.
また,使用した環境は次のとおりです.
- macOS Ventura 13.0.1
- pyenv 2.3.5
- python 3.9.5
- pip 22.3.1
- prefect 1.4.1
目的
pythonを用いた開発業務に携わる中で,例えば「かなり時間のかかる処理を実行して,しばらく放置していたら膨大なエラーログが出ていた」ということがあり,そのことに気づくまでに時間を要したということが多々ありました.1から通知機能を実装するのは面倒ですし,何かしら良いツールはないものかと思っていたところ,Prefectという物があることを知り,今回触ってみようと思った次第です.
注意
現在,PrefectにはVer. 1とVer. 2があります.これら2つのモジュールには,pythonのコーディングにおいて様々な違いがありますので注意が必要です.この記事では,Prefect Ver. 1を使って実行した結果を記載しております.コーディング方法の違いはPrefect Ver. 2の公式Docsの方に詳しく書かれています.
試用
概要
基本的な実装として,task
とFlow
があります.イメージは以下のとおりです.
import prefect
from prefect import task, Flow
@task
def task1(name="Task 1"):
logger = prefect.context.get("logger")
logger.info("I'm task 1")
@task
def task2(name="Task 2"):
logger = prefect.context.get("logger")
logger.info("I'm task 2")
@task
def task3(name="Task 3"):
logger = prefect.context.get("logger")
logger.info("I'm task 3")
# define flow
with Flow("Logging flow") as flow:
task1()
task2()
task3()
flow.run()
環境構築
python,pipのセットアップについての記載は省略します.prefectをインストールするため,以下のコマンドを実行します.
$ pip install "prefect==1.4.1"
バージョンが表示されればOKです.
$ prefect version
1.4.1
実行
上記のpythonスクリプトを実行します.
$ python main.py
[2022-12-17 09:10:16+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Logging flow'
[2022-12-17 09:10:16+0900] INFO - prefect.TaskRunner | Task 'task1': Starting task run...
[2022-12-17 09:10:16+0900] INFO - prefect.task1 | I'm task 1
[2022-12-17 09:10:16+0900] INFO - prefect.TaskRunner | Task 'task1': Finished task run for task with final state: 'Success'
[2022-12-17 09:10:16+0900] INFO - prefect.TaskRunner | Task 'task2': Starting task run...
[2022-12-17 09:10:16+0900] INFO - prefect.task2 | I'm task 2
[2022-12-17 09:10:16+0900] INFO - prefect.TaskRunner | Task 'task2': Finished task run for task with final state: 'Success'
[2022-12-17 09:10:16+0900] INFO - prefect.TaskRunner | Task 'task3': Starting task run...
[2022-12-17 09:10:16+0900] INFO - prefect.task3 | I'm task 3
[2022-12-17 09:10:16+0900] INFO - prefect.TaskRunner | Task 'task3': Finished task run for task with final state: 'Success'
[2022-12-17 09:10:16+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
各タスク内の実行内容はもちろん,開始と終了のログを自動でとっています.試しに,task3
のなかにエラーを発生させてみました.
@task
def task3(name="Task 3"):
logger = prefect.context.get("logger")
logger.info("I'm task 3")
raise("Some errors occurred")
[2022-12-17 09:19:07+0900] INFO - prefect.TaskRunner | Task 'task3': Starting task run...
[2022-12-17 09:19:07+0900] INFO - prefect.task3 | I'm task 3
[2022-12-17 09:19:07+0900] ERROR - prefect.TaskRunner | Task 'task3': Exception encountered during task execution!
[2022-12-17 09:19:07+0900] INFO - prefect.TaskRunner | Task 'task3': Finished task run for task with final state: 'Failed'
[2022-12-17 09:19:07+0900] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
ログにエラーを通知しており,最後にFlow run FAILED
の出力があります.
まとめ
今回,Prefectのtask
とFlow
を試用しました.公式Docsによると
- クラウドでのflow実行
- slackへの通知
を簡単に実装する事が可能らしいので,近いうちに記事にまとめます.
最後まで読んでいただきありがとうございます.
明日は@syuri_nさんの記事です.