この記事はBrainpad Advent Calendar 2018の16日目の記事です。
#はじめに
こんにちは。BrainPadでエンジニアをしている高橋です。
今年4月にBrainPadに中途入社して自社サービスの開発をしています。
現在開発中の機能のワークフロー管理にCloud Composerを採用しているため、ここ3ヶ月はAirflowのOperatorに触れてきました。その中で利用したOperatorの一部について今回は書きたいと思います。
#Cloud Composer(Airflow)について
まずCloud Composerについて説明します。
Cloud ComposerはGCP(Google Cloud Platform)が提供するマネージドなAirflowです。
Airflowの詳細については今回省略しますが、簡単に言うとワークフローを管理するOSSです。
Cloud Composerがリリースされるまでは、Airflowを利用するために複雑なインフラの管理をしなければならなかったようです。(composerからしか使ってないので詳しくはわかりませんが・・・)
しかし、Cloud Composerが出てからはGCPでインフラの管理を行ってくれるためワークフローの作成、管理に集中できるようになりました。また最近ではβ版ではありますが、airflowのversion1.10.0を利用できたりPython3で実装できるようになりました。
#Operatorについて
続いてOperatorについて説明します。
Opeartorはプログラムを実行するためのテンプレートになります。
たとえば、ワークフローの中でbashコマンドを実行したい場合はBashOperatorを用います。
Operatorを使用するためには、まずOperatorをTaskとして定義してDAGにTask Instanceとして当てはめてあげる必要があります。
以下よりBashOperatorを用いてOperatorの扱い方をみていきたいと思います。
1.DAGオブジェクトの生成
from datetime import datetime
from airflow import models
from airflow.operators import bash_operator
default_dag_args = {
'start_date': datetime(2018, 12, 16), # ジョブの開始日時
'retries': 0, # task失敗時のretryの回数
}
with models.DAG(
'BashOperator test',
schedule_interval=None, # cron形式で記述するscheduleの実行時間(今回はスケジュールを設定しない)
default_args=default_dag_args) as dag:
BashOperatorを利用するのでまずはじめにbash_operatorをインポートします。次にDAGオブジェクトを生成しますが、今回はOperatorの紹介なのでDAGオブジェクトに関しての詳細は飛ばします。DAGの詳細やDAGの引数について知りたい方はドキュメントを参照してください。
#####2.Operatorインスタンスの作成し依存関係を記述
task1 = bash_operator.BashOperator(
task_id='echo', # taskの識別子
bash_command="echo Start task"
)
task2 = bash_operator.BashOperator(
task_id='sleep',
bash_command='sleep 5'
)
task1 >> task2 # operatorインスタンス間の依存関係を記述
上記のようにOpeartorを記述します。まずtask1についてですが、BashOperatorを使って”Start task”と表示させるだけのシンプルなタスクです。task_idはタスクにつける識別子でUI画面上ではこの識別子で表示されます。bash_commandには、実行したいbashコマンドを記述します。echoを使って”Start task”と表示させたいのでbash_commandにechoコマンドを代入します。
task2に関しても同様にtask_idを与え、sleepコマンドを実行したいのでbash_commandにsleepコマンド代入します。
そして最後にtask同士の依存関係を記述します。今回はtask1が実行された後、task2を実行したいので上記のように<<(シフト演算子)を用いて記述します(そのほかにも記述方法はありますが今回は省略します)。非常にシンプルですね。
例としてBashOperatorを用いましたが、基本的にOperatorの実装方法は統一されているのでどのOperatorを使用しても同じように扱うことができます。
#実行してみる
Cloud Composerでは、Composerの環境を作成すると、GCSのバケットが同時に作成されます。
バケットの中にdagsとういうフォルダが作成されるので、その下に先ほど作成したbash_operator_demo.pyをアップロードします。
すると・・・
DAGが作成されました。非常に簡単ですね!
airflowのUIにはGraph Viewという機能がついておりタスクの依存関係を簡単に確認することができます。
bash_operator_demo.pyで記述した通り、echoタスク → sleepタスクの順番になっていますね。
早速実行してみましょう。今回はスケジュールの設定はしていないので手動でDAGを実行します。手動で実行する場合はDAG一覧表示画面のLinksの一番左にある再生ボタンのようなものを押すと実行できます。
実行されていますね!
実行されるとDAG RUNSの丸部分に実行数が表示され丸枠にステータスを表す色がつきます。それぞれの色の意味は下の図のようになっています。
実行が無事成功すると下の図のように表示されます。
また過去の履歴やステータスを確認す流のにはTreeViewという機能が便利です。
複数回分の実行ステータスが見れて便利ですね!
bashコマンドの実行結果かなどは機会があればお手元の環境で動かして結果を見てみてください!
以上がざっくりとしたOperatorの扱い方から実行方法でした。
(本題)3ヶ月で触ってみたOperatorについて
前置きが長くなってしまいましたが、3ヶ月で触ってみた一部のOperator紹介をします。
今回紹介するOperatorは、BranchPythonOperator、TriggerDagRunOperator、触ってみたけど動かなかったOperatorについて紹介したいと思います。
BranchPythonOperator
BranchPythonOperatorはpythonの条件式をもとに次に実行するタスクを判定するOperatorになります。
実際に扱ってみましょう!
from airflow import models
from airflow.operators import python_operator, bash_operator
'''
省略
'''
def hoge_or_fuga(**kwargs):
hoge_or_fuga = kwargs['dag_run'].conf['word'] # DAGを実行時に渡した引数を取得
if hoge_or_fuga == 'hoge':
return 'hoge_task' # hogeであればtask_idがhoge_taskのものを実行
elif hoge_or_fuga == 'fuga':
return 'fuga_task' # fugeであればtask_idがfuge_taskのものを実行
else:
return
with models.DAG(
'branch_python_operator',
schedule_interval=None,
default_args=default_dag_args) as dag:
task1 = python_operator.BranchPythonOperator(
task_id='branch_python',
provide_context=True, # トリガーするときに引数を渡すのでTrueにする
python_callable=hoge_or_fuga # 実行したいpythonの関数
)
hoge_task = bash_operator.BashOperator(
task_id='hoge_task',
bash_command='echo hoge'
)
fuga_task = bash_operator.BashOperator(
task_id='fuga_task',
bash_command='echo fuge'
)
task1 >> hoge_task
task1 >> fuga_task
BranchPythonOperatorは上記のように扱えます。まず、実行したい処理を関数で定義してpython_callableに代入します。ここまではPythonOperatorと一緒ですね。
BranchPythonOperatorは関数の中で次に実行したいtask_idを返してあげると、task_idと一致するタスクが実行され。一致しないタスクはスキップされます。
実際に実行してみましょう。今回は引数を渡してDAGを実行したいのでgcloudコマンドを使ってkeyに'word', Valueに'hoge'を設定してDAGを実行します。GCPのサービスなので手軽にcloud shellから実行できます。
hogeを渡すのでbranch_pythonの後にhoge_taskが実行されることが期待されます。
実行結果のGraphViewをみてみましょう。
hoge_taskが実行され、fuga_taskがスキップされているのが確認できますね。また、hogeとfugaどちらにも当てはまらない引数を与えるとNoneを返すようにしています。Noneが返されると後続タスクは全てスキップされます。このようにしてBranchPythonOperatorを使ってワークフローの条件分岐を作ることが可能です。
分岐などはなくただ単に条件によって後続のタスクを実行するかスキップするか決めたいときはShortCircuitOperatorというOperatorもありますので使い分けることによって処理の内容をOperatorによって明確にすることができます。
TriggerDagRunOperator
次にTriggerDagRunOperatorについてみていきます。TriggerDagRunOperatorは名前のままですが、指定したdag_idのDAGを実行するためのOperatorです。指定したDAGを実行する際に先ほどのgcloudコマンドと同じように値を渡すことが可能です。BranchPythonOperatorとTriggerDagOperatorを合わせて実行してみましょう。まずはDAGを実装します。TriggerDagRunOperatorだけだとworkflowの感じが出ないので(個人の好みですが)最初にDummyOperatorをかませます(名前の通り何もしないダミーのOperator)。
from airflow import models
from airflow.operators import python_operator, bash_operator, dummy_operator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
'''
省略
'''
def trigger_dag(context, dag_run_obj):
next_dag_id = 'fuga'
dag_run_obj.payload = {'word': next_dag_id}
return dag_run_obj
with models.DAG(
'trigger_dag',
schedule_interval=None,
default_args=default_dag_args) as dag:
dummy_task = dummy_operator.DummyOperator(task_id='start')
trigger_dag_task = TriggerDagRunOperator(task_id='trigger_dag',
python_callable=trigger_dag,
trigger_dag_id='branch_python_operator',
provide_context=True)
dummy_task >> trigger_dag_task
TriggerDagRunOperatorは他のOperatorと違い関数の引数にcontext, dag_run_objを入れなければなりません。そして、dag_run_objを返却することにより次のタスクに値を渡すことができます。あとは実行したいDAG名をtrigger_dag_idに代入します。実行してbranch_python_operatorが実行されるかみてみましょう。branch_python_operatorが実行されfuga_taskが実行されることが想定できます。
2.trigger_dagからbranch_python_operatorの実行
trigger_dagのタスクが全てsuccessになったところで上のbranch_python_operatorが実行されているのが確認できます。
fuga_taskが実行されhoge_taskがスキップされているのが確認できます。想定通りに動作しました。
用途例としては、Cloud Pub/Subなどから情報をポーリングしてその情報をもとにTriggerDagRunOperatorでDAGを実行すればスケジュールをたくさん設定する必要がなく管理が楽になることもあるので非常に便利なOperatorだと思います。
触ってみたけど動かなかったOperator
動かなかったというと語弊がありますが、実際にtaskを作って実行したけどエラーが解消できなかったOperatorなどもあります。
####1.BigQueryOperator
BigQueryOperatorに関しては、Cloud Composerのドキュメントに乗っているので使えないはずはないと思われるかもしれませんが、リージョンによります。BigQueryOperatorの中で、BigQueryのステータスを取得する部分があるのですが、その部分でリージョンを指定できないためUSリージョンをみにいってしまいます。結果としてBigqueryの実行は成功しますが、ステータスを取得できないのでComposerのtaskは失敗してしまいます。また、検証したのは2ヶ月ほど前ですが、DataflowOperatorに関しても同じことが起きていました。airflowの問題なのでOSSにコントリビュートするいい機会かもしれません。
#まとめ
cloud composerは実装が非常に簡単でわかりやすいと思います。今回はほんの少ししか紹介できませんでしたが、airflowにはたくさんのOperatorがあります。うまく動作しないOperatorや物足りないOperatorなどあるかもしれませんが、OSSなのでどんどんコミットしていきましょう!長い時間お付き合いいただきありがとうございました。
#参考
ドキュメント
Airflow を用いたデータフロー分散処理
airflowを用いて、 複雑大規模なジョブフロー管理 に立ち向かう
おしまい