LoginSignup
3
0

More than 1 year has passed since last update.

Prefect で簡単なワークフローを構築してみた

Last updated at Posted at 2021-12-14

はじめに

python のライブラリ Prefect を少し触ってみました。
ワークフローが思った以上に簡単に構築できて良さげだったので、試したことのメモをまとめました。

Prefect について

ワークフローを構築する python ライブラリ。

参考資料

インストール

pip install prefect

簡単な用語説明

超簡単な用語説明

  • Task
    • 作業単位、一つのまとまった処理
  • Edge
    • 2つの Task を結合したもの
    • 前段 Task の処理結果を key を媒体に次の Task に渡せる
  • Flow
    • Task の集合
    • ワークフロー

今回はこれらを組み合わせて簡単なワークフローを構築してみます。

ワークフローを構築してみる

簡単にワークフローを構築してみます。

今回はお試しで以下のワークフローを Prefect で構築してみます。

  • 数値1を生成 => 10を追加 => 2倍にする => 結果の数値を print

書いたコード

from prefect import task, Task, Flow

# Task クラスを継承してタスク定義、run メソッドにタスクで実行したい処理を定義する
class GenerateTask(Task):
    def run(self):
        print(f"=========GenerateTask===============")
        return 1

class Add10Task(Task):
    def run(self, number):
        print(f"=========Add10Task {number}===============")
        return number + 10

class DoubleTask(Task):
    def run(self, number):
        print(f"=========DoubleTask {number}===============")
        return number * 2

# @task を使った版のタスク定義
@task
def print_task(result):
    print(f"=========PrintTask {result}===============")

# Flow を作成
sample_flow = Flow(name="sample_flow")

# Task のインスタンス化
generate_task = GenerateTask()
add_10_task = Add10Task()
double_task = DoubleTask()
print_task = print_task

# Task 同士を結合した Edge を登録
# 前段の処理結果を key で次のタスクに受け渡し
sample_flow.add_edge(generate_task, add_10_task, key='number')
sample_flow.add_edge(add_10_task, double_task, key='number')
sample_flow.add_edge(double_task, print_task, key='result')

# Flow の実行
sample_flow.run()

実行結果

[2021-12-14 22:49:16+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'sample_flow'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'GenerateTask': Starting task run...
=========GenerateTask===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'GenerateTask': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'Add10Task': Starting task run...
=========Add10Task 1===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'Add10Task': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'DoubleTask': Starting task run...
=========DoubleTask 11===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'DoubleTask': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'print_task': Starting task run...
=========PrintTask 22===============
[2021-12-14 22:49:16+0900] INFO - prefect.TaskRunner | Task 'print_task': Finished task run for task with final state: 'Success'
[2021-12-14 22:49:16+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

各タスクが実行され、key による値の受け渡しもできていそうです。

まとめ

お手軽にワークフローが構築できて良さそうでした。

今回は簡単なワークフローを構築するところまでしか試せていませんが、
「複数の依存 Task がある場合の Task / Flow / Edge の定義」や「ワークフローの途中で失敗した際のリトライ処理は可能か」あたりは気になっているので、
もう少し掘り下げてみようと思います。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0