前回 Airflowの公式ドキュメントをまとめた。
今回は Cloud Composer。
公式ドキュメント
公式Doc
QuickStart
QuickStart
- これで実際Cloud Composerにリリースしてみて雰囲気つかむ
- Dagの作り方や、リリース方法、ログの確認方法など基本的な流れわかる
アクセス環境
Airflow コマンドライン インターフェース
Cloud ComposerでAirflowのコマンドラインの実行方法
DAG(ワークフロー)の管理
DAG(ワークフロー)の作成
-
基本的にはDAGの作成方法
-
その他、Cloud Composerのpackageの情報など
- Cloud ComposerのContainerで実行できるshellコマンドpackage一覧
- gcloud
- bq
- gsutil
- kubectl
- Cloud ComposerのContainerが持っている python package一覧
- google-cloud-bigquery
- google-cloud-dataflow
- google-cloud-storage
- pandas
- pandas-gbq
- tensorflow
- tensorflow-transform
- Cloud ComposerのContainerで実行できるshellコマンドpackage一覧
-
追加でpython packageを追加したい -> こちら
-
よくある質問が使えそう
DAG(ワークフロー)の追加と更新
追加と更新
- GCSにDAGファイルをuploadすればいい
- 以下のgcloudコマンドでgcsにuploadするのが楽
gcloud composer environments storage dags import --environment ENVIRONMENT_NAME --location LOCATION --source LOCAL_FILE_TO_UPLOAD
削除
- 基本GCSからDAGファイルを削除すればok
- airflow 1.10.0以降であれば、removing dagコマンドで消すと、メタデータも消せる
gcloud composer environments run --location LOCATION ENVIRONMENT_NAME delete_dag -- DAG_NAME
カスタムプラグイインのインストール
-
カスタムプラグインとは
- 自作のライブラリ。hooks, operators, sensors, macros, executorsなどを
airflow.plugins_manager.AirflowPlugin
継承してつくる。 - GCSの
${BUCKET}/plugins
ディレクトリに配置すればok
- 自作のライブラリ。hooks, operators, sensors, macros, executorsなどを
Python 依存関係のインストール
- pipなどで pythonライブラリを追加したい場合
- GCLOUDで requests.txt を指定して更新するのが良さそう
gcloud composer environments update ENVIRONMENT-NAME --update-pypi-packages-from-file requirements.txt --location LOCATION
- ワーカーポッドへの接続方法などが詳しく載ってる -> Flower ウェブ インターフェースに接続する
DAGのトリガー
- イベントベースでCloud function経由でトリガーできたりする
DAGのテスト
- 本番のDAGとしてデプロイする前に動作確認する方法が説明される
python packageが追加された時の確認方法
- 直接airflowのworkerに入って、pip install して問題ないかを確かめる
- その後、pip uninstallする
- 本当にこんな方法でいいんだろうか...
syntax error チェック
- GCSの
${BUCKET}/data/test
ディレクトリに作ったdagファイルをアップロード -
gcloud composer environments
を使ってairflow list_dags
を実行する from ローカル
gcloud composer environments run ENVIRONMENT_NAME --location LOCATION list_dags -- -sd /home/airflow/gcs/data/test
task error チェック
- GCSの
${BUCKET}/data/test
ディレクトリに作ったdagファイルをアップロード -
gcloud composer environments
を使ってairflow test
を実行する from ローカル
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
test -- -sd /home/airflow/gcs/data/test DAG_ID \
TASK_ID DAG_EXECUTION_DATE
DAGのトラブルシューティング
-
ログの見方
- Airflow logs: DAGのtaskに関連するもの。 GCS かweb viewから確認できる
- Streaming log: Airflow logsの上位となるログ。 StackdriverやLogs viewerから観れる。
-
ログが出ないで失敗している場合はOOMを疑う
gcloud composer コマンド
- gcloud コマンドの解説
コンセプト
Cloud Composerの概要
- Apache Airflowを元に構築されたフルマネージドワークフローオーケストレーション
環境(environment)
- Airflowを分散設定でデプロイするための、いくつかのGCPコンポーネント
- GKEに基づく
アーキテクチャ
-
customer project
とtenant project(マネージドサービスリソース)
に分かれる - customer project
- gcs
- gke
- stackdriver
- tenant project
- cloud sql
- airflowのメタデータ
- 毎日バックアップ
- app engine
- web server
- Cloud IAPを使って、IAMの権限をサーバーアクセス管理に使える
- cloud sql
構成情報
- 環境用のパラメータは一度使ったら変更できない
-
*-composer-*
のpubsubトピックは消しちゃダメ - Airflow コマンド resetdb、initdb、upgradedb を発行すると、DB変わるので行わない
機能
環境
- Cloud Composerは Apache Airflowを基盤としてラッパー
- 3つのコンポーネントを持つ
- ウェブサーバー: web view
- データベース: メタデータの保持
- Cloud Storage: dag, log, plugin などが保存される
Gloud Storageに保存されるデータ
DAG
- 環境の DAG を保存します。このフォルダ内の DAG のみが環境にスケジュールされます。
- path:
gs://bucket-name/dags
プラグイン
- カスタム プラグインを保存します。カスタムのインハウス Airflow 演算子、フック、センサー、インターフェースなどです。
- path:
gs://bucket-name/plugins
データ
- タスクが生成して使用するデータを保存します。このフォルダは、すべてのワーカーノードにマウントされます。
- path:
gs://bucket-name/data
ログ
- タスクの Airflow ログを保存します。ログは Airflow ウェブ インターフェースでも利用できます。
- path:
gs://bucket-name/logs