Prefectとは
Prefectは、データエンジニアリングとデータサイエンスのための強力なワークフローオーケストレーションツールの一種です。
ワークフローオーケストレーションツールの歴史や主なツールについては、下記がとても分かりやすいです。
ワークフローツールの主な機能
(上記から引用)
- 実行制御: タスクの実行順序の制御やリトライなどを保証する
- 監視: ワークフローの実行状態やログを追跡できる
- スケーラビリティ: 並行処理や柔軟な計算リソースの拡張を行う
- 外部連携: データベースやETLツールなどと容易に連携可能
Prefectの特徴とAirflowとの違い
Prefectの主な特徴には、直感的なAPI、動的なワークフロー生成、高度な状態管理、簡易なデバッグ、および強力なUIがあります。Airflowに比べて、Prefectはよりプログラマティックにフレキシブルなワークフローの定義を可能にし、エラーハンドリングと状態管理が向上しています。
PrefectはAirflowを明確に意識して開発されており、公式から下記のドキュメントが出ています。
Why Not Airflow? | Prefect Docs
- API: ユーザーフレンドリーで軽量なAPI。Pythonicな方法でDAGを構築できる
-
Scheduling and Time: 時間の使い方が分かりやすい(AirflowのDAGに必須な
execution_date
は独特の概念だった) - The Scheduler Service: Prefectのスケジューラはシンプルで、ワークフローのロジックや実行には一切関与しない。新しいフローを作成し、Scheduled状態にするだけ
- Dataflow: タスクが直接データを交換できる(Airflowはタスクを独立させるためXComでメタデータデータベースを利用)
- Parametrized Workflows: PrefectのParameterは、実行時に値を(オプションで)オーバーライドできる(AirflowのDAGは固定スケジュールで実行され、入力を受け取らない)
- Dynamic Workflows: 上流タスクの出力に基づいて、実行時にタスクの新しいコピーを動的に生成することができる
- Versioned Workflows: Prefect Cloudでは、同じ名前のフローが既に存在するプロジェクトにフローをデプロイすると、バージョン管理が自動的に行われる
- Local Testing: シンプルなため・かつ色々用意されており、ローカルでのテストがしやすい
- UI: 様々な機能をサポートする美しいリアルタイムUIを提供する
Prefectの構成要素
Prefectの主要なコンポーネントは次のとおりです。
- フロー: ワークフロー全体を表します。タスクの集合体です。
- タスク: ワークフロー内で実行される個々の作業項目です。
Prefectのはじめ方
Prefectのセットアップ
Prefectを使用するには、まずPython環境にインストールする必要があります。次のコマンドを使用して、Prefect 2.0をインストールするだけでまずは利用を開始することができます。
pip install prefect
Prefectで開発・実行
タスクとフローをコーディングする
Prefectを使用して、簡単なフローを作成します。
Prefectでは、タスク間でデータを簡単に受け渡すことができます。以下の例では、加算タスクと乗算タスクを定義し、加算の結果を乗算タスクへ渡しています。
from prefect import flow, task
@task
def add(x, y):
return x + y
@task
def multiply(x, y):
return x * y
@flow
def my_complex_flow(x, y, z):
add_result = add(x, y)
final_result = multiply(add_result, z)
print(f"最終結果は {final_result} です。")
# フローの実行
my_complex_flow(1, 2, 3)
このコードでは、add
関数 と multiply
関数を @task
デコレータで装飾することによりタスクに変換し、my_complex_flow
フロー内で実行しています。
フローを実行する
フローを定義した後、単純に関数として呼び出すことで実行できます。上記の例では、my_complex_flow()
を呼び出すことでフローが実行されます。
フローへのパラメータの渡し方
上記の例では、my_complex_flow
フローは3つのパラメータx, y, zを受け取ります。これらのパラメータはフローの実行時に指定します。
主な機能の詳細
実行順序制御
上記例の通り、タスクのアウトプットで実行順序を制御することができます。
また、フロー内のタスクの実行順序を明示的に制御することも可能です。タスク間の依存関係を定義することで、実行順序を制御できます。例えば、task_2
がtask_1
の結果に依存している場合、task_1
の完了後にのみtask_2
が実行されます。
@flow
def example_flow():
result1 = task_1()
result2 = task_2(upstream_tasks=[result1])
upstream_tasks
やset_dependencies
メソッドを使用して、タスクの実行順序を制御することができます。
リトライ
自動リトライ
Prefectでは、タスクやフローの実行に失敗した場合のリトライを簡単に設定できます。@task
デコレータのretries
パラメータを使用して、リトライ回数を指定し、retry_delay
パラメータでリトライ間の遅延を設定できます。
from prefect import task, flow
from datetime import timedelta
@task(retries=3, retry_delay=timedelta(seconds=10))
def unreliable_task():
# 実装
pass
この設定により、unreliable_task
は最大3回までリトライされ、リトライ間には10秒の遅延があります。
実行状態の追跡
Prefect UIを使用すると、過去のフロー実行の履歴を確認し、成功、失敗、その他の状態に関する詳細を見ることができます。
ダッシュボードを起動するには、コマンドラインから次のコマンドを実行します。
$ prefect server start
このコマンドを実行すると、ブラウザでダッシュボードが開きます。ダッシュボードでは、フローの実行履歴、実行中のフロー、スケジュールされたフロー、フローの実行結果など、さまざまな情報を確認することができます。
ログの追跡
Prefect UIを使用すると、過去のフロー実行の履歴を確認し、ログの詳細を見ることができます。
並列実行
Prefect 2.0では、特に並列実行を明示的に指定する必要はありません。代わりに、タスク間に循環しない依存関係がある場合、実行システムは自動的に可能な限りタスクを並列に実行します。
リソースのスケール
Prefectでは、タスクごとに異なる実行環境(コンテナを含む)を指定することができます。これにより、タスクが特定の依存関係や環境で実行されるように設定できます。
外部連携
- Slack通知
import requests
from prefect import flow, task
from prefect.tasks import task_input_hash
# Slack通知関数
def send_slack_message(message):
webhook_url = "YOUR_SLACK_WEBHOOK_URL"
slack_data = {'text': message}
response = requests.post(webhook_url, json=slack_data)
if response.status_code != 200:
raise ValueError(f"Request to Slack returned an error {response.status_code}, the response is:\n{response.text}")
# ステートハンドラ
def slack_state_handler(obj, old_state, new_state):
if new_state.is_failed():
send_slack_message(f"❌ {obj} has failed.")
elif new_state.is_completed():
send_slack_message(f"✅ {obj} has completed successfully.")
return new_state
@task
def sample_task():
# タスクの処理
return "タスクの実行結果"
@flow(state_handlers=[slack_state_handler])
def my_flow():
result = sample_task()
my_flow()
この例では、slack_state_handler関数がステートハンドラとしてフローに登録されています。このハンドラは、フローの状態が変化するたびに呼び出され、フローが失敗した場合や成功裏に完了した場合にSlackに通知します。
Tips
標準ディレクトリ構成は?
Prefectのプロジェクトでは、特定のディレクトリ構成を採用することが推奨されますが、必須ではありません。一般的には、**flows/
ディレクトリにフローの定義を、tasks/
ディレクトリにタスクの定義を格納します。lib/
**ディレクトリには共通のライブラリやユーティリティを置きます。
Prefect 2.0で追加された機能は?
Prefect 2.0では、より柔軟なワークフロー定義、改善されたUI、動的なタスク生成、改善されたエラーハンドリングなど、多くの新機能が追加されました。
下記が分かりやすかったです。
Prefect 2.0に入門する
名称の変更などもありました。(orion
-> server
など)
prefect.yaml
にかけることは?
**prefect.yaml
**ファイルは、Prefectフローの設定を管理するために使用されます。このファイルには、フローの実行スケジュール、ログの設定、実行環境の設定など、多くの設定項目を記述することができます。また、プロジェクト固有の設定やカスタムストレージオプションの指定にも使用されます。
Prefect Cloudとは
Prefect Cloudは、Prefectをクラウド上で実行するためのマネージドサービスです。これにより、ワークフローのオーケストレーション、監視、スケーリングが簡単になり、ユーザーはインフラストラクチャの管理から解放され、ビジネスロジックの開発に集中できます。Prefect Cloudは、フローの実行スケジュール、リアルタイムの監視、ログ収集、セキュリティ機能など、エンタープライズレベルの機能を提供します。
実行履歴の永続化
Prefect UIを使用してタスクやフローの実行結果を集約し、過去の実行履歴も確認できるようにするためには、以下のステップに従います。このプロセスでは、Prefect CloudまたはPrefect Serverを利用してデータを集約し、UIでそれらを管理します。
ステップ 1: Prefect Community CloudまたはPrefect Server
-
Prefect Community Cloudの利用: Prefect Cloudにサインアップし、組織を設定します。Prefect Cloudは、フローのオーケストレーションと監視をクラウド上で管理するサービスです。Prefect Cloudにアクセスしてアカウントを作成します。
-
Prefect Server(自己ホスト)のセットアップ: Serverは、Prefect 2.0のコアオーケストレーションエンジンであり、ローカルマシンや自己管理サーバーにセットアップすることができます。Serverをセットアップするには、適切なインストール手順に従ってServerサービスを起動します。
prefect server start
これにより、serverサーバーが開始され、デフォルトで
http://localhost:4200
でアクセスできるようになります。
ステップ 2: Prefectエージェントの設定
Prefectエージェントは、フローの実行を管理し、指定された実行環境(例:Cloud Run)でフローを実行します。Cloud Runや他の実行環境でフローを実行するには、適切なエージェントをセットアップし、Prefect CloudまたはServerサーバーに接続します。
-
エージェントの起動:
Prefect Cloudを使用している場合は、Prefect Cloudのダッシュボードで提供されるエージェント起動コマンドを使用します。
Orionを自己ホストしている場合は、以下のコマンドを使用してローカルエージェントを起動します。
prefect agent start
ステップ 3: フローの登録と実行
フローをPrefect CloudまたはServerに登録し、エージェントを使用して指定した実行環境でフローを実行します。フローの登録は、フロー定義ファイル内から、またはCLIを使用して行うことができます。
-
フローの登録:
from prefect import flow @flow def my_flow(): # フローロジック my_flow.register(project_name="my_project")
-
フローの実行:
フローが登録されると、Prefect UIまたはCLIを使用してフローをトリガーできます。また、スケジュールされた実行やAPIを通じた実行も可能です。
ステップ 4: Prefect UIの利用
Prefect CloudまたはServer UIを使用して、フローの実行状態、履歴、ログなどを監視し、管理します。UIでは、実行中のフローの状態の確認、過去のフロー実行の履歴の閲覧、フローのパラメータの調整、スケジュールの設定などが可能です。