Cloud Composer(Aiflow)を使う時に実施した方が良い設定をまとめました
はじめに
こんにちは、京セラコミュニケーションシステム 宮坂(@kccs_katsuhiro_miyasaka)です。
皆様、Cloud Composerはご存知でしょうか。
Cloud Composer(Apache Airflow)はワークフローをオーケストレーションします。
画面も見慣れると、エラーも参照でき、詳細なログも確認でき便利なツールです。
しかし、少々不便に感じる事もあります。
そんな人向けに便利な設定等々をお伝えできればと思います。
※今回はAirflow V1を前提と致します。
執筆時のComposer(Airflow V1)の最新バージョン
Composer 1.20.1
Airflow 1.10.15
を前提とします。
Airflowのヘッダーカラーを変える
Composer作成直後のAirflow画面は、以下の画面になります。
どこのプロジェクトでも同様の画面になってしまうので、環境の見間違いが発生しないか不安になります。
そんな方にオススメなのが以下の設定
帯の色を変えて、プロジェクト等の違いを可視化できます。
一目見てわかる事により、誤操作防止を期待できます。
navbar_color
具体的な設定方法は、Airflow構成のオーバーライドを編集することで反映されます。
以下の形式で指定し、保存します。
項目名 | 値 |
---|---|
セクション1 | webserver |
キー1 | navbar_color |
値1 | #rrggb形式で色指定 |
すると、帯の色が変わります。
これでプロジェクト設定による差分に気づきやすくなります。
タスクがゾンビプロセスと判定されてkillされてしまう。
タスクが重く時間がかかる時に、Airflowのメインプロセスに応答を返せないと、タスクが終了させられてしまう事が有ります。
そんな時に以下の値を設定します。
scheduler_zombie_task_threshold
以下のGoogle Cloudのトラブルシューティングにもあります。
ゾンビタスクのトラブルシューティング
初期値は300秒となっており、Taskの実行時間に合わせてセットする必要があります。
項目名 | 値 |
---|---|
セクション1 | scheduler |
キー1 | scheduler_zombie_task_threshold |
値1 | 設定した時間(秒) |
AirflowのDBにデータが貯まってしまい、DBのレスポンスエラーになる。
突然ですが、Cloud Composer(Airflow)がどういう構成をしているかご存知でしょうか。
パブリック IP 環境のアーキテクチャより
こういった構成になっています。
この構成でAirflowの実行結果をAirflowのDBに保存します。
しばらくたつと、Airflowの実行履歴が大量に保存されてDBのレスポンスが悪くなります。
すると、AirflowがMySQLとの接続エラーになる事があります。
そんな時のためにメンテナンス用のDAGが用意されていますので、そちらをインストールします。
以下に公開されているので、ダウンロードします。
Airflow データベースをクリーンアップする
何か所か注意点があります。
58行目付近の値を自分で修正する必要があります。
# airflow-db-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1)
# How often to Run. @daily - Once a day at Midnight (UTC)
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that arE 30 days old or older.
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
具体的には
・タスクの開始日時を指定します。
airflow.utils.dates.days_ago
・実行間隔を指定します。
SCHEDULE_INTERVAL = "@daily"
・何日分のデータを保存しておくか指定します。
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
修正したら、保存しDAGを配置します。
「DAフォルダを開く」を選びます。
後は期限が来ると実行されます。
実行後は履歴情報が削除されて、DAG Runsの値が小さくなります。
DAGのリトライ設定について
システム運用時に処理が失敗時にリトライをしたいケースが有ります。
何回リトライするのか、事前に決められない時に設定値に持たせると便利です。
具体的には、DAGの共通引き数を定義されると思います。
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
ここに以下の設定を入れます。
#modelsのimport
from airflow import models
#---中略
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': int(models.Variable.get('retries')),
'retry_delay': timedelta(minutes=int(models.Variable.get('retry_delay'))
}
そして、キーを作るとリトライ設定を外からセットする事ができます。
これによって、処理失敗の傾向を見ながら後から、リトライの設定を入れる事ができるようになります。
終わりに
Composerを使い始めて、苦悩してきたことを上記のような方法で解決してきました。
上記以外の解決方法や、便利な使い方などありましたら、是非コメント頂けますでしょうか。