12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ぷりぷりあぷりけーしょんずAdvent Calendar 2019

Day 8

Airflowの基本的なところ

Last updated at Posted at 2019-12-07

この記事はぷりぷりあぷりけーしょんず Advent Calendar 2019の8日目の記事です。

#Airflowとは
Airbnbが開発元のOSSのワークフローエンジンです。
Airflowのサイトには

Airflow is a platform created by community to programmatically author,
schedule and monitor workflows.

「Airflowはワークフローをスケジュールおよび監視するためにプログラマーのコミュニティによって作成されたプラットフォームです。」的なことが書いてある気がします。そんな気がしてるだけです。

Airflowの仕組み

Airflowは一つのワークフローを一つのDAGによって定義していきます。
DAGの中に複数のタスクを定義し、そのタスク同士の依存関係を設定することで一つのワークフローが出来上がります。

DAGの定義

DAGオブジェクトをインスタンス化することでDAGの定義をします。

dag.py
from airflow.models import DAG
from datetime import datetime

default_args = {
    'owner': 'airflow'
}

dag = DAG(
    dag_id='DAGの名前',
    default_args=default_args,
    catchup=True,
    start_date=datetime(2019, 12, 8),
    schedule_interval='@hourly'
)
Parameters type 😇
dag_id str DAGの一意の識別子
default_args dict オペレーターに渡す引数
(オペレーターは次に出てきます)
catchup bool 前回実行時点まで遡って実行するかどうか
start_date datetime.datetime スケジューラーが遡って実行を試みるタイムスタンプ
schedule_interval datetime.timedelta
または
dateutil.relativedelta.relativedelta
スケジュールの実行間隔
(schedule_interval=’30 18 * * *’ のようにcron式でも書ける)

他にも色々なパラメーターがあります。こちらでDAGのパラメーターを確認できます。
https://airflow.readthedocs.io/en/stable/_api/airflow/models/dag/index.html

タスクの定義

タスクはOperatorをインスタンス化するときにタスクが生成されます。
各Operatorのコンストラクターに引数を渡すのですが、各Operatorで共通の引数などはdefault_argsに定義することで素敵になります。

Operatorはここから探せます。
https://airflow.readthedocs.io/en/stable/_api/airflow/contrib/operators/index.html

dag.py
from airflow.models import DAG, Variable # VariableはGUI上からトークンなどを設定できる
from airflow.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'retries': 1
}

dag = DAG(
    dag_id='DAGの名前',
    default_args=default_args,
    catchup=True,
    start_date=datetime(2019, 12, 8),
    schedule_interval='@hourly'

sql = "SELECT * FROM sample_table"

mysql_to_gcs = MySqlToGoogleCloudStorageOperator(
        task_id='タスク名',
        mysql_conn_id='GUIで設定したconn_idを使用',
        gcp_conn_id='GUIで設定したconn_idを使用',
        bucket=Variable.get('sample_gcs_bucket'),
        ensure_utc=True  # defaultでFalseになってますので必要に応じて設定
        sql=sql,
        filename=filename, # どこかで定義する(割愛)
        schema_filename=schema_filename, # どこかで定義する(割愛)
        dag=dag)

gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id='タスク名',
        retries=2,
        gcp_conn_id='GUIで設定したconn_idを使用',
        bucket=Variable.get('sample_gcs_bucket'),
        filename=filename, # どこかで定義する(割愛)
        schema_filename=schema_filename, # どこかで定義する(割愛)
        source_objects= source_objects,
        destination_project_dataset_table='sample_table_{{ ds_nodash }}'
        dag=dag)

mysql_to_gcs >> gcs_to_bq  # こんな感じでタスクの依存関係を定義します

こんな感じでmysqlからgcsに、gcsからbigqueryにみたいなことができます。

  • conn_idはGUI上から接続情報を設定できるので、設定した接続情報のidを使用します。
  • Variableはトークンなどを設定するとkey-valueで取得できるのでコードに書きたくない情報はここに設定すると良さそうです。

password, secret, passwd, authorization, api_key, apikey, access_tokenなどの文字列をkeyに設定すると勝手にvalueをマスクしてくれるみたいです。
https://airflow.readthedocs.io/en/stable/ui.html

タスクが優先する順位

  1. 明示的に渡された引数
  2. default_args辞書に存在する値
  3. オペレーターのデフォルト値(存在する場合)

なのでgcs_to_bq ではdefault_argsに設定したretriesよりもを明示的に渡しているretries=2の方が優先されます。

参考にしたサイト

https://airflow.apache.org/docs/stable/
https://analytics.livesense.co.jp/entry/2018/02/06/132842
https://blog.imind.jp/entry/2019/02/08/170332

最後に

ほんとはAirflowのカスタムオペレーターの作成のことを書こうと思って書き始めたのですが、
基本的なことを紹介してたら終わっちゃいました。。
airflowではもともと用意されているオペレーターでは細かい仕様に対応できないのでカスタムオペレーターが作れますという記事を書きたかった。。
書き直す時間ないし。いずれ書きます。

書きました
https://qiita.com/birdmoor23/items/4772e6943ff54088ef5e

しかも最後あたりは、疲れと飽きとカレンダーの期限と飽きで雑になってしまいました。
変なこと言ってたら修正します。

書き方あってるかなぁ。まぁいいか。

12
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?