はじめに
python のライブラリ Prefect を少し触ってみました。
ワークフローが思った以上に簡単に構築できて良さげだったので、試したことのメモをまとめました。
Prefect について
ワークフローを構築する python ライブラリ。
参考資料
- Prefect の Github : https://github.com/PrefectHQ/prefect
- Prefect API Document : https://docs.prefect.io/api/latest/
インストール
pip install prefect
簡単な用語説明
超簡単な用語説明
- Task
- 作業単位、一つのまとまった処理
- Edge
- 2つの Task を結合したもの
- 前段 Task の処理結果を key を媒体に次の Task に渡せる
- Flow
- Task の集合
- ワークフロー
今回はこれらを組み合わせて簡単なワークフローを構築してみます。
ワークフローを構築してみる
簡単にワークフローを構築してみます。
今回はお試しで以下のワークフローを Prefect で構築してみます。
- 数値1を生成 => 10を追加 => 2倍にする => 結果の数値を print
書いたコード
from prefect import task, Task, Flow
# Task クラスを継承してタスク定義、run メソッドにタスクで実行したい処理を定義する
class GenerateTask(Task):
def run(self):
print(f"=========GenerateTask===============")
return 1
class Add10Task(Task):
def run(self, number):
print(f"=========Add10Task {number}===============")
return number + 10
class DoubleTask(Task):
def run(self, number):
print(f"=========DoubleTask {number}===============")
return number * 2
# @task を使った版のタスク定義
@task
def print_task(result):
print(f"=========PrintTask {result}===============")
# Flow を作成
sample_flow = Flow(name="sample_flow")
# Task のインスタンス化
generate_task = GenerateTask()
add_10_task = Add10Task()
double_task = DoubleTask()
print_task = print_task
# Task 同士を結合した Edge を登録
# 前段の処理結果を key で次のタスクに受け渡し
sample_flow.add_edge(generate_task, add_10_task, key='number')
sample_flow.add_edge(add_10_task, double_task, key='number')
sample_flow.add_edge(double_task, print_task, key='result')
# Flow の実行
sample_flow.run()
実行結果
[2021-12-14 22:49:16+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'GenerateTask': Starting task run...
=========GenerateTask===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'GenerateTask': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'Add10Task': Starting task run...
=========Add10Task 1===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'Add10Task': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'DoubleTask': Starting task run...
=========DoubleTask 11===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'DoubleTask': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'print_task': Starting task run...
=========PrintTask 22===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'print_task': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
各タスクが実行され、key による値の受け渡しもできていそうです。
まとめ
お手軽にワークフローが構築できて良さそうでした。
今回は簡単なワークフローを構築するところまでしか試せていませんが、
「複数の依存 Task がある場合の Task / Flow / Edge の定義」や「ワークフローの途中で失敗した際のリトライ処理は可能か」あたりは気になっているので、
もう少し掘り下げてみようと思います。