1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

みなさん、こんにちは!

前回の記事に引き続き、Snowflake のログを CloudWatch に連携する方法についてご紹介します。

外部関数作成までの基本的な流れは同じです。
今回はアクセス履歴を10分ごとに取得し自動で CloudWatch に連携するため、プロシージャの作成とタスクの登録も行います。

Lambda関数

前回の記事で作成したLambda関数のPythonコードを以下に置き換えます。

qiita.rb
import json
import datetime, decimal

import boto3

def default_json_transform(obj):
    if isinstance(obj, decimal.Decimal):
        return str(obj)
    if isinstance(obj, (datetime.date, datetime.datetime)):
        return obj.isoformat()

    raise TypeError

def lambda_handler(event, context):
    array_of_rows_to_return = []
    status_code = 200

    event_body = event["body"]
    payload = json.loads(event_body)

    for row in payload["data"]:
        row_number = row[0]
        groupName = row[1]
        streamName = row[2]
        dimensionsArray = row[3]

        dimensionDict = json.loads(dimensionsArray)
        metricResponse = send_cloudwatch_logs(groupName, streamName, dimensionDict)

        row_to_return = [row_number, metricResponse]
        array_of_rows_to_return.append(row_to_return)

    json_compatible_string_to_return = json.dumps({"data" : array_of_rows_to_return}, default=default_json_transform)

    return {
        'statusCode': status_code,
        'body': json_compatible_string_to_return
    }

def send_cloudwatch_logs(groupName, streamName, dimensionDict):
    log_client = boto3.client('logs')

    response = log_client.put_log_events(
        logGroupName=groupName,
        logStreamName=streamName,
        logEvents=[{
            'timestamp': int(datetime.datetime.now().timestamp() * 1000),
            'message': json.dumps(dimensionDict)
        }]
    )

    return response

今回はCloudWatchロググループへのログ出力を行うので、Lambdaの実行ロールに以下のポリシーがアタッチされていることを確認します。

snowflake-cloudwatch-log-policy

qiita.rb
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}

また、書き込み先のロググループとログストリームを以下のように作成しておきます。

  • ロググループ:snowflake-cloudwatch-logs
  • ログストリーム:access-history

外部関数

以下のSQLを実行し、ログ送信用の外部関数send_cloudwatch_logsを作成します。

qiita.rb
CREATE DATABASE IF NOT EXISTS EXTERNAL_APIS;
USE EXTERNAL_APIS;
CREATE SCHEMA IF NOT EXISTS EXTERNAL_FUNCTIONS;
USE SCHEMA EXTERNAL_FUNCTIONS;

CREATE OR REPLACE EXTERNAL FUNCTION send_cloudwatch_logs(groupname STRING, streamname STRING, dimension_json VARCHAR)
  RETURNS VARIANT
  API_INTEGRATION = int_cloudwatch_api
  AS '<APIの呼び出しURL>';

プロシージャ

以下のSQLを実行し、プロシージャを登録します。
直近10分以内のアクセス履歴を取得するプロシージャです。

qiita.rb
CREATE OR REPLACE PROCEDURE cloudwatch_logs_query()
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
    AS $$
    result = '';

    try {
        // アクセス履歴取得直近10分以内
        access_history_query = 
        `SELECT *
        FROM SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY
        WHERE QUERY_START_TIME >= DATEADD(minute, -10, CURRENT_TIMESTAMP)
        ORDER BY QUERY_START_TIME;`;

        access_history = snowflake.execute({sqlText: access_history_query});

        while (access_history.next()) {
            // クエリ結果の取得
            result_json = {
                query_id: access_history.getColumnValue(1),
                query_start_time: access_history.getColumnValue(2),
                user_name: access_history.getColumnValue(3),
                direct_objects_accessed: access_history.getColumnValue(4),
                base_objects_accessed: access_history.getColumnValue(5),
                objects_modified: access_history.getColumnValue(6),
                objects_modified_by_ddl: access_history.getColumnValue(7),
                policies_referenced: access_history.getColumnValue(8),
                parent_query_id: access_history.getColumnValue(9),
                root_query_id: access_history.getColumnValue(10)
            }

            // アクセス履歴の送信
            send_logs_query = `SELECT send_cloudwatch_logs(
                \'snowflake-cloudwatch-logs\',
                \'access-history\',
                \'${JSON.stringify(result_json)}\'
            )`;

            snowflake.execute({sqlText: send_logs_query})
        }

        result = 'Success';
    }
    catch (err) {
        result =  "Failed: Code: " + err.code + "\nState: " + err.state;
        result += "\nMessage: " + err.message;
        result += "\nStack Trace:\n" + err.stackTraceTxt;
    }

    return result;
$$;

通常のプロシージャ実行は以下のコマンドで実行可能です。

qiita.rb
CALL cloudwatch_logs_query();

タスクの作成と登録

タスクを登録することで、プロシージャを定期実行させることができます。

以下のSQLを実行し、タスクの作成と有効化を行います。

qiita.rb
-- タスク作成
CREATE OR REPLACE TASK cloudwatch_logs_task
  WAREHOUSE = compute_wh  -- 適切なウェアハウスを指定
  SCHEDULE = 'USING CRON */10 * * * * UTC'  -- 10分ごとに実行 (UTCタイムゾーン)
AS
  CALL cloudwatch_logs_query();

-- タスク有効化
ALTER TASK cloudwatch_logs_task RESUME;

-- タスク一時停止
-- ALTER TASK oracle_replication_task SUSPEND;

タスクの詳細は以下のコマンドで確認可能です。

qiita.rb
SHOW TASKS LIKE 'cloudwatch_logs_task';

動作確認

タスクが実行されると、以下のように対象のログストリームにログが出力されます。
image.png

さいごに

Snowflake の監査ログをモニタリングサービス(CloudWatch)に連携する方法についてご紹介しました。

モニタリングサービスに連携することで、必要なログを一元管理できることに加え、ログの分析もしやすくなります。

また、外部関数を利用してログを連携できるだけでなく、プロシージャとタスクを組み合わせることで処理の定期実行が可能となり、運用を効率化することができます。
Snowflake のログ連携でお悩みの方は、本記事の内容を参考に外部関数の利用を検討してみてください。

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?