センサ(Sensor)とは
Airflowのオペレータの一種で、何かが起きるまで待つのに使います。
どんなのがあるの?
いろいろありますが、よく使いそうなのは
- Cloud Pub/Subを待つ(PubSubPullSensor)
- 他のDAGのタスクインスタンスを待つ(ExternalTaskSensor)
- Pythonのcallableを待つ(PythonSensor)
- HTTPリクエストを待つ(HttpSensor)
- ストレージを待つ(GoogleCloudStorageObjectSensor/SFTPSensor/S3KeySensor)
などがあります。
ソースコード中ではariflow/sensorsディレクトリとairflow/contrib/sensorsディレクトリにあります。
(1.10.2の場合)
動作
SensorはBaseSensorOperatorクラスを継承し、大雑把には下な感じで動作します。
- 何かが起きたかチェック(pokeメソッド)
- 何か起きた(pokeがFalseでない)時は終了
- 経過時間をチェック。タイムアウトしていれば終了
- 何も起きず、タイムアウトもしていない時は、さらに一定時間(poke_interval)待つ
- 繰り返す
その他のオプション
-
soft_fail(True/False)
- タイムアウトした時のタスクインスタンスの状態
- Trueでは、timeoutした時にタスクインスタンスがfailしない(skipped)
- downstreamのタスクインスタンスはスキップされます (私はハマりました…)
- Falseでは、timeoutした時にタスクインスタンスがfailする
-
mode(poke/reschedule)
- 待ち方の指定
- pokeでは、タスクインスタンス内でsleepして待ちます
- rescheduleでは、Airflowのスケジュールの仕組みで待ちます
- pokeだと待っている間もスロット(同時実行の制限)専有するので、長く待つ可能性がある時はrescheduleの方が良いらしい