4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Prefectに入門する

Last updated at Posted at 2024-03-27

Prefectとは

Prefectは、データエンジニアリングとデータサイエンスのための強力なワークフローオーケストレーションツールの一種です。
ワークフローオーケストレーションツールの歴史や主なツールについては、下記がとても分かりやすいです。

ワークフローオーケストレーション入門

ワークフローツールの主な機能

(上記から引用)

  • 実行制御: タスクの実行順序の制御やリトライなどを保証する
  • 監視: ワークフローの実行状態やログを追跡できる
  • スケーラビリティ: 並行処理や柔軟な計算リソースの拡張を行う
  • 外部連携: データベースやETLツールなどと容易に連携可能

Prefectの特徴とAirflowとの違い

Prefectの主な特徴には、直感的なAPI、動的なワークフロー生成、高度な状態管理、簡易なデバッグ、および強力なUIがあります。Airflowに比べて、Prefectはよりプログラマティックにフレキシブルなワークフローの定義を可能にし、エラーハンドリングと状態管理が向上しています。

PrefectはAirflowを明確に意識して開発されており、公式から下記のドキュメントが出ています。

Why Not Airflow? | Prefect Docs

  • API: ユーザーフレンドリーで軽量なAPI。Pythonicな方法でDAGを構築できる
  • Scheduling and Time: 時間の使い方が分かりやすい(AirflowのDAGに必須な execution_date は独特の概念だった)
  • The Scheduler Service: Prefectのスケジューラはシンプルで、ワークフローのロジックや実行には一切関与しない。新しいフローを作成し、Scheduled状態にするだけ
  • Dataflow: タスクが直接データを交換できる(Airflowはタスクを独立させるためXComでメタデータデータベースを利用)
  • Parametrized Workflows: PrefectのParameterは、実行時に値を(オプションで)オーバーライドできる(AirflowのDAGは固定スケジュールで実行され、入力を受け取らない)
  • Dynamic Workflows: 上流タスクの出力に基づいて、実行時にタスクの新しいコピーを動的に生成することができる
  • Versioned Workflows: Prefect Cloudでは、同じ名前のフローが既に存在するプロジェクトにフローをデプロイすると、バージョン管理が自動的に行われる
  • Local Testing: シンプルなため・かつ色々用意されており、ローカルでのテストがしやすい
  • UI: 様々な機能をサポートする美しいリアルタイムUIを提供する

Prefectの構成要素

Prefectの主要なコンポーネントは次のとおりです。

  • フロー: ワークフロー全体を表します。タスクの集合体です。
  • タスク: ワークフロー内で実行される個々の作業項目です。

Prefectのはじめ方

Prefectのセットアップ

Prefectを使用するには、まずPython環境にインストールする必要があります。次のコマンドを使用して、Prefect 2.0をインストールするだけでまずは利用を開始することができます。

pip install prefect

Prefectで開発・実行

タスクとフローをコーディングする

Prefectを使用して、簡単なフローを作成します。

Prefectでは、タスク間でデータを簡単に受け渡すことができます。以下の例では、加算タスクと乗算タスクを定義し、加算の結果を乗算タスクへ渡しています。

from prefect import flow, task

@task
def add(x, y):
    return x + y

@task
def multiply(x, y):
    return x * y

@flow
def my_complex_flow(x, y, z):
    add_result = add(x, y)
    final_result = multiply(add_result, z)
    print(f"最終結果は {final_result} です。")

# フローの実行
my_complex_flow(1, 2, 3)

このコードでは、add関数 と multiply関数を @task デコレータで装飾することによりタスクに変換し、my_complex_flowフロー内で実行しています。

フローを実行する

フローを定義した後、単純に関数として呼び出すことで実行できます。上記の例では、my_complex_flow()を呼び出すことでフローが実行されます。

フローへのパラメータの渡し方

上記の例では、my_complex_flow フローは3つのパラメータx, y, zを受け取ります。これらのパラメータはフローの実行時に指定します。

主な機能の詳細

実行順序制御

上記例の通り、タスクのアウトプットで実行順序を制御することができます。

また、フロー内のタスクの実行順序を明示的に制御することも可能です。タスク間の依存関係を定義することで、実行順序を制御できます。例えば、task_2task_1の結果に依存している場合、task_1の完了後にのみtask_2が実行されます。

@flow
def example_flow():
    result1 = task_1()
    result2 = task_2(upstream_tasks=[result1])

upstream_tasksset_dependenciesメソッドを使用して、タスクの実行順序を制御することができます。

リトライ

自動リトライ

Prefectでは、タスクやフローの実行に失敗した場合のリトライを簡単に設定できます。@taskデコレータのretriesパラメータを使用して、リトライ回数を指定し、retry_delayパラメータでリトライ間の遅延を設定できます。

from prefect import task, flow
from datetime import timedelta

@task(retries=3, retry_delay=timedelta(seconds=10))
def unreliable_task():
    # 実装
    pass

この設定により、unreliable_taskは最大3回までリトライされ、リトライ間には10秒の遅延があります。

実行状態の追跡

Prefect UIを使用すると、過去のフロー実行の履歴を確認し、成功、失敗、その他の状態に関する詳細を見ることができます。

ダッシュボードを起動するには、コマンドラインから次のコマンドを実行します。

$ prefect server start

このコマンドを実行すると、ブラウザでダッシュボードが開きます。ダッシュボードでは、フローの実行履歴、実行中のフロー、スケジュールされたフロー、フローの実行結果など、さまざまな情報を確認することができます。

ログの追跡

Prefect UIを使用すると、過去のフロー実行の履歴を確認し、ログの詳細を見ることができます。

並列実行

Prefect 2.0では、特に並列実行を明示的に指定する必要はありません。代わりに、タスク間に循環しない依存関係がある場合、実行システムは自動的に可能な限りタスクを並列に実行します。

リソースのスケール

Prefectでは、タスクごとに異なる実行環境(コンテナを含む)を指定することができます。これにより、タスクが特定の依存関係や環境で実行されるように設定できます。

外部連携

  • Slack通知
import requests
from prefect import flow, task
from prefect.tasks import task_input_hash

# Slack通知関数
def send_slack_message(message):
    webhook_url = "YOUR_SLACK_WEBHOOK_URL"
    slack_data = {'text': message}
    response = requests.post(webhook_url, json=slack_data)
    if response.status_code != 200:
        raise ValueError(f"Request to Slack returned an error {response.status_code}, the response is:\n{response.text}")

# ステートハンドラ
def slack_state_handler(obj, old_state, new_state):
    if new_state.is_failed():
        send_slack_message(f"{obj} has failed.")
    elif new_state.is_completed():
        send_slack_message(f"{obj} has completed successfully.")
    return new_state

@task
def sample_task():
    # タスクの処理
    return "タスクの実行結果"

@flow(state_handlers=[slack_state_handler])
def my_flow():
    result = sample_task()

my_flow()

この例では、slack_state_handler関数がステートハンドラとしてフローに登録されています。このハンドラは、フローの状態が変化するたびに呼び出され、フローが失敗した場合や成功裏に完了した場合にSlackに通知します。

Tips

標準ディレクトリ構成は?

Prefectのプロジェクトでは、特定のディレクトリ構成を採用することが推奨されますが、必須ではありません。一般的には、**flows/ディレクトリにフローの定義を、tasks/ディレクトリにタスクの定義を格納します。lib/**ディレクトリには共通のライブラリやユーティリティを置きます。

Prefect 2.0で追加された機能は?

Prefect 2.0では、より柔軟なワークフロー定義、改善されたUI、動的なタスク生成、改善されたエラーハンドリングなど、多くの新機能が追加されました。
下記が分かりやすかったです。
Prefect 2.0に入門する

名称の変更などもありました。(orion -> server など)

prefect.yamlにかけることは?

**prefect.yaml**ファイルは、Prefectフローの設定を管理するために使用されます。このファイルには、フローの実行スケジュール、ログの設定、実行環境の設定など、多くの設定項目を記述することができます。また、プロジェクト固有の設定やカスタムストレージオプションの指定にも使用されます。

Prefect Cloudとは

Prefect Cloudは、Prefectをクラウド上で実行するためのマネージドサービスです。これにより、ワークフローのオーケストレーション、監視、スケーリングが簡単になり、ユーザーはインフラストラクチャの管理から解放され、ビジネスロジックの開発に集中できます。Prefect Cloudは、フローの実行スケジュール、リアルタイムの監視、ログ収集、セキュリティ機能など、エンタープライズレベルの機能を提供します。

実行履歴の永続化

Prefect UIを使用してタスクやフローの実行結果を集約し、過去の実行履歴も確認できるようにするためには、以下のステップに従います。このプロセスでは、Prefect CloudまたはPrefect Serverを利用してデータを集約し、UIでそれらを管理します。

ステップ 1: Prefect Community CloudまたはPrefect Server

  1. Prefect Community Cloudの利用: Prefect Cloudにサインアップし、組織を設定します。Prefect Cloudは、フローのオーケストレーションと監視をクラウド上で管理するサービスです。Prefect Cloudにアクセスしてアカウントを作成します。

  2. Prefect Server(自己ホスト)のセットアップ: Serverは、Prefect 2.0のコアオーケストレーションエンジンであり、ローカルマシンや自己管理サーバーにセットアップすることができます。Serverをセットアップするには、適切なインストール手順に従ってServerサービスを起動します。

    prefect server start
    

    これにより、serverサーバーが開始され、デフォルトでhttp://localhost:4200でアクセスできるようになります。

ステップ 2: Prefectエージェントの設定

Prefectエージェントは、フローの実行を管理し、指定された実行環境(例:Cloud Run)でフローを実行します。Cloud Runや他の実行環境でフローを実行するには、適切なエージェントをセットアップし、Prefect CloudまたはServerサーバーに接続します。

  1. エージェントの起動:

    Prefect Cloudを使用している場合は、Prefect Cloudのダッシュボードで提供されるエージェント起動コマンドを使用します。

    Orionを自己ホストしている場合は、以下のコマンドを使用してローカルエージェントを起動します。

    prefect agent start
    

ステップ 3: フローの登録と実行

フローをPrefect CloudまたはServerに登録し、エージェントを使用して指定した実行環境でフローを実行します。フローの登録は、フロー定義ファイル内から、またはCLIを使用して行うことができます。

  1. フローの登録:

    from prefect import flow
    
    @flow
    def my_flow():
        # フローロジック
    
    my_flow.register(project_name="my_project")
    
  2. フローの実行:

    フローが登録されると、Prefect UIまたはCLIを使用してフローをトリガーできます。また、スケジュールされた実行やAPIを通じた実行も可能です。

ステップ 4: Prefect UIの利用

Prefect CloudまたはServer UIを使用して、フローの実行状態、履歴、ログなどを監視し、管理します。UIでは、実行中のフローの状態の確認、過去のフロー実行の履歴の閲覧、フローのパラメータの調整、スケジュールの設定などが可能です。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?