1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS AnalyticsAdvent Calendar 2024

Day 15

MWAAでのリソース管理:Deferrable Operatorsによるワーカースロット解放の実践

Last updated at Posted at 2024-12-15

はじめに

Amazon Managed Workflows for Apache Airflow (MWAA) は、AirflowのAWSマネージドサービスであり、簡単にAirflowの環境を構築できます。今回はAirflowでのSensorのよくある課題と、その解決方法としてDeferrable Operatorsを利用する方法を記載します。

Sensorの概要

Airflowではワークフロー(AirflowではDAGと呼ばれる)をPythonで記載できます。DAGでは、実施したいアクションをOperatorを使って記述します。そのOperatorの中の一つに、何かのイベントを待つために使うSensorというOperatorがあります。

例えば、S3にファイルが置かれたらGlueジョブを実行する、というDAGを作成したいときは、Sensorの一つであるS3KeySensorを使うことで実現できます。

image.png

Sensorのよくある課題

このSensorですが、デフォルトではワーカースロットを占有してしまいます。WorkerはTaskを実行するコンピューティング環境であり、例えばMWAAの環境クラスmw1.smallでは同時に実行できるTaskはデフォルトで5つとなっています。

同時に実行できるTaskが5つなので、例えば5つのSensorがS3からのファイルを待っている状態だと、他のデータ処理などのTaskが実行できなくなります。

もう少し具体的に言うと、Taskはキューで管理されており、最大同時実行数に達するとそれ以降のTaskはキューに入ったままとなります。

待ちが発生するのはSensor以外のOperatorでも同じく起きますが、Sensorは「アクションを待つ」という性質上、実行が長くなりがちで、この課題が起きる可能性も高いと言えます。

image.png

Sensorの実行モードでrescheduleを指定すればワーカースロットを使わずに待つこともできますが、時間で繰り返し実行されリソース使用量が多くなることなどから、今回は取り扱いません。Deferrable Operatorsとの比較は公式ドキュメントをご覧ください。

Deferrable Operatorsを使った解決方法

この課題を解決する方法として、Deferrable Operatorsを利用する方法があります。

これを使うと、待機中はWorkerを利用せず、Triggerという別のプロセスで実施してくれます。つまり先ほどの例だと、5つのSensorを実行していても、それらはTriggerで実行されるため、Workerでは別の処理が実行できるということです。

このDeferrable Operatorsは、Airflow v2.2.0以降で利用でき、MWAAだとv2.7.2以降で利用できます。

具体的な使い方

対応しているOperatorであれば、以下のようにdeferrable=Trueを指定するだけで利用できます。簡単ですね。

wait_for_source_data = S3KeySensor (
task_id="WaitForSourceData",
bucket_name="source_bucket_name",
bucket_key = "object_key",
aws_conn_id="aws_default",
deferrable=True
)

実際に使ってみる

今回は、S3KeySensorを、以下2通りの方法で利用してみます。

  1. 通常のSensorとして利用
  2. Deferrable Operatorsとして利用

具体的には、まずS3KeySensorでS3の指定した場所に指定のファイルが置かれるのを待ちます。その後、DummyOperatorを実行(何も起きないダミーのOperator)して終了です。

このDAGを、同時実行数を超えた回数実行して、挙動を見てみます。

image.png

上記はAirflow UIの画面ですが、ダークモードにしているため背景が黒くなっています。ダークモードは、Airflow v2.10以降で利用できます。

実行する環境

  • 東京リージョン
  • MWAA v2.10.1
  • 環境クラス mw1.micro
    • デフォルトの最大同時実行タスクは3つ
  • 実行ロールには、適切なIAMポリシーを付与済み
    • S3KeySensorで見に行くS3バケットの参照権限など

実行するソース

以下のソースのDAGを実行します。
deferrable=Trueを指定するとDeferrable Operatorsとして実行されるため、通常のSensorとして実行したい場合はコメントアウトします。

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.dummy import DummyOperator

from datetime import datetime, timedelta

default_args = {
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# DAGの定義
dag = DAG(
    's3_key_sensor_example',
    default_args=default_args,
    description='A simple DAG with S3KeySensor and a dummy task',
    schedule_interval=None,
    start_date=datetime(2023, 5, 20),
    catchup=False
)

# S3KeySensorタスク
check_s3_file = S3KeySensor(
    task_id="WaitForSourceData",
    bucket_key="s3://<bucket-name>/data/test.txt",
    dag=dag,
    deferrable=True, # ★★★ Deferrable 有効化 ★★★
    poke_interval=10,
    timeout=600
)

# ダミータスク
dummy_task = DummyOperator(
    task_id='dummy_task',
    dag=dag
)

# タスクの依存関係を設定
check_s3_file >> dummy_task

① 通常のSensorとして利用

まずは、通常のSensorとして実行してみます。

デフォルトの最大同時実行タスクは3つであるため、わざとDAGを5回実行してみます。

最初の3回の実行状態は、緑色のrunningになっています。これはWorkerでSensorが実行されている状態です。
image.png

4,5回目の実行状態は、茶色のscheduluedになっています。つまり、まだ実行されていない状態です。
image.png

通常のSensorでは、Workerを使っているので4, 5回目のDAG(の中のTask)は実行されませんでした

② Deferrable Operatorsとして利用

Deferrable Operatorsを有効化して、DAGを5回実行してみます。

5つ、全ての状態が紫色のdeferredとなっていることが分かります。TriggerでSensorが実行されているので、Workerは使用していません。
image.png

Deferrable Operatorsを使うと、Workerを使用せず5つのDAG(の中のTask)が全て実行されました

おわりに

Deferrable Operatorsを有効/無効にした際、最大同時実行タスク以上の数のTaskを実行したらどうなるのかを見てみました。

基本的にSensorを使う場合は、有効化してよいのではないかなと思います。Sensorで待機していたから他のTaskが実行できない、というのはあまり望ましくない動きですよね。

今回利用したS3KeySensorは既にオプションがあったので簡単に有効化できましたが、例えばGlueのジョブ完了を待つGlueJobSensorなどにはそのオプションがないんですよね。

classairflow.providers.amazon.aws.sensors.glue.GlueJobSensor(*, job_name, run_id, verbose=False, aws_conn_id='aws_default', **kwargs)

一方、以前の記事で紹介したAWS Batchのジョブ完了を待つBatchSensorにはオプションがあるようです。

classairflow.providers.amazon.aws.sensors.batch.BatchSensor(*, job_id, aws_conn_id='aws_default', region_name=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poke_interval=30, max_retries=4200, **kwargs)

このように、Deferrable Operatorsを使いたい場合はドキュメントで確認することをお勧めします。

Glueジョブを実行するGlueJobOperatorにはdeferrableオプションが存在します。これを使うことで、そもそもSensorは使わずに、非同期実行+ワーカースロット解放が実現できそうです。

参考にした記事

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?