背景
AirflowでHTTPリクエスト送るだけのシンプルなDAGを作りたい
コード
先に動くコードを出しておく。
肝は HttpOperator
ではなく、 PythonOperator
を使うところ。(詳しくは後述)
import airflow
import subprocess
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
import requests
def execute_request():
URL = "http://hoge.com"
headers={'Authorization': 'some_token'}
r = requests.get(URL, headers=headers)
if r.status_code != requests.codes.ok:
r.raise_for_status()
return 'success'
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
dag = DAG(
"simple_request_dag",
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
PythonOperator(
task_id='simple_request_task',
python_callable=execute_request,
dag=dag,
)
はまったこと
Airflow の reference とか見ながら、httpリクエストするOperator 探すと、まず HttpOperator にたどり着くと思う。が、このOperatorはちょっと厄介なので気をつけたほうがいい。シンプルにURLを渡してリクエストできるようにはなっていない。
HttpOperatorのサンプルはここにある。
airflow/airflow/example_dags/example_http_operator.py
このコードを見ると、hostの指定がないことに気づく。
HttpOperatorではコード上でhostの指定ができない。
このサンプルコードのhostはAirflowがもつConnectionsという変数に設定されている。 これは WebUIの admin -> Connections で確認できる。上記のサンプルコードの場合はデフォルトホストである、default_host=www.google.com
というものが適用される。もし、このdefault以外のhostを指定したい場合は、Connectionsに hoge_host=http://hoge.com
のようなものを登録し、 http_conn_id
引数で指定する必要がある。Connectionsの設定は、WebUIで行うか、環境変数でも可能。
でも面倒。
とうことで、シンプルに作りたい場合は、PythonOperatorがおすすめ。
参考
How to access the response from Airflow SimpleHttpOperator GET request
「HttpOperatorでどうやってhost変えるの? え、そんな面倒なの? じゃ、PythonOperator使うわ。」みたいなシュールなやり取りがされている。