はじめに
2025年4月に株式会社インティメート・マージャーへ新卒入社した @kunaisn です 。
早いもので、入社してから8ヶ月が経とうとしています。
この8ヶ月間、新卒研修が終わった後は、主にバックエンドのシステム改修をしていました。
今回は、弊社で採用されているデータ処理パイプラインのプラットフォームであるAirflowについて記載します。
GCS上にファイルが存在する限り待機するカスタムセンサーと、その実処理を担うカスタムトリガーを実装します。
環境
- Python: 3.11.x
- Airflow: 2.10.5
- Cloud Composer: 2.13.3
Airflowのセンサーの3つの待機モード
Airflowで「何かの完了を待つ」処理を行うセンサー(Sensor)には、主に3つの待機モードがあります。
1. poke モード
古くからある基本的なモードです。
センサータスクは、poke_intervalで設定した秒数ごとに poke() メソッドを実行し、条件をチェックします。
問題点としては、条件を満たすまでワーカーのスロットを占有し続けることです。待機時間が長い場合、その間ずっとAirflowワーカーが1つ消費され続けます。
check_no_objects_sync = NoGCSObjectsWithPrefixSensor(
task_id="check_no_objects_sync",
bucket=bucket_name,
prefix=prefix,
timeout=600,
poke_interval=60,
mode="poke",
deferrable=False,
)
2. reschedule モード
poke モードのリソース非効率性を改善するために導入されたモードです。
poke() メソッドが False を戻すと、センサータスクはワーカーを一時的に解放し、自身のステータスを up_for_reschedule にします。そして、poke_interval が経過すると、Airflow Schedulerがこのタスクを再度キューに入れ直します。
利点としては、待機中にワーカーを解放するため、poke モードよりリソース効率が良いところです。
一方で、短い間隔で多数のセンサーが reschedule モードで動くと、Schedulerがタスクの再スケジュール処理に追われ、Scheduler自体の負荷が非常に高くなる可能性があります。また、タスクの起動・停止オーバーヘッドもdeferrable=True に比べて大きくなります。
check_no_objects_schedule = NoGCSObjectsWithPrefixSensor(
task_id="check_no_objects_schedule",
bucket=bucket_name,
prefix=prefix,
timeout=600,
poke_interval=60,
mode="reschedule",
deferrable=False,
)
3. deferrable モード
deferrable=True で実行されると、センサータスクは self.defer() メソッドを呼び出し、自身の処理をトリガーに委譲してタスクステータスを deferredにします。このとき、ワーカーのスロットは解放されます。
委譲されたトリガーは、Triggererと呼ばれる専用のプロセスに渡されます。
Triggererは非同期で動作し、多くのトリガーを同時に監視できます。
そして、Triggererが「条件を満たした」と判断すると、イベント(TriggerEvent)を発行します。
Airflow Schedulerがイベントを検知し、元のセンサータスクをワーカー上で再スケジュールします。
この際、センサーで定義された execute_complete メソッドが呼び出され、処理が再開・完了します。
deferrable モードは、待機中はワーカーリソースを消費せず、Schedulerにも負荷をかけない、リソース効率の良い待機モードです。基本的にこのモードが使えるのであれば、これを使います。
check_no_objects_async = NoGCSObjectsWithPrefixSensor(
task_id="check_no_objects_async",
bucket=bucket_name,
prefix=prefix,
timeout=600,
poke_interval=60,
deferrable=True,
)
実装するもの
今回の開発で必要になったのは、標準で実装されている「ファイルが出現するまで待つ」センサーではなく、「ファイルが無くなるまで待つ」センサーでした。このセンサーをdeferrableで動かすために、カスタムセンサーとカスタムトリガーを実装します。
基本的には、ライブラリの実装をコピペしながら改造すると事故が少ないと思います。
カスタムセンサーの実装
まずは、ワーカー上で動作するセンサー本体です。
標準の GCSObjectsWithPrefixExistenceSensor を継承し、ロジックを部分的に上書き・反転させます。
from typing import Any
from datetime import timedelta
from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.sensors.gcs import GCSObjectsWithPrefixExistenceSensor
from airflow.utils.context import Context
from triggers.no_gcs_objects_trigger import NoGCSObjectsWithPrefixTrigger
class NoGCSObjectsWithPrefixSensor(GCSObjectsWithPrefixExistenceSensor):
def execute(self, context: Context):
self.log.info(f"Checking for existence of object: {self.bucket}, {self.prefix}")
if not self.deferrable:
super().execute(context)
return
if not self.poke(context=context):
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=NoGCSObjectsWithPrefixTrigger(
bucket=self.bucket,
prefix=self.prefix,
poke_interval=self.poke_interval,
google_cloud_conn_id=self.google_cloud_conn_id,
hook_params={
"impersonation_chain": self.impersonation_chain,
},
),
method_name="execute_complete",
)
def poke(self, context: Context) -> bool:
return not super().poke(context)
def execute_complete(self, context: dict[str, Any], event: dict[str, str | list[str]]):
self.log.info("Resuming from trigger and checking status")
if event["status"] == "success":
return
raise AirflowException(event["message"])
継承元の実装に合わせているので若干読みにくいですが、deferrable=Trueの場合にはトリガーに処理を委譲して、deferrable=Falseの場合には、pokeメソッドで存在確認する処理を行っています。
poke メソッドでは return not super().poke(context) とすることで、親クラスの「存在したらTrue」の判定を反転させ、「存在しなくなったらTrue」となるようにオーバーライドしています。
execute_completeは、Triggererから処理が戻ってきたときに呼び出されます。event["status"] をチェックして、"success" であればタスクを成功させています。
カスタムトリガーの実装
次に、Triggerer上で非同期に実行されるトリガー本体です。
import asyncio
from typing import AsyncIterator, Any
from airflow.providers.google.cloud.triggers.gcs import GCSPrefixBlobTrigger
from airflow.triggers.base import TriggerEvent
class NoGCSObjectsWithPrefixTrigger(GCSPrefixBlobTrigger):
def __init__(
self,
bucket: str,
prefix: str,
poke_interval: float = 60,
google_cloud_conn_id: str = "google_cloud_default",
hook_params: dict[str, Any] | None = None,
):
super().__init__(
bucket=bucket,
prefix=prefix,
poke_interval=poke_interval,
google_cloud_conn_id=google_cloud_conn_id,
hook_params=hook_params or {},
)
def serialize(self) -> tuple[str, dict[str, Any]]:
return (
"triggers.no_gcs_objects_trigger.NoGCSObjectsWithPrefixTrigger",
{
"bucket": self.bucket,
"prefix": self.prefix,
"poke_interval": self.poke_interval,
"google_cloud_conn_id": self.google_cloud_conn_id,
"hook_params": self.hook_params,
},
)
async def run(self) -> AsyncIterator[TriggerEvent]:
try:
hook = self._get_async_hook()
while True:
self.log.info(
f"Checking for absence of blobs with prefix '{self.prefix}' in bucket '{self.bucket}'",
)
res = await self._list_blobs_with_prefix(
hook=hook, bucket_name=self.bucket, prefix=self.prefix
)
# ここの処理を反転させて、オブジェクトが存在しない場合に成功とする
if len(res) == 0:
yield TriggerEvent(
{"status": "success", "message": "Successfully completed"}
)
return
await asyncio.sleep(self.poke_interval)
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e)})
こちらも、特定プレフィックスのオブジェクトが存在する時に成功するGCSPrefixBlobTriggerの処理を反転させています。それ以外は、細かい部分を除くとそのまま継承元の関数をコピペしています。
動作確認
以下のようなディレクトリ構成で配置します。
docker composeで立ち上げた開発環境の中で実行して動作確認をしています。
dags/
└── test_no_gcs_objects.py
plugins/
├── sensors/
│ └── no_gcs_objects_sensor.py
└── triggers/
└── no_gcs_objects_trigger.py
Cloud Composerへのデプロイとトラブルシューティング
ローカル環境では問題なく動作していましたが、弊社環境のGoogle Cloud Composer(2.13.3-airflow-2.10.5)にデプロイした際、Triggererで以下のエラーが発生して動かない事象に遭遇しました。
ModuleNotFoundError: No module named 'triggers'
「アップロードしたはずのモジュールが見つからない」という状況です。
これに対して調査を行ったところ、Composer環境自体の構成不整合であるという結論に至りました。
以下に、実際に行った調査手順を記載します。
1. Triggerer Podの中身
kubectl コマンドを使用して、Composerが動いているGKEクラスタに接続し、Triggerer Podの中身を直接確認しました。
# 事前にgcloud、kubectl、google-cloud-cli-gke-gcloud-auth-pluginのインストールが必要です
# Composerが動いているプロジェクトに移動
gcloud config set project <プロジェクト>
# 対象のComposer名を取得
gcloud composer environments list --locations <ロケーション>
# クラスタ名を取得
gcloud composer environments describe <Composer名> \
--location <ロケーション> \
--format="value(config.gkeCluster)"
# クラスタの認証情報を取得
gcloud container clusters get-credentials <クラスタ名> \
--region <ロケーション>
# クラスタのネームスペースを確認
kubectl get namespaces
# ポッドを確認
kubectl get pods -n <ネームスペース>
# Triggerer Podに入ってディレクトリを確認
kubectl exec -it <ポッド名> -n <ネームスペース> -- bash
Pod内でDAGやPluginが格納されているはずのディレクトリ(/home/airflow/gcs)を確認したところ、logs以外のディレクトリが存在しませんでした。
airflow@<取得したポッド名>:~/gcs$ ls
logs
2. Podの構成情報の確認
なぜマウントされていないのかを確認するため、Podの構成情報を確認しました。
kubectl describe pod <ポッド名> -n <ネームスペース>
内容を読むと、環境変数では GCSFUSE_EXTRACTED: TRUEになっているにも関わらず、Mounts セクションにはGCSボリュームのマウント設定がありませんでした。
3. 結論
調査の結果、そもそもComposerが生成したKubernetesのマニフェストに期待する設定が書かれていないことが分かりました。そのため、Podを再起動しても、設計図であるマニフェストにGCSマウント記述がない以上、解決しない状態でした。
回避策として、今回のコードをPythonパッケージ化してイメージに含めることで、GCSマウントに依存せず、カスタムトリガーを利用する方法も考えられました。
しかし、今回は大量にセンサーを動かすユースケースではなかったため、deferrable モードの使用を見送り、reschedule モードで実装して本番運用することにしました。
4. 感想
最終的にはインフラ側の不整合が原因でした。アプリ層で解決できない時は、kubectl を使ってPodの中身を確認することで、より深い原因を特定できるという良い経験となりました。
何気にKubernetesを触るのが初めてだったので、結構楽しかったです。
おわりに
今回は、AirflowのDeferrable Operatorの仕組みを利用し、GCS上のファイル非存在を監視するカスタムセンサーとカスタムトリガーを実装しました。
Deferrable Operatorを使いこなすことで、Airflowワーカーのリソースを大幅に節約でき、より効率的なデータパイプラインを構築できます。
ご清覧ありがとうございます!

