やりたいこと
データ分析にBigQueryを活用していますが、クエリの書き方や参照するデータの特性により、課金データ量が莫大なクエリが実行されることがあります。
手作業で棚卸しすることもあるのですが、日常的に通知されたほうが現状を把握しやすい・作業するきっかけにしやすいため、通知する仕組みを作ってみることにしました。
この記事で触れる話題
- AuditLogs
- GCPでは各種イベント発生時に誰がどんな操作をしたか・結果の成功失敗などの監査ログが記録されます
- BigQueryは初期設定で各種監査ログが記録されるようになっています
- Eventarc
- GCPでイベントドリブンアーキテクチャを構築するのに便利なサービスです
- 今回は監査ログ(AuditLogs)の記録をトリガーに各種処理を実行できる機能を活用します
- Cloud Functions(v2)
- ソースコードをアップロードしてトリガー1を設定するだけでコードを実行できます
- 今回はJSでコードを書きます
- バックエンドにCloud Runが使われているようで、Cloud Runの概念も登場します
- cdktf
- 単にGUIで構築するのではなくcdktf(言語はTypeScript)を使って再現性のある形で定義します
- コマンド一発でterraformを呼び出して各種リソースを一括で作成できます
参考記事
基本的には各種参考記事のサンプルコードを寄せ集めて改造しただけです。
- Eventarcを使ってBigQueryのQuery Jobの実行を検知する
- EventarcをトリガーにしたCloudFunctionの定義をterraformで定義する
- イベント発生時にCloudFunction(v2) with Node.jsで処理する
- cdktfでGCPのリソースを定義する
- cdktfでディレクトリまるごとzip化する
各種バージョンなど
cdktfなどnpmパッケージのバージョンは次の通りです
"@cdktf/provider-google": "7.0.3",
"cdktf": "^0.16.0",
"cdktf-cli": "^0.16.0",
実装
ちなみに今回作る仕組みは too-big-query-notice
と呼ぶことにします。
(BigQueryはbigなqueryを叩くためのサービスではあるがtoo bigなのも困るよね、という意味を込めて)
実装イメージ
図にするとこんな感じです
作成するリソース
図と対応しています
(上記の通り、監査ログの設定は初期状態から変更不要です)
- サービスアカウント
- EventarcのトリガーとCloudFunctionに指定するサービスアカウントが必要です
- 今回は共通にして1つだけ作ります
- IAMロール
-
roles/eventarc.eventReceiver
- Eventarcでイベント受信するために必要
-
roles/run.invoker
- CloudFunction(実体はCloudRun)を呼び出すのに必要
-
roles/artifactregistry.reader
- EventarcがCloudRunの呼び出し先を特定するために必要なのかな?(公式サンプルに入ってるから入れてるだけ)
-
- GCSバケット
- ソースコードを入れるバケットです
- CloudFunction(v2)
- 実行する処理の本体です
- 関数作成時にEventarcのトリガーも一緒に指定できるので今回はそうします
- 環境変数で挙動を調整できるようにします
-
WEBHOOK_URL
Slackのwebhook URL -
TOO_BIG_LIMIT_GIGABYTES
通知するしきい値[GB]
-
監査ログのデータ構造について
データ構造は下記のようになっています
次の記事の「(3) SQLクエリの実行」の箇所も分かりやすいです。
https://zenn.dev/hssh2_bin/articles/0766181f3f89ff
cdktfを使った実装
ファイル構成
-
too-big-query-notice-source
CloudFunction関数のソースディレクトリpackage.json
index.ts
-
main.ts
cdktfの各種実装 - ... その他package.jsonなど
実行するCloudFunction関数の実装
下記2ファイルを too-big-query-notice-source
というディレクトリに配置します
{
"dependencies": {
"@google-cloud/functions-framework": "^3.0.0",
"axios": "^1.3.6"
}
}
const functions = require('@google-cloud/functions-framework');
const axios = require('axios');
functions.cloudEvent('handleEvent', async cloudEvent => {
const payload = cloudEvent.data.protoPayload;
if (!payload) {
return console.error('payloadが未定義');
}
const jobCompletedEvent = payload.serviceData.jobCompletedEvent;
if (jobCompletedEvent.eventName !== 'query_job_completed') {
return console.info('query_job以外のjob完了');
}
const job = jobCompletedEvent.job;
const jobErrorMessage = job.jobStatus.error.message;
if (jobErrorMessage) {
return console.info(`クエリ失敗;${jobErrorMessage}`);
}
const limitString = process.env.TOO_BIG_LIMIT_GIGABYTES;
if (!limitString) {
return console.error(
'環境変数 `TOO_BIG_LIMIT_GIGABYTES` にしきい値を設定してください'
);
}
const TOO_BIG_LIMIT_GIGABYTES = Number(limitString);
const totalBilledBytes = job.jobStatistics.totalBilledBytes;
if (typeof totalBilledBytes === 'undefined') {
return console.info('課金データ量が未定義');
}
const totalGigabytesBilled = totalBilledBytes / (1024 * 1024 * 1024);
if (totalGigabytesBilled < TOO_BIG_LIMIT_GIGABYTES) {
return console.log(
`しきい値${TOO_BIG_LIMIT_GIGABYTES}GBに達していないので終了`
);
}
const jobId = job.jobName.jobId;
const query = job.jobConfiguration.query.query;
const principalEmail = payload.authenticationInfo.principalEmail;
const message = `
激重クエリが実行されました。よほどのことがない限りは${TOO_BIG_LIMIT_GIGABYTES}GBくらいに収まるはずなので時間を見つけてチューニングしましょう
jobId: ${jobId}
principalEmail: ${principalEmail}
totalGigabytesBilled: ${totalGigabytesBilled}
query:
\`\`\`${query}\`\`\`
`;
console.log(message);
await axios.post(process.env.WEBHOOK_URL, {text: message});
});
cdktfの実装
長々書いてありますが、「作成するリソース」と見比べるとそのままです。
import {Construct} from 'constructs';
import {
App,
TerraformStack,
GcsBackend,
TerraformAsset,
AssetType,
} from 'cdktf';
import {GoogleProvider} from '@cdktf/provider-google/lib/provider';
import {StorageBucket} from '@cdktf/provider-google/lib/storage-bucket';
import {ServiceAccount} from '@cdktf/provider-google/lib/service-account';
import {ProjectIamMember} from '@cdktf/provider-google/lib/project-iam-member';
import {StorageBucketObject} from '@cdktf/provider-google/lib/storage-bucket-object';
import {Cloudfunctions2Function} from '@cdktf/provider-google/lib/cloudfunctions2-function';
type EnvType = 'production';
class MyStack extends TerraformStack {
constructor(scope: Construct, id: string, env: EnvType) {
super(scope, `${id}-${env}`);
const projectName = '【プロジェクト名】'; // ←要設定
// cdktfの設定
new GoogleProvider(this, 'google', {
project: projectName,
});
new GcsBackend(this, {
bucket: 'terraform-state', // このバケットは手作業で作っておく
prefix: `terraform-${env}`,
});
// 今回の記事に固有な定義はここから
// 各種CloudFunctionsのソースコードを配置するのに使うバケット
const cloudFunctionsCommonSourceBucket = new StorageBucket(
this,
'gcs-cloud_functions_common_source',
{
name: 'cloud-functions-common-source',
location: 'US',
uniformBucketLevelAccess: true,
publicAccessPrevention: 'enforced',
}
);
// 実行用サービスアカウント作成
const runnerServiceAccount = new ServiceAccount(
this,
'service_account-too_big_query_notice',
{
accountId: 'too-big-query-notice',
displayName: 'for too-big-query-notice',
}
);
// 権限付与
const iamEventReceiving = new ProjectIamMember(
this,
'iam-service_account-too_big_query_notice-event_receiving',
{
project: projectName,
role: 'roles/eventarc.eventReceiver', // ←必要ならもっと細かく制限すること
member: `serviceAccount:${runnerServiceAccount.email}`,
}
);
const iamInvoking = new ProjectIamMember(
this,
'iam-service_account-too_big_query_notice-invoking',
{
project: projectName,
role: 'roles/run.invoker', // ←必要ならもっと細かく制限すること
member: `serviceAccount:${runnerServiceAccount.email}`,
}
);
const iamArtifactRegistryReader = new ProjectIamMember(
this,
'iam-service_account-too_big_query_notice-artifact_registry_reader',
{
project: projectName,
role: 'roles/artifactregistry.reader',
member: `serviceAccount:${runnerServiceAccount.email}`,
}
);
// ソースコードのアップロード
const zipfile = new TerraformAsset(this, 'asset-too_big_query_notice-source', {
path: 'src/too-big-query-notice-source',
type: AssetType.ARCHIVE,
});
const sourceZipObject = new StorageBucketObject(
this,
'gcs-object-too_big_query_notice-zip',
{
name: `too_big_query_notice-${zipfile.assetHash}.zip`, // ←変更時強制更新のためassetHashを入れた
bucket: cloudFunctionsCommonSourceBucket.name,
source: zipfile.path,
}
);
// CloudFunction関数作成
new Cloudfunctions2Function(this, 'gcf-too_big_query_notice', {
name: 'too_big_query_notice',
location: 'us-central1', // ←どこでもよい
description:
'探索データ量が一定以上のBigQueryのクエリ実行時にSlack通知します',
buildConfig: {
runtime: 'nodejs18',
entryPoint: 'handleEvent',
source: {
storageSource: {
bucket: sourceZipObject.bucket,
object: sourceZipObject.name,
},
},
},
serviceConfig: {
maxInstanceCount: 5,
minInstanceCount: 0,
availableMemory: '128Mi',
availableCpu: '0.083',
timeoutSeconds: 5,
ingressSettings: 'ALLOW_ALL',
allTrafficOnLatestRevision: true,
serviceAccountEmail: runnerServiceAccount.email,
environmentVariables: {
WEBHOOK_URL: '【webhook urlを設定】', // ←ここは適宜設定
TOO_BIG_LIMIT_GIGABYTES: '20', // ←ここは適宜設定
},
},
eventTrigger: {
triggerRegion: 'global', // ←特定リージョンのクエリ実行のみを検知したければ限定も可能
eventType: 'google.cloud.audit.log.v1.written',
eventFilters: [
{attribute: 'serviceName', value: 'bigquery.googleapis.com'},
{attribute: 'methodName', value: 'jobservice.jobcompleted'},
],
serviceAccountEmail: runnerServiceAccount.email,
retryPolicy: 'RETRY_POLICY_RETRY',
},
dependsOn: [iamInvoking, iamEventReceiving, iamArtifactRegistryReader], // ←これを指定しないと運が悪いと作成順により失敗するので注意
});
}
}
const app = new App();
new MyStack(app, 'sample', 'production');
app.synth();
疑問
GCSのソースコードを更新したときにCloudFunctionが更新を検知するためのスマートな方法がわからなかったです。今回は ${zipfile.assetHash}.zip
のようにしてソースコード変更時にオブジェクト名を変えることでごまかしました
最後に
疑問の箇所はググってもはっきりした答えがわからないのでモヤモヤが残りますが、いったん実用上は問題ないレベルのものが作れたと思うのでこれで運用してみようと思います。
cdktfも便利なのでぜひ使っていきましょう。
-
ドキュメントを読む限りだと、「CloudFunction(v2)にトリガーを設定する機能がある」というよりは、「トリガーを設定する機能自体がEventarcとの統合により実現されている」と捉えるのが正しそうです https://cloud.google.com/functions/docs/concepts/version-comparison?hl=ja ↩