13
8

More than 3 years have passed since last update.

Cloud Composer入門のために公式ドキュメント読んだメモ

Posted at

前回 Airflowの公式ドキュメントをまとめた。
今回は Cloud Composer。

公式ドキュメント

公式Doc

QuickStart

QuickStart

  • これで実際Cloud Composerにリリースしてみて雰囲気つかむ
  • Dagの作り方や、リリース方法、ログの確認方法など基本的な流れわかる

アクセス環境

Airflow コマンドライン インターフェース

Cloud ComposerでAirflowのコマンドラインの実行方法

DAG(ワークフロー)の管理

DAG(ワークフロー)の作成

  • 基本的にはDAGの作成方法
  • その他、Cloud Composerのpackageの情報など

    • Cloud ComposerのContainerで実行できるshellコマンドpackage一覧
      • gcloud
      • bq
      • gsutil
      • kubectl
    • Cloud ComposerのContainerが持っている python package一覧
      • google-cloud-bigquery
      • google-cloud-dataflow
      • google-cloud-storage
      • pandas
      • pandas-gbq
      • tensorflow
      • tensorflow-transform
  • 追加でpython packageを追加したい -> こちら

  • よくある質問が使えそう

DAG(ワークフロー)の追加と更新

追加と更新
  • GCSにDAGファイルをuploadすればいい
  • 以下のgcloudコマンドでgcsにuploadするのが楽
gcloud composer environments storage dags import --environment ENVIRONMENT_NAME --location LOCATION --source LOCAL_FILE_TO_UPLOAD
削除
  • 基本GCSからDAGファイルを削除すればok
    • airflow 1.10.0以降であれば、removing dagコマンドで消すと、メタデータも消せる

gcloud composer environments run --location LOCATION ENVIRONMENT_NAME delete_dag -- DAG_NAME

カスタムプラグイインのインストール

  • カスタムプラグインとは
    • 自作のライブラリ。hooks, operators, sensors, macros, executorsなどをairflow.plugins_manager.AirflowPlugin 継承してつくる。
    • GCSの ${BUCKET}/plugins ディレクトリに配置すればok

Python 依存関係のインストール

  • pipなどで pythonライブラリを追加したい場合
  • GCLOUDで requests.txt を指定して更新するのが良さそう

gcloud composer environments update ENVIRONMENT-NAME --update-pypi-packages-from-file requirements.txt --location LOCATION

DAGのトリガー

  • イベントベースでCloud function経由でトリガーできたりする

DAGのテスト

  • 本番のDAGとしてデプロイする前に動作確認する方法が説明される
python packageが追加された時の確認方法
  • 直接airflowのworkerに入って、pip install して問題ないかを確かめる
  • その後、pip uninstallする
  • 本当にこんな方法でいいんだろうか...
syntax error チェック
  • GCSの ${BUCKET}/data/test ディレクトリに作ったdagファイルをアップロード
  • gcloud composer environments を使ってairflow list_dags を実行する from ローカル
gcloud composer environments run ENVIRONMENT_NAME --location LOCATION list_dags -- -sd /home/airflow/gcs/data/test
task error チェック
  • GCSの ${BUCKET}/data/test ディレクトリに作ったdagファイルをアップロード
  • gcloud composer environments を使ってairflow test を実行する from ローカル

gcloud composer environments run ENVIRONMENT_NAME \
  --location LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

DAGのトラブルシューティング

  • ログの見方

    • Airflow logs: DAGのtaskに関連するもの。 GCS かweb viewから確認できる
    • Streaming log: Airflow logsの上位となるログ。 StackdriverやLogs viewerから観れる。
  • ログが出ないで失敗している場合はOOMを疑う

gcloud composer コマンド

  • gcloud コマンドの解説

コンセプト

Cloud Composerの概要

  • Apache Airflowを元に構築されたフルマネージドワークフローオーケストレーション
環境(environment)
  • Airflowを分散設定でデプロイするための、いくつかのGCPコンポーネント
  • GKEに基づく
アーキテクチャ

スクリーンショット 2019-11-07 14.23.31.png

  • customer projecttenant project(マネージドサービスリソース) に分かれる
  • customer project
    • gcs
    • gke
    • stackdriver
  • tenant project
    • cloud sql
      • airflowのメタデータ
      • 毎日バックアップ
    • app engine
      • web server
      • Cloud IAPを使って、IAMの権限をサーバーアクセス管理に使える
構成情報
  • 環境用のパラメータは一度使ったら変更できない
  • *-composer-* のpubsubトピックは消しちゃダメ
  • Airflow コマンド resetdb、initdb、upgradedb を発行すると、DB変わるので行わない

機能

環境
  • Cloud Composerは Apache Airflowを基盤としてラッパー
  • 3つのコンポーネントを持つ
    • ウェブサーバー: web view
    • データベース: メタデータの保持
    • Cloud Storage: dag, log, plugin などが保存される

Gloud Storageに保存されるデータ

DAG
  • 環境の DAG を保存します。このフォルダ内の DAG のみが環境にスケジュールされます。
  • path: gs://bucket-name/dags
プラグイン
  • カスタム プラグインを保存します。カスタムのインハウス Airflow 演算子、フック、センサー、インターフェースなどです。
  • path: gs://bucket-name/plugins
データ
  • タスクが生成して使用するデータを保存します。このフォルダは、すべてのワーカーノードにマウントされます。
  • path: gs://bucket-name/data
ログ
  • タスクの Airflow ログを保存します。ログは Airflow ウェブ インターフェースでも利用できます。
  • path: gs://bucket-name/logs
13
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
8