あまりない場面かと思いますが、「こんな事があった」がてらに…
Airflowでredashのクエリ結果を、APIを利用してGCSに送れるようにするために調査した個人メモ
やりたいこと
- redashのクエリAPIを実行し、取得した結果をGCSにアップロードする
作業環境や利用サービス(verは作業当時のもの)
- GCP Cloud Composer
- Airflow(ver 1.10.2)
- redash(var 5.0)
- GCS bucket(すでに用意済み)
DAGの流れ
簡単な説明ですが、今回は下記2つの手順を踏んでデータを送りました。
redashのクエリAPIをAirflow上で実行
- redashに関するoperatorは無いので、pythonの「requests」を利用してAPIを叩く
取得したデータをGCSに送る
- 「google-cloud-storage」というライブラリを利用してAPIのデータをGCSに送る
サンプルコード
こちらも、とても大まかになりますが載せます。
redashのクエリAPIキーはcsvの方を利用しています。
クエリAPIキーの管理についてはAirflowのConnections
等で管理するのが良いかと思います(ここではあえて直書きにしています)。
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import requests
client = storage.Client()
default_args = {
'owner': 'task owner',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'execution_timeout': datetime.timedelta(hours=1),
'retries': 3,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2019, 08, 3),
}
def redash_to_gcs(ds, **kwargs):
result = requests.get('{redashのクエリAPIキー}')
print(result)
bucket = client.get_bucket('{送り先bucket}')
blob = bucket.blob("{出力ファイル名}.csv")
blob.upload_from_string(result.text.encode('utf-8'))
to_gcs = PythonOperator(
task_id='echo_results',
python_callable=redash_to_gcs,
provide_context=True
)
with airflow.DAG(
'salesforce',
'catchup=False',
default_args=default_args,
schedule_interval='0 23 * * *') as dag:
start_task = DummyOperator(task_id='start')
finish_task = DummyOperator(task_id='finish')
to_gcs = PythonOperator(
task_id='echo_results',
python_callable=redash_to_gcs,
provide_context=True)
start_taslk >> to_gcs >> finish_task