Cloud Composer が正常に動作していることを確認する仕組みを作るときにハマったポイントが有るためメモも兼ねて記事にしました。
やりたいこと
- Cloud Composer のジョブが起動していないことを検出する
- Cloud Composer のジョブが異常終了していることを検出する
特に起動していないことを検出する観点があることに注意が必要です。
エラーが発生していることを検出するだけなら Cloud Logging への出力でアラートを上げればよいのですが、そもそもジョブが起動しなければ Cloud Logging へも出力されないため、Cloud Composer の外から起動できているか確認する必要があります。
やったこと
Cloud Functions でジョブの起動履歴を取得して、特定の時間以降の起動がなければエラーを吐く実装をしました。
ハマったこと
ジョブの起動履歴を取得するときに、 Airflow の API を呼び出す方法を考えました。
List DAG runs API を叩くことで実行履歴を取得できます。
ただし Cloud Functions から Airflow の API を叩いて実行履歴を取得するには Cloud Functions の使用するサービスアカウントを Airflow ユーザーとして登録し権限を与える必要があります。
この登録する方法が分からずドキュメントを探し回りました。
解決方法
Cloud Composer の サービス アカウントを使用して Airflow REST API にアクセスする のページに書いてありました。
次のコマンドを実行します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
users create -- \
-u accounts.google.com:NUMERIC_USER_ID \
-e UNIQUE_ID \
-f UNIQUE_ID \
-l - -r Op --use-random-password
accounts.google.com:NUMERIC_USER_ID
には Airflow の API を叩く Functions の NUMERIC_USER_ID を入れます。
NUMERIC_USER_ID は次のコマンドで取得可能です。
gcloud iam service-accounts describe \
SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
--format="value(oauth2ClientId)"
これで Cloud Functions のサービスアカウントを Airflow ユーザーとして登録できました。
まとめ
- Airflow の List DAG runs API でジョブの起動履歴を取得できる
- Airflow の API を叩くには Airflow ユーザーの登録と権限付与が必要
- 登録と権限付与は gcloud コマンドで可能
実装した感想
最短の手順は上記の通りですが、先述の通りドキュメントを探し回ってようやく最短手順がわかりました。
やったこととしては
- 「セキュリティのタブからユーザー登録可能」という記述を見つけてユーザー登録を試みる
- そもそもセキュリティのタブが表示されていないことを確認した
- 表示するに Airflow ユーザーの Admin 権限を必要とするがわかった
- それには Airflow UI にユーザーを登録する という部分を読めばやり方が書いてあった
- ここまできて記事に書いていた方法を見つけた
という感じで遠回りしてしまいました。
もうちょっとドキュメントを探すのうまくなりたい。