9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Cloud Composer(Aiflow)を使う時に実施した方が良い設定をまとめました

Last updated at Posted at 2022-12-14

Cloud Composer(Aiflow)を使う時に実施した方が良い設定をまとめました

はじめに

こんにちは、京セラコミュニケーションシステム 宮坂(@kccs_katsuhiro_miyasaka)です。

皆様、Cloud Composerはご存知でしょうか。
Cloud Composer(Apache Airflow)はワークフローをオーケストレーションします。
画面も見慣れると、エラーも参照でき、詳細なログも確認でき便利なツールです。
しかし、少々不便に感じる事もあります。
そんな人向けに便利な設定等々をお伝えできればと思います。
※今回はAirflow V1を前提と致します。

執筆時のComposer(Airflow V1)の最新バージョン
Composer 1.20.1
Airflow 1.10.15
を前提とします。

Airflowのヘッダーカラーを変える

Composer作成直後のAirflow画面は、以下の画面になります。
どこのプロジェクトでも同様の画面になってしまうので、環境の見間違いが発生しないか不安になります。
image.png

そんな方にオススメなのが以下の設定
帯の色を変えて、プロジェクト等の違いを可視化できます。
一目見てわかる事により、誤操作防止を期待できます。
navbar_color

具体的な設定方法は、Airflow構成のオーバーライドを編集することで反映されます。
image.png

以下の形式で指定し、保存します。

項目名
セクション1 webserver
キー1 navbar_color
値1 #rrggb形式で色指定

image.png

すると、帯の色が変わります。
これでプロジェクト設定による差分に気づきやすくなります。
image.png

タスクがゾンビプロセスと判定されてkillされてしまう。

タスクが重く時間がかかる時に、Airflowのメインプロセスに応答を返せないと、タスクが終了させられてしまう事が有ります。
そんな時に以下の値を設定します。

scheduler_zombie_task_threshold

以下のGoogle Cloudのトラブルシューティングにもあります。
ゾンビタスクのトラブルシューティング

初期値は300秒となっており、Taskの実行時間に合わせてセットする必要があります。

項目名
セクション1 scheduler
キー1 scheduler_zombie_task_threshold
値1 設定した時間(秒)

image.png

AirflowのDBにデータが貯まってしまい、DBのレスポンスエラーになる。

突然ですが、Cloud Composer(Airflow)がどういう構成をしているかご存知でしょうか。
image.png
パブリック IP 環境のアーキテクチャより

こういった構成になっています。
この構成でAirflowの実行結果をAirflowのDBに保存します。

しばらくたつと、Airflowの実行履歴が大量に保存されてDBのレスポンスが悪くなります。
すると、AirflowがMySQLとの接続エラーになる事があります。
そんな時のためにメンテナンス用のDAGが用意されていますので、そちらをインストールします。

以下に公開されているので、ダウンロードします。
Airflow データベースをクリーンアップする

何か所か注意点があります。
58行目付近の値を自分で修正する必要があります。

# airflow-db-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1) 
# How often to Run. @daily - Once a day at Midnight (UTC)
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email address to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that arE 30 days old or older.

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
    Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))

具体的には
・タスクの開始日時を指定します。

airflow.utils.dates.days_ago

・実行間隔を指定します。

SCHEDULE_INTERVAL = "@daily"

・何日分のデータを保存しておくか指定します。

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
    Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))

修正したら、保存しDAGを配置します。
「DAフォルダを開く」を選びます。
image.png

バケットが表示されるので、ファイルを配置します。
image.png

配置後しばらくすると反映されます。
image.png

後は期限が来ると実行されます。
実行後は履歴情報が削除されて、DAG Runsの値が小さくなります。
image.png

DAGのリトライ設定について

システム運用時に処理が失敗時にリトライをしたいケースが有ります。
何回リトライするのか、事前に決められない時に設定値に持たせると便利です。
具体的には、DAGの共通引き数を定義されると思います。

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

ここに以下の設定を入れます。

#modelsのimport
from airflow import models
#---中略
default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': int(models.Variable.get('retries')),
    'retry_delay': timedelta(minutes=int(models.Variable.get('retry_delay'))
}

そして、adminのValiablesを指定します。
image.png

そして、キーを作るとリトライ設定を外からセットする事ができます。
image.png

これによって、処理失敗の傾向を見ながら後から、リトライの設定を入れる事ができるようになります。

終わりに

Composerを使い始めて、苦悩してきたことを上記のような方法で解決してきました。
上記以外の解決方法や、便利な使い方などありましたら、是非コメント頂けますでしょうか。

9
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?