1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

BigQueryで激重クエリが実行されたらSlack通知する仕組みを作る(with cdktf)

Last updated at Posted at 2023-04-28

やりたいこと

データ分析にBigQueryを活用していますが、クエリの書き方や参照するデータの特性により、課金データ量が莫大なクエリが実行されることがあります。

手作業で棚卸しすることもあるのですが、日常的に通知されたほうが現状を把握しやすい・作業するきっかけにしやすいため、通知する仕組みを作ってみることにしました。

この記事で触れる話題

  • AuditLogs
    • GCPでは各種イベント発生時に誰がどんな操作をしたか・結果の成功失敗などの監査ログが記録されます
    • BigQueryは初期設定で各種監査ログが記録されるようになっています
  • Eventarc
    • GCPでイベントドリブンアーキテクチャを構築するのに便利なサービスです
    • 今回は監査ログ(AuditLogs)の記録をトリガーに各種処理を実行できる機能を活用します
  • Cloud Functions(v2)
    • ソースコードをアップロードしてトリガー1を設定するだけでコードを実行できます
    • 今回はJSでコードを書きます
    • バックエンドにCloud Runが使われているようで、Cloud Runの概念も登場します
  • cdktf
    • 単にGUIで構築するのではなくcdktf(言語はTypeScript)を使って再現性のある形で定義します
    • コマンド一発でterraformを呼び出して各種リソースを一括で作成できます

参考記事

基本的には各種参考記事のサンプルコードを寄せ集めて改造しただけです。

各種バージョンなど

cdktfなどnpmパッケージのバージョンは次の通りです

    "@cdktf/provider-google": "7.0.3",
    "cdktf": "^0.16.0",
    "cdktf-cli": "^0.16.0",

実装

ちなみに今回作る仕組みは too-big-query-notice と呼ぶことにします。

(BigQueryはbigなqueryを叩くためのサービスではあるがtoo bigなのも困るよね、という意味を込めて)

実装イメージ

図にするとこんな感じです

image.png

作成するリソース

図と対応しています

(上記の通り、監査ログの設定は初期状態から変更不要です)

  • サービスアカウント
    • EventarcのトリガーとCloudFunctionに指定するサービスアカウントが必要です
    • 今回は共通にして1つだけ作ります
  • IAMロール
    • roles/eventarc.eventReceiver
      • Eventarcでイベント受信するために必要
    • roles/run.invoker
      • CloudFunction(実体はCloudRun)を呼び出すのに必要
    • roles/artifactregistry.reader
      • EventarcがCloudRunの呼び出し先を特定するために必要なのかな?(公式サンプルに入ってるから入れてるだけ)
  • GCSバケット
    • ソースコードを入れるバケットです
  • CloudFunction(v2)
    • 実行する処理の本体です
    • 関数作成時にEventarcのトリガーも一緒に指定できるので今回はそうします
    • 環境変数で挙動を調整できるようにします
      • WEBHOOK_URL Slackのwebhook URL
      • TOO_BIG_LIMIT_GIGABYTES 通知するしきい値[GB]

監査ログのデータ構造について

データ構造は下記のようになっています

スクリーンショット 2023-04-28 9.58.41.png

ドキュメント
https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/AuditData#JobCompletedEvent

次の記事の「(3) SQLクエリの実行」の箇所も分かりやすいです。
https://zenn.dev/hssh2_bin/articles/0766181f3f89ff

cdktfを使った実装

ファイル構成

  • too-big-query-notice-source CloudFunction関数のソースディレクトリ
    • package.json
    • index.ts
  • main.ts cdktfの各種実装
  • ... その他package.jsonなど

実行するCloudFunction関数の実装

下記2ファイルを too-big-query-notice-source というディレクトリに配置します

too-big-query-notice-source/package.json
{
  "dependencies": {
    "@google-cloud/functions-framework": "^3.0.0",
    "axios": "^1.3.6"
  }
}
too-big-query-notice-source/index.js
const functions = require('@google-cloud/functions-framework');
const axios = require('axios');

functions.cloudEvent('handleEvent', async cloudEvent => {
  const payload = cloudEvent.data.protoPayload;
  if (!payload) {
    return console.error('payloadが未定義');
  }

  const jobCompletedEvent = payload.serviceData.jobCompletedEvent;
  if (jobCompletedEvent.eventName !== 'query_job_completed') {
    return console.info('query_job以外のjob完了');
  }

  const job = jobCompletedEvent.job;
  const jobErrorMessage = job.jobStatus.error.message;
  if (jobErrorMessage) {
    return console.info(`クエリ失敗;${jobErrorMessage}`);
  }

  const limitString = process.env.TOO_BIG_LIMIT_GIGABYTES;
  if (!limitString) {
    return console.error(
      '環境変数 `TOO_BIG_LIMIT_GIGABYTES` にしきい値を設定してください'
    );
  }
  const TOO_BIG_LIMIT_GIGABYTES = Number(limitString);

  const totalBilledBytes = job.jobStatistics.totalBilledBytes;
  if (typeof totalBilledBytes === 'undefined') {
    return console.info('課金データ量が未定義');
  }

  const totalGigabytesBilled = totalBilledBytes / (1024 * 1024 * 1024);
  if (totalGigabytesBilled < TOO_BIG_LIMIT_GIGABYTES) {
    return console.log(
      `しきい値${TOO_BIG_LIMIT_GIGABYTES}GBに達していないので終了`
    );
  }

  const jobId = job.jobName.jobId;
  const query = job.jobConfiguration.query.query;
  const principalEmail = payload.authenticationInfo.principalEmail;

  const message = `
激重クエリが実行されました。よほどのことがない限りは${TOO_BIG_LIMIT_GIGABYTES}GBくらいに収まるはずなので時間を見つけてチューニングしましょう

jobId: ${jobId}
principalEmail: ${principalEmail}
totalGigabytesBilled: ${totalGigabytesBilled}

query:
\`\`\`${query}\`\`\`
      `;

  console.log(message);
  await axios.post(process.env.WEBHOOK_URL, {text: message});
});

cdktfの実装

長々書いてありますが、「作成するリソース」と見比べるとそのままです。

main.ts
import {Construct} from 'constructs';
import {
  App,
  TerraformStack,
  GcsBackend,
  TerraformAsset,
  AssetType,
} from 'cdktf';
import {GoogleProvider} from '@cdktf/provider-google/lib/provider';
import {StorageBucket} from '@cdktf/provider-google/lib/storage-bucket';
import {ServiceAccount} from '@cdktf/provider-google/lib/service-account';
import {ProjectIamMember} from '@cdktf/provider-google/lib/project-iam-member';
import {StorageBucketObject} from '@cdktf/provider-google/lib/storage-bucket-object';
import {Cloudfunctions2Function} from '@cdktf/provider-google/lib/cloudfunctions2-function';

type EnvType = 'production';

class MyStack extends TerraformStack {
  constructor(scope: Construct, id: string, env: EnvType) {
    super(scope, `${id}-${env}`);

    const projectName = '【プロジェクト名】'; // ←要設定

    // cdktfの設定
    new GoogleProvider(this, 'google', {
      project: projectName,
    });
    new GcsBackend(this, {
      bucket: 'terraform-state', // このバケットは手作業で作っておく
      prefix: `terraform-${env}`,
    });

    // 今回の記事に固有な定義はここから

    // 各種CloudFunctionsのソースコードを配置するのに使うバケット
    const cloudFunctionsCommonSourceBucket = new StorageBucket(
      this,
      'gcs-cloud_functions_common_source',
      {
        name: 'cloud-functions-common-source',
        location: 'US',
        uniformBucketLevelAccess: true,
        publicAccessPrevention: 'enforced',
      }
    );

    // 実行用サービスアカウント作成
    const runnerServiceAccount = new ServiceAccount(
      this,
      'service_account-too_big_query_notice',
      {
        accountId: 'too-big-query-notice',
        displayName: 'for too-big-query-notice',
      }
    );

    // 権限付与
    const iamEventReceiving = new ProjectIamMember(
      this,
      'iam-service_account-too_big_query_notice-event_receiving',
      {
        project: projectName,
        role: 'roles/eventarc.eventReceiver', // ←必要ならもっと細かく制限すること
        member: `serviceAccount:${runnerServiceAccount.email}`,
      }
    );
    const iamInvoking = new ProjectIamMember(
      this,
      'iam-service_account-too_big_query_notice-invoking',
      {
        project: projectName,
        role: 'roles/run.invoker', // ←必要ならもっと細かく制限すること
        member: `serviceAccount:${runnerServiceAccount.email}`,
      }
    );
    const iamArtifactRegistryReader = new ProjectIamMember(
      this,
      'iam-service_account-too_big_query_notice-artifact_registry_reader',
      {
        project: projectName,
        role: 'roles/artifactregistry.reader',
        member: `serviceAccount:${runnerServiceAccount.email}`,
      }
    );

    // ソースコードのアップロード
    const zipfile = new TerraformAsset(this, 'asset-too_big_query_notice-source', {
      path: 'src/too-big-query-notice-source',
      type: AssetType.ARCHIVE,
    });
    const sourceZipObject = new StorageBucketObject(
      this,
      'gcs-object-too_big_query_notice-zip',
      {
        name: `too_big_query_notice-${zipfile.assetHash}.zip`, // ←変更時強制更新のためassetHashを入れた
        bucket: cloudFunctionsCommonSourceBucket.name,
        source: zipfile.path,
      }
    );

    // CloudFunction関数作成
    new Cloudfunctions2Function(this, 'gcf-too_big_query_notice', {
      name: 'too_big_query_notice',
      location: 'us-central1', // ←どこでもよい
      description:
        '探索データ量が一定以上のBigQueryのクエリ実行時にSlack通知します',
      buildConfig: {
        runtime: 'nodejs18',
        entryPoint: 'handleEvent',
        source: {
          storageSource: {
            bucket: sourceZipObject.bucket,
            object: sourceZipObject.name,
          },
        },
      },
      serviceConfig: {
        maxInstanceCount: 5,
        minInstanceCount: 0,
        availableMemory: '128Mi',
        availableCpu: '0.083',
        timeoutSeconds: 5,
        ingressSettings: 'ALLOW_ALL',
        allTrafficOnLatestRevision: true,
        serviceAccountEmail: runnerServiceAccount.email,
        environmentVariables: {
          WEBHOOK_URL: '【webhook urlを設定】', // ←ここは適宜設定
          TOO_BIG_LIMIT_GIGABYTES: '20', // ←ここは適宜設定
        },
      },
      eventTrigger: {
        triggerRegion: 'global', // ←特定リージョンのクエリ実行のみを検知したければ限定も可能
        eventType: 'google.cloud.audit.log.v1.written',
        eventFilters: [
          {attribute: 'serviceName', value: 'bigquery.googleapis.com'},
          {attribute: 'methodName', value: 'jobservice.jobcompleted'},
        ],
        serviceAccountEmail: runnerServiceAccount.email,
        retryPolicy: 'RETRY_POLICY_RETRY',
      },
      dependsOn: [iamInvoking, iamEventReceiving, iamArtifactRegistryReader], // ←これを指定しないと運が悪いと作成順により失敗するので注意
    });
  }
}

const app = new App();
new MyStack(app, 'sample', 'production');
app.synth();

疑問

GCSのソースコードを更新したときにCloudFunctionが更新を検知するためのスマートな方法がわからなかったです。今回は ${zipfile.assetHash}.zip のようにしてソースコード変更時にオブジェクト名を変えることでごまかしました

最後に

実行イメージ
スクリーンショット 2023-04-28 16.08.12.png

疑問の箇所はググってもはっきりした答えがわからないのでモヤモヤが残りますが、いったん実用上は問題ないレベルのものが作れたと思うのでこれで運用してみようと思います。

cdktfも便利なのでぜひ使っていきましょう。

  1. ドキュメントを読む限りだと、「CloudFunction(v2)にトリガーを設定する機能がある」というよりは、「トリガーを設定する機能自体がEventarcとの統合により実現されている」と捉えるのが正しそうです https://cloud.google.com/functions/docs/concepts/version-comparison?hl=ja

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?