皆さんこんにちは。@best_not_bestです。
今年はCloud Composerを利用することが多かったため、そこで得たTipsをまとめたいと思います。
なお、以下記載のコードはPython 3での動作を想定しております。
サポート対象のPythonバージョン
2.7.15と3.6.6となります。
https://cloud.google.com/composer/docs/concepts/python-version
タスク失敗時のロギング/Monitoring検知
タスクが失敗した時のロギングへのログの落ち方がなかなか分かりにくいです。
AirflowException
をraise
するタスクを用意し、これを各タスクの後にone_failed
で実行させ、この時のメッセージを検知するのがシンプルかなと思います。
サンプルプログラム
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""sample."""
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator
def output_error():
"""output error."""
raise AirflowException('task failed.')
# (省略)
with models.DAG(
dag_id='sample_001',
# (省略)
) as dag:
task_1 = # (省略)
task_2 = # (省略)
error_task = PythonOperator(
task_id='error_task',
python_callable=output_error,
retries=0,
trigger_rule='one_failed',
dag=dag,
)
task_1 >> error_task
task_2 >> error_task
task_1 >> task_2
ロギングでの指標
以下で指標を作成します。
resource.type="cloud_composer_environment"
logName=projects/<プロジェクトID>/logs/airflow-worker
labels.workflow:sample_001
("task failed.")
Monitoringでのアラートポリシー
上記指標をターゲットとしてアラートポリシーを作成します。
エラーログ
ロギングでは少々分かりにくいので、Composerバケット配下のログが分かりやすいかと思います。
タスクの考え方
PythonOperator
である程度自由に処理を記述できますが、用意されているOperatorを極力使った方が処理速度が速いです。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/contrib/operators/index.html
例えばBigQuery内のデータをCloud StorageにCSVファイル出力する処理であれば、
-
PythonOperator
でBigQueryのデータをクエリで取得 -
PythonOperator
で取得したデータをCSVファイルにしてローカルに保存 -
PythonOperator
でCSVファイルをCloud Storageにコピー
ではなく
-
BigQueryOperator
でクエリ実行結果をBigQuery上にテーブル保存 -
BigQueryToCloudStorageOperator
でテーブル内のデータをCloud Storageに出力
の方が、BigQueryやCloud Storage側で処理が行われるため実行時間が速くなります。
(前者はデータ数によっては処理が終わらないことも・・・。)
VPC Service Controls
境界内に置くことは可能ですが、Pythonパッケージのインストールや外部システムとの連携の際の設定が複雑になる、等の理由から個人的には利用していません。
BigQueryやCloud Storageだけでも境界内に置きたいという場合は、
- BigQueryやCloud Storageを扱うプロジェクトA(境界内)
- Cloud Composerを扱うプロジェクトB(境界外)
のようにプロジェクトを分け、Cloud Composerを実行しているサービスアカウントからのアクセスをプロジェクトAで許可するようにすると後々楽かと思います。
また余談ですが、セキュリティ向上のためとはいえFWルールで下りを全遮断してしまうと、スケジューラ等が動かなくなるのでご注意ください。(Tenant Projectと通信しているため?)
可能な限り下りは開けたままの方が良いかと思います。
ヘルスチェック
以下ご参考ください。
https://cloud.google.com/composer/docs/tutorials/health-check
まとめ
- タスク失敗検知は検知用のタスクを用意する
- 用意されているOperatorを極力使いましょう
- 過度なVPC Service ControlsやFWルールでのアクセス制限は、保守運用を複雑にする場合もある
以上、ご参考になれば幸いです!