はじめに
Amazon Managed Workflows for Apache Airflow (MWAA) は、AirflowのAWSマネージドサービスであり、簡単にAirflowの環境を構築できます。今回はAirflowでのSensorのよくある課題と、その解決方法としてDeferrable Operatorsを利用する方法を記載します。
Sensorの概要
Airflowではワークフロー(AirflowではDAGと呼ばれる)をPythonで記載できます。DAGでは、実施したいアクションをOperatorを使って記述します。そのOperatorの中の一つに、何かのイベントを待つために使うSensorというOperatorがあります。
例えば、S3にファイルが置かれたらGlueジョブを実行する、というDAGを作成したいときは、Sensorの一つであるS3KeySensorを使うことで実現できます。
Sensorのよくある課題
このSensorですが、デフォルトではワーカースロットを占有してしまいます。WorkerはTaskを実行するコンピューティング環境であり、例えばMWAAの環境クラスmw1.small
では同時に実行できるTaskはデフォルトで5つとなっています。
同時に実行できるTaskが5つなので、例えば5つのSensorがS3からのファイルを待っている状態だと、他のデータ処理などのTaskが実行できなくなります。
もう少し具体的に言うと、Taskはキューで管理されており、最大同時実行数に達するとそれ以降のTaskはキューに入ったままとなります。
待ちが発生するのはSensor以外のOperatorでも同じく起きますが、Sensorは「アクションを待つ」という性質上、実行が長くなりがちで、この課題が起きる可能性も高いと言えます。
Sensorの実行モードでreschedule
を指定すればワーカースロットを使わずに待つこともできますが、時間で繰り返し実行されリソース使用量が多くなることなどから、今回は取り扱いません。Deferrable Operatorsとの比較は公式ドキュメントをご覧ください。
Deferrable Operatorsを使った解決方法
この課題を解決する方法として、Deferrable Operatorsを利用する方法があります。
これを使うと、待機中はWorkerを利用せず、Triggerという別のプロセスで実施してくれます。つまり先ほどの例だと、5つのSensorを実行していても、それらはTriggerで実行されるため、Workerでは別の処理が実行できるということです。
このDeferrable Operatorsは、Airflow v2.2.0以降で利用でき、MWAAだとv2.7.2以降で利用できます。
具体的な使い方
対応しているOperatorであれば、以下のようにdeferrable=True
を指定するだけで利用できます。簡単ですね。
wait_for_source_data = S3KeySensor (
task_id="WaitForSourceData",
bucket_name="source_bucket_name",
bucket_key = "object_key",
aws_conn_id="aws_default",
deferrable=True
)
実際に使ってみる
今回は、S3KeySensor
を、以下2通りの方法で利用してみます。
- 通常のSensorとして利用
- Deferrable Operatorsとして利用
具体的には、まずS3KeySensor
でS3の指定した場所に指定のファイルが置かれるのを待ちます。その後、DummyOperator
を実行(何も起きないダミーのOperator)して終了です。
このDAGを、同時実行数を超えた回数実行して、挙動を見てみます。
上記はAirflow UIの画面ですが、ダークモードにしているため背景が黒くなっています。ダークモードは、Airflow v2.10以降で利用できます。
実行する環境
- 東京リージョン
- MWAA v2.10.1
- 環境クラス mw1.micro
- デフォルトの最大同時実行タスクは3つ
- 実行ロールには、適切なIAMポリシーを付与済み
- S3KeySensorで見に行くS3バケットの参照権限など
実行するソース
以下のソースのDAGを実行します。
deferrable=True
を指定するとDeferrable Operatorsとして実行されるため、通常のSensorとして実行したい場合はコメントアウトします。
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
default_args = {
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
# DAGの定義
dag = DAG(
's3_key_sensor_example',
default_args=default_args,
description='A simple DAG with S3KeySensor and a dummy task',
schedule_interval=None,
start_date=datetime(2023, 5, 20),
catchup=False
)
# S3KeySensorタスク
check_s3_file = S3KeySensor(
task_id="WaitForSourceData",
bucket_key="s3://<bucket-name>/data/test.txt",
dag=dag,
deferrable=True, # ★★★ Deferrable 有効化 ★★★
poke_interval=10,
timeout=600
)
# ダミータスク
dummy_task = DummyOperator(
task_id='dummy_task',
dag=dag
)
# タスクの依存関係を設定
check_s3_file >> dummy_task
① 通常のSensorとして利用
まずは、通常のSensorとして実行してみます。
デフォルトの最大同時実行タスクは3つであるため、わざとDAGを5回実行してみます。
最初の3回の実行状態は、緑色のrunning
になっています。これはWorkerでSensorが実行されている状態です。
4,5回目の実行状態は、茶色のschedulued
になっています。つまり、まだ実行されていない状態です。
通常のSensorでは、Workerを使っているので4, 5回目のDAG(の中のTask)は実行されませんでした。
② Deferrable Operatorsとして利用
Deferrable Operatorsを有効化して、DAGを5回実行してみます。
5つ、全ての状態が紫色のdeferred
となっていることが分かります。TriggerでSensorが実行されているので、Workerは使用していません。
Deferrable Operatorsを使うと、Workerを使用せず5つのDAG(の中のTask)が全て実行されました。
おわりに
Deferrable Operatorsを有効/無効にした際、最大同時実行タスク以上の数のTaskを実行したらどうなるのかを見てみました。
基本的にSensorを使う場合は、有効化してよいのではないかなと思います。Sensorで待機していたから他のTaskが実行できない、というのはあまり望ましくない動きですよね。
今回利用したS3KeySensor
は既にオプションがあったので簡単に有効化できましたが、例えばGlueのジョブ完了を待つGlueJobSensor
などにはそのオプションがないんですよね。
classairflow.providers.amazon.aws.sensors.glue.GlueJobSensor(*, job_name, run_id, verbose=False, aws_conn_id='aws_default', **kwargs)
一方、以前の記事で紹介したAWS Batchのジョブ完了を待つBatchSensor
にはオプションがあるようです。
classairflow.providers.amazon.aws.sensors.batch.BatchSensor(*, job_id, aws_conn_id='aws_default', region_name=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poke_interval=30, max_retries=4200, **kwargs)
このように、Deferrable Operatorsを使いたい場合はドキュメントで確認することをお勧めします。
Glueジョブを実行するGlueJobOperatorにはdeferrableオプションが存在します。これを使うことで、そもそもSensorは使わずに、非同期実行+ワーカースロット解放が実現できそうです。
参考にした記事