12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Airflowのセンサ(Sensor)入門

Posted at

センサ(Sensor)とは

Airflowのオペレータの一種で、何かが起きるまで待つのに使います。

どんなのがあるの?

いろいろありますが、よく使いそうなのは

  • Cloud Pub/Subを待つ(PubSubPullSensor)
  • 他のDAGのタスクインスタンスを待つ(ExternalTaskSensor)
  • Pythonのcallableを待つ(PythonSensor)
  • HTTPリクエストを待つ(HttpSensor)
  • ストレージを待つ(GoogleCloudStorageObjectSensor/SFTPSensor/S3KeySensor)
    などがあります。

ソースコード中ではariflow/sensorsディレクトリとairflow/contrib/sensorsディレクトリにあります。
(1.10.2の場合)

動作

SensorはBaseSensorOperatorクラスを継承し、大雑把には下な感じで動作します。

  1. 何かが起きたかチェック(pokeメソッド)
  2. 何か起きた(pokeがFalseでない)時は終了
  3. 経過時間をチェック。タイムアウトしていれば終了
  4. 何も起きず、タイムアウトもしていない時は、さらに一定時間(poke_interval)待つ
  5. 繰り返す

その他のオプション

  • soft_fail(True/False)

    • タイムアウトした時のタスクインスタンスの状態
    • Trueでは、timeoutした時にタスクインスタンスがfailしない(skipped)
    • downstreamのタスクインスタンスはスキップされます (私はハマりました…)
    • Falseでは、timeoutした時にタスクインスタンスがfailする
  • mode(poke/reschedule)

    • 待ち方の指定
    • pokeでは、タスクインスタンス内でsleepして待ちます
    • rescheduleでは、Airflowのスケジュールの仕組みで待ちます
    • pokeだと待っている間もスロット(同時実行の制限)専有するので、長く待つ可能性がある時はrescheduleの方が良いらしい
12
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?