軽量ワークフローエンジンを検討する機会があったので、記載しました。
※ Prefect Cloudはスコープ外です。
Prefectとは
Pythonベースのオープンソースのワークフローオーケストレーションツールです。
関数にデコレータを付けるだけでフローやタスクを定義でき、Pythonコードから
直感的にワークフローを書ける点が特長です。
実際触ってみたところ、Airflowをよりシンプルかつ軽量にした印象を受けました。
サンプルコード
下記のように、関数/メソッドの定義にデコレーターを付与するだけで、
ワークフローとして必要な機能(task定義やリトライ戦略)を
可読性を保ったまま追加することが可能です。
各taskの定義
# デコレーターでtaskを設定可能(関数/メソッド共に対応)
@task(retries=3, retry_delay_seconds=5, name="create-csv-file")
def create_csv_file(self, json_str: str) -> str:
...
# taskのリトライ戦略も同様
@task(retries=3, retry_delay_seconds=5, name="upload-to-gcs")
def upload_to_gcs(
self,
csv_data: str,
bucket_name: str,
filename: str
) -> str:
...
依存関係(ワークフロー)の定義
# 依存関係(ワークフロー)の定義
@flow
def sample_flow():
# 1. ファイルを作成
## 前述のtaskを呼び出し
csv_data = create_csv_file(json_data)
# 2. GCSにアップロードするtask
## taskの戻り値をtaskに渡すことで、依存関係を実現
gcs_path = upload_to_gcs(csv_data, bucket_name, filename)
ELT時代の計算リソースを持たないデータ処理もさることながら、
(障害や枯渇でリソースが必ず疎通するとは限らない)クラウドネイティブの
軽微なバッチ処理にも適している印象です。
(ML用途での利用が多いですが、他の用途の課題解決にも使える)
公式でも、Cloud Functions(Cloud Run functions)、AWSだとFargateなどのFaaSをサポートしており、
クラウド向けの機能を活用することで、可読性と要件を満たしたワークフローを直感的に
記述できます。
(ローカルでOrion UIを起動すれば、AirflowやDigdag UIのような管理画面も利用できますが、
今回は触れません)
Google Cloudでの軽量ワークフロー向けの機能4選
前述のようにクラウドネイティブに適しているライブラリですが、
その中で、基本的な機能の他に備えているクラウド向けの機能をピックアップしました。
※弊社がGoogle CloudメインなのでGoogle Cloudと銘打っていますが、
AWS等にも転用は可能です。
1.prefect_gcp
AirflowのOperatorのよう(だが、やや薄め)に、各種リソースのテンプレート的なヘルパーが
公式から提供されています。(AWS/Snowflake/Databricks etc...)
以下は、Google Cloudの実装例です。
認証情報ブロックを生成
Google Cloudの資格情報を再利用可能なブロックとして保存しておくことで、
他のフローから名前指定で読み込んで使い回すことができます。
from prefect_gcp import GcpCredentials
# ADCに基づき認証情報ブロックを生成(jsonも指定可能)
credentials_block = GcpCredentials()
# ブロックをPrefectに保存(名称 "my-gcp-creds" を定義して登録)
credentials_block.save("my-gcp-creds", overwrite=True)
BigQueryを利用
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.bigquery import bigquery_query
@flow
def bq_flow():
creds = GcpCredentials.load("my-gcp-creds") # 事前に Block を作成
bigquery_query(
"SELECT 1 AS x",
gcp_credentials=creds,
dialect="standard"
)
2.sync_compatible
非同期関数を同期関数に変換します。
デコレータとして利用すれば、async/await地獄を避け、コードの可読性を保つことができます。
from prefect import flow, task
from prefect.utilities.asyncutils import sync_compatible
import asyncio
@task
@sync_compatible
async def fetch_data(x: int) -> int:
# 非同期処理の例(擬似的なI/O待ち)
print(f"Fetching data for {x}...")
await asyncio.sleep(1) # 非同期に1秒待機
result = x * 2
print(f"Done fetching data for {x}. Result = {result}")
return result
@flow
def my_sync_flow():
# sync_compatibleにより、非同期タスクを同期的に呼び出せる(!)
value = fetch_data(42)
print(f"Result in flow: {value}")
return value
# フローを実行
final_result = my_sync_flow()
3.結果の永続化
ワークフローの実行結果を保存して永続化する機能も、デコレーターで反映可能です。
ワークフロー実行前に、保存先のGCSを設定
def ensure_block():
gcs = GcsBucket(
bucket="your-bucket-name", # ここを変更
project="your-gcp-project-id", # ここを変更
bucket_folder="prefect-results" # GCS側の保存プレフィックス
)
gcs.save("my-gcs-bucket-block", overwrite=True)
ensure_block()
ワークフロー実行開始時に、設定した保存先Blockを反映
@flow(
name="sample-result-to-gcs",
persist_result=True, # フローの返り値も保存
result_storage=GcsBucket.load("my-gcs-bucket-block"), # 設定した保存先Block
)
4.prefect_gcp.cloud_run
Prefect フロー内で直接、Cloud Run ジョブを通じてコマンドを実行できます。
結果のステータスを受け取ることも可能です。(AWSはLambdaを呼べます)
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob
@flow
def cloud_run_job_flow():
cloud_run_job = CloudRunJob(
image="us-docker.pkg.dev/cloudrun/container/job:latest",
credentials=GcpCredentials.load("BLOCK-NAME-PLACEHOLDER"),
region="us-central1",
command=["echo", "Hello, Prefect!"],
)
# CloudRunWorkerResultクラスが返る
return cloud_run_job.run()
補足
ワークフローのUI向け実行履歴はDBに格納されるため、
UIで実行履歴を管理したい場合は、下記のどちらかを選択する必要があります。
1.PostgreSQLに保存
2.SQLiteに保存 (ディスクマウントなどの、永続的な保存先が必要)
※実行履歴の管理が不要の場合は、SQLiteの保存先をFaaSのtmpディレクトリにする、
などが考えられます