LoginSignup
0
0

More than 1 year has passed since last update.

GCPの実行異常クラスタを通知する方法

Last updated at Posted at 2022-11-05

概要

実行異常でクラスタが存在し続けることがあります。
未発見の場合、ずっと実行がなされるので異常な料金が発生します。

そこで、定期的に実行の確認をするために、Cloud Functionsを活用して
ログを抽出して存在すればslackチャンネルを通知する というシステムを構築しました。

gcp.jpg

対応方法

前提

Webhook URL?

今回このslackへの通知に使用する「Webhook URL」についての解説

・アプリケーションの更新情報を他のアプリケーションへ
リアルタイム提供する仕組みや概念

・イベント(リポジトリにプッシュなど)発生時
指定した**URLにPOSTリクエストする仕組み

Webhookの説明を見ると**「通知する、Webhookを送る」などの言葉が用いられているが、
これはPOSTリクエストのことを指している**。

Webhookを利用すると何ができるのか?
GitHubやSlackなどのサービスのWebhookを利用すると、ユーザーはサービス側がPOSTリクエストするURLを指定可能

手順

1: Cloud Scheduler に定期実行スケジュールなどを登録
2: Cloud Scheduler が Cloud Pub/Subトピックにメッセージを送信(←トリガーの役割)
3: Cloud Functions が起動
4: Cloud Functions内でSlackに投稿

アーキテクチャ

Cloud Scheduler > Cloud Pub/Sub > Cloud Functions > slack

Cloud Scheduler & Pub/Subを設定

◼︎[Cloud Scheduler]
スケジューラーからもCloud Pub/Subの設定が可能。

設定方法は省略

実装内容

一定時間ごとに下記コードが実行

実装概要

異常起動実行を確認して一定時間以上起動していると
異常と認識してslaackへ通知

Cloud Functions

◼︎必要モジュールファイル

ファイル名:requirements.txt
  必要なモジュールなどバージョンのインポートするバージョンなどを記載してファイル保存
ブログ参照

◼︎実行コード
ファイル名:main.py

from google.cloud import proc
import datetime
import dateutil.parser
import requests
import base64
import os
import json


def extract(x):
    return (x.cluster_name, x.status.state, x.status.state_start_time)

def me2(y, t_limit):
    return y[2] < t_limit

def check_cluster(event, context):
    
    if 'data' in event:
        s = base64.b64decode(event['data']).decode('utf-8')
    else:
        s = os.environ.get('max(指標)', '(数値)')
    m = int(s)

    client = proc.ClusterControllerClient(
    client_options={"api_endpoint": "地域-dataproc.googleapis.com:443"}
    )

    # Initialize request argument(s)
    request = proc.ListClustersRequest(
        project_id="プロジェクト名", 
        region="",
    )

    # Make the request
    page = client.list_clusters(request=request)

    t_delta = datetime.timedelta(minutes = m)
    t_limit = datetime.datetime.now(datetime.timezone.utc) - t_delta

    me1 = lambda x: me2(x, time_limit)

    list = list(filter(me1, map(extract, page)))

    if list:
        webhook_url = "https://hooks.slack.com/~"
        message = f'`{checked_list[0][2]}から{m}時間以上`'
        requests.post(webhook_url, data=json.dumps({'text': message}))
    else:
        print("その他")

実行補足
functionsを作成する際に
ランタイム環境変数を設定して引数を渡すことが可能
スケジューラーのメッセージ本文からも引数設定が可能

参照資料

Google cloud scheduler で Functionsを定期実行する

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0