この記事はぷりぷりあぷりけーしょんず Advent Calendar 2019の8日目の記事です。
#Airflowとは
Airbnbが開発元のOSSのワークフローエンジンです。
Airflowのサイトには
Airflow is a platform created by community to programmatically author,
schedule and monitor workflows.
「Airflowはワークフローをスケジュールおよび監視するためにプログラマーのコミュニティによって作成されたプラットフォームです。」的なことが書いてある気がします。そんな気がしてるだけです。
Airflowの仕組み
Airflowは一つのワークフローを一つのDAGによって定義していきます。
DAGの中に複数のタスクを定義し、そのタスク同士の依存関係を設定することで一つのワークフローが出来上がります。
DAGの定義
DAGオブジェクトをインスタンス化することでDAGの定義をします。
from airflow.models import DAG
from datetime import datetime
default_args = {
'owner': 'airflow'
}
dag = DAG(
dag_id='DAGの名前',
default_args=default_args,
catchup=True,
start_date=datetime(2019, 12, 8),
schedule_interval='@hourly'
)
Parameters | type | 😇 |
---|---|---|
dag_id | str | DAGの一意の識別子 |
default_args | dict | オペレーターに渡す引数 (オペレーターは次に出てきます) |
catchup | bool | 前回実行時点まで遡って実行するかどうか |
start_date | datetime.datetime | スケジューラーが遡って実行を試みるタイムスタンプ |
schedule_interval | datetime.timedelta または dateutil.relativedelta.relativedelta |
スケジュールの実行間隔 (schedule_interval=’30 18 * * *’ のようにcron式でも書ける) |
他にも色々なパラメーターがあります。こちらでDAGのパラメーターを確認できます。
https://airflow.readthedocs.io/en/stable/_api/airflow/models/dag/index.html
タスクの定義
タスクはOperatorをインスタンス化するときにタスクが生成されます。
各Operatorのコンストラクターに引数を渡すのですが、各Operatorで共通の引数などはdefault_args
に定義することで素敵になります。
Operatorはここから探せます。
https://airflow.readthedocs.io/en/stable/_api/airflow/contrib/operators/index.html
from airflow.models import DAG, Variable # VariableはGUI上からトークンなどを設定できる
from airflow.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator
from airflow.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'retries': 1
}
dag = DAG(
dag_id='DAGの名前',
default_args=default_args,
catchup=True,
start_date=datetime(2019, 12, 8),
schedule_interval='@hourly'
sql = "SELECT * FROM sample_table"
mysql_to_gcs = MySqlToGoogleCloudStorageOperator(
task_id='タスク名',
mysql_conn_id='GUIで設定したconn_idを使用',
gcp_conn_id='GUIで設定したconn_idを使用',
bucket=Variable.get('sample_gcs_bucket'),
ensure_utc=True # defaultでFalseになってますので必要に応じて設定
sql=sql,
filename=filename, # どこかで定義する(割愛)
schema_filename=schema_filename, # どこかで定義する(割愛)
dag=dag)
gcs_to_bq = GoogleCloudStorageToBigQueryOperator(
task_id='タスク名',
retries=2,
gcp_conn_id='GUIで設定したconn_idを使用',
bucket=Variable.get('sample_gcs_bucket'),
filename=filename, # どこかで定義する(割愛)
schema_filename=schema_filename, # どこかで定義する(割愛)
source_objects= source_objects,
destination_project_dataset_table='sample_table_{{ ds_nodash }}'
dag=dag)
mysql_to_gcs >> gcs_to_bq # こんな感じでタスクの依存関係を定義します
こんな感じでmysqlからgcsに、gcsからbigqueryにみたいなことができます。
-
conn_id
はGUI上から接続情報を設定できるので、設定した接続情報のidを使用します。 -
Variable
はトークンなどを設定するとkey-value
で取得できるのでコードに書きたくない情報はここに設定すると良さそうです。
password
, secret
, passwd
, authorization
, api_key
, apikey
, access_token
などの文字列をkeyに設定すると勝手にvalueをマスクしてくれるみたいです。
https://airflow.readthedocs.io/en/stable/ui.html
タスクが優先する順位
- 明示的に渡された引数
- default_args辞書に存在する値
- オペレーターのデフォルト値(存在する場合)
なのでgcs_to_bq ではdefault_argsに設定したretries
よりもを明示的に渡しているretries=2
の方が優先されます。
参考にしたサイト
https://airflow.apache.org/docs/stable/
https://analytics.livesense.co.jp/entry/2018/02/06/132842
https://blog.imind.jp/entry/2019/02/08/170332
最後に
ほんとはAirflowのカスタムオペレーターの作成のことを書こうと思って書き始めたのですが、
基本的なことを紹介してたら終わっちゃいました。。
airflowではもともと用意されているオペレーターでは細かい仕様に対応できないのでカスタムオペレーターが作れますという記事を書きたかった。。
書き直す時間ないし。いずれ書きます。
書きました
→ https://qiita.com/birdmoor23/items/4772e6943ff54088ef5e
しかも最後あたりは、疲れと飽きとカレンダーの期限と飽きで雑になってしまいました。
変なこと言ってたら修正します。
書き方あってるかなぁ。まぁいいか。