0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Airflowでredashのクエリ結果をGCSに送る

Posted at

あまりない場面かと思いますが、「こんな事があった」がてらに…
Airflowでredashのクエリ結果を、APIを利用してGCSに送れるようにするために調査した個人メモ

やりたいこと

  • redashのクエリAPIを実行し、取得した結果をGCSにアップロードする

図に表すと下記のようになります。
redashからgcs.png

作業環境や利用サービス(verは作業当時のもの)

  • GCP Cloud Composer
    • Airflow(ver 1.10.2)
  • redash(var 5.0)
  • GCS bucket(すでに用意済み)

DAGの流れ

簡単な説明ですが、今回は下記2つの手順を踏んでデータを送りました。

redashのクエリAPIをAirflow上で実行

取得したデータをGCSに送る

  • 「google-cloud-storage」というライブラリを利用してAPIのデータをGCSに送る

サンプルコード

こちらも、とても大まかになりますが載せます。
redashのクエリAPIキーはcsvの方を利用しています。
クエリAPIキーの管理についてはAirflowのConnections等で管理するのが良いかと思います(ここではあえて直書きにしています)。

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import requests

client = storage.Client()

default_args = {
    'owner': 'task owner',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'execution_timeout': datetime.timedelta(hours=1),
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2019, 08, 3),
}



def redash_to_gcs(ds, **kwargs):
    result = requests.get('{redashのクエリAPIキー}')
    print(result)
    bucket = client.get_bucket('{送り先bucket}')
    blob = bucket.blob("{出力ファイル名}.csv")
    blob.upload_from_string(result.text.encode('utf-8'))

to_gcs = PythonOperator(
    task_id='echo_results',
    python_callable=redash_to_gcs,
    provide_context=True
)


with airflow.DAG(
        'salesforce',
        'catchup=False',
        default_args=default_args,
        schedule_interval='0 23 * * *') as dag:
	
    start_task = DummyOperator(task_id='start')
    finish_task = DummyOperator(task_id='finish')


    to_gcs = PythonOperator(
        task_id='echo_results',
        python_callable=redash_to_gcs,
        provide_context=True)

    start_taslk >> to_gcs >> finish_task
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?