LoginSignup
3
0

More than 1 year has passed since last update.

[AWS] Cloudwatch_LogsからLambda経由で軽いETLしつつOpenSearchでApacheログを可視化する方法

Last updated at Posted at 2022-01-11

やりたい事

2022/01/13開催の SB Tech Festival で登壇した内容、「ビッグデータ活用の第一歩AWS環境での大容量ログ可視化」の詳細手順(後半)ページです。

このエントリで解説するのは以下のイメージ

2022-01-11_09h20_031.png

前半部分のFargate+Firelens+CloudwatchLogsについてはこちらの記事を参照してください。

手順

2022/01/11時点でECSコンソールは新しいエクスペリエンスが提供されていますが、本エントリでは旧UIをベースに解説しています。

1.OpenSearchクラスターを作成する

AWSコンソールからOpenSearchダッシュボードを開き、新しいドメインを作成します。
2022-01-11_15h46_39.png

項目名 設定内容
ドメイン名 <任意の名前>
カスタムエンドポイント チェック無し
デプロイタイプ 開発及びテスト
バージョン 1.0
※より新しいバージョンがリリースされていたとしても、Lambdaコードの後方互換が切られる可能性があります。自身でコード改修出来ない場合は、1.0選択をお願いします
自動調整 無効化
データノード t3.small.search
※インスタンスサイズを大きくするのは任意ですが、課金に直結するのでご注意ください。t3.smallは無料利用枠有り
ノードの数 1
ストレージタイプ EBS
EBS ボリュームタイプ 汎用(SSD)
ノード当たりのEBSストレージサイズ 20GiB
きめ細かなアクセスコントロール チェック無し
アクセスポリシー ドメインレベルのアクセスポリシーの設定(ビジュアルエディタでダッシュボードへのアクセスを許可するIPv4/v6アドレスを指定します)
暗号化 全てチェック
AWS KMSキーを選択する AWS 所有キーを使用する

作成後、10分ほど時間をおいてデプロイが完了する事を確認してください。
(クラスターのヘルスは黄色or緑のいずれかであれば、次の手順へ進んで問題ありません。)
2022-01-11_15h57_31.png

2.CloudWatch Logs用IAMロールを作成する

IAM管理ポリシーを新規作成し、
2022-01-11_09h42_56.png

JSONタブを開き
2022-01-11_09h43_27.png

以下のJSONをコピペする。これはLambdaからCloudwatchLogs及びElasticSearchの必要な操作が出来るように権限を付与する内容。
より権限を絞ったり、リソースを指定するのがより良いですが、今回はその他リソースが無い前提で広く権限を持たせます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:*",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": "es:*",
            "Resource": "*"
        }
    ]
}

続いてIAMロールを新規作成し、
2022-01-11_09h45_45.png

以下の通りにロールの利用方法を指定。
2022-01-11_19h31_32.png

次の画面で、作成済みのIAM管理ポリシーを指定しLambda向けロール作成完了。

IAM管理ポリシー、IAMロールのタグや名前は任意の内容でOK。

3.CloudWatch Logsサブスクリプションフィルターを作成する

前半で作成したCloudWatch Logグループ /apache-dummy-logを選択し、図示の通りOpenSearch向けのサブスクリプションフィルターを作成
2022-01-11_16h10_53.png

項目名 設定内容
アカウントを選択 This account
Amazon OpenSearch Service cluster <手順1で作成したOpenSearchドメインを指定>
ログの形式 JSON
サブスクリプションフィルターのパターン <未記入のまま>
サブスクリプションフィルター名 <任意の名前を記載>

以下のようにサブスクリプションフィルターが作成された事を確認してください。
2022-01-11_16h15_52.png

4.Lambdaで軽いETL処理を実装する

今回はApache2のダミーログを発生させていますが、ご存じの通りApache2のログはスペース区切りでフィールド名がありません。
OpenSearchへ投入する前に、Lambda上でパースしてフィールド名を割り当てましょう。

Lambdaコンソールを開き、関数一覧の中にLogsToElasticsearch_apachedummylog-opensearchがある事を確認し、クリックします。
2022-01-11_16h18_47.png

設定タブを開き、実行ロールの編集をクリックします。
2022-01-12_19h54_05.png

手順#2で作成したIAMロールを指定し、保存をクリックします。
2022-01-12_19h54_00.png

続いてコードタブを開き、関数の76行目以降に以下を追加します。

Node.js
        var messageConvJson = logEvent.message.replace(/"/g,'\"');
        messageConvJson = messageConvJson.replace(/{\\"/,'{"');
        messageConvJson = messageConvJson.replace(/\\"\s*:\s*\\"/g,'":"');
        messageConvJson = messageConvJson.replace(/\\"\s*,\s*\\"/g,'","');
        messageConvJson = messageConvJson.replace(/\\"}/,'"}');
        var messageJson = JSON.parse(messageConvJson);

        let [ remote_host, remote_logname, remote_user, ts, tz, method, resource, protocol, statusCode, size, referer, ...userAgent ] = messageJson.log.split(' ');

        statusCode = +statusCode;
        size = +size;
        ts = ts.substring(1);
        tz = tz.substring(0, tz.length - 1);
        const accesstime = ts + tz;
        userAgent = userAgent.join('');

        source['@remote_host'] = remote_host;
        source['@accesstime'] = accesstime;
        source['@method'] = method;
        source['@resource'] = resource;
        source['@protocol'] = protocol;
        source['@statusCode'] = statusCode;
        source['@size'] = size;
        source['@referer'] = referer;
        source['@userAgent'] = userAgent;

CloudWatch Logsから自動デプロイされるコードに、上記を含めたコード全体も載せておきます。
こちらをコピペしていただくのが簡単です。

コード全体
Node.js
// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');

var endpoint = 'search-apachedummylog-opensearch-amepwsh24gynwv63reszss5u4e.ap-northeast-1.es.amazonaws.com';

// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;

exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer.from(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                logFailure(error, failedItems);
                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};

function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        // index name format: cwl-YYYY.MM.DD
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        //console.log("logEvent.message = " + logEvent.message);
        var messageConvJson = logEvent.message.replace(/"/g,'\"');
        messageConvJson = messageConvJson.replace(/{\\"/,'{"');
        messageConvJson = messageConvJson.replace(/\\"\s*:\s*\\"/g,'":"');
        messageConvJson = messageConvJson.replace(/\\"\s*,\s*\\"/g,'","');
        messageConvJson = messageConvJson.replace(/\\"}/,'"}');
        //console.log("messageConvJson = " + messageConvJson);
        var messageJson = JSON.parse(messageConvJson);
        //console.log("messageJson.log = " + messageJson.log);

        let [ remote_host, remote_logname, remote_user, ts, tz, method, resource, protocol, statusCode, size, referer, ...userAgent ] = messageJson.log.split(' ');

        statusCode = +statusCode;
        size = +size;
        ts = ts.substring(1);
        tz = tz.substring(0, tz.length - 1);
        const accesstime = ts + tz;
        userAgent = userAgent.join('');

        source['@remote_host'] = remote_host;
        source['@accesstime'] = accesstime;
        source['@method'] = method;
        source['@resource'] = resource;
        source['@protocol'] = protocol;
        source['@statusCode'] = statusCode;
        source['@size'] = size;
        source['@referer'] = referer;
        source['@userAgent'] = userAgent;
        console.log(source)

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}

function buildSource(message, extractedFields) {
    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                var jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    var jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}

function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}

function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}

function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });

        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;
            var error;

            if (response.statusCode >= 200 && response.statusCode < 299) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            if (response.statusCode !== 200 || info.errors === true) {
                // prevents logging of failed entries, but allows logging
                // of other errors such as access restrictions
                delete info.items;
                error = {
                    statusCode: response.statusCode,
                    responseBody: info
                };
            }

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}

function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ] .join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}

function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}

function logFailure(error, failedItems) {
    if (logFailedResponses) {
        console.log('Error: ' + JSON.stringify(error, null, 2));

        if (failedItems && failedItems.length > 0) {
            console.log("Failed Items: " +
                JSON.stringify(failedItems, null, 2));
        }
    }
}

コピペが完了したら、デプロイボタンをクリックします。
2022-01-11_16h22_02.png

最後にCloudWatch Logsコンソール上に、本Lambda関数のログストリームが作成され、エラー発生が無い事を確認します。
2022-01-11_16h16_03.png

5.OpenSearchの初期設定を実行する

OpenSearchコンソールから、OpenSearch Dashboards の URLを確認し、アクセスします。
2022-01-11_16h25_42.png

※正常にアクセスできない場合、アクセス元IPアドレスの指定が不完全な可能性が大きいです。
 ドメインの「セキュリティ設定>編集」 からアクセスポリシーを開き、ビジュアルエディタから許可したいIPアドレスを設定してください。

OpenSearchにアクセスできたら、左上のハンバーガーメニューから、[Index Management>Indices]と遷移し、「cwl-yyyy.mm.dd」という名前のインデックスが作成されている事を確認します。
2022-01-11_16h28_22.png

確認が出来たら、Index Patternsを登録します。
左上のハンバーガーメニューから、[Stack Management>Index patterns]と遷移し、「Create index pattern」をクリックします。
2022-01-11_16h48_51.png

「Index pattern name」に「cwl-*」を指定し、Next Step。
2022-01-11_16h28_51.png

「Time Field」に「@timestamp」を指定し、「Create index pattern」をクリック。
2022-01-11_16h28_59.png

46のFieldを持つインデックスパターンが作成された事を確認します。
2022-01-11_16h51_55.png

6.OpenSearchでダッシュボードを作る

OpenSearch左上のハンバーガーメニューから、[Discover]へ遷移し、CHANGE INDEX PATTERNメニューから手順5で作成した、[cwl-*]を選択し、正しく転送されたログが表示される事を確認してください。
2022-01-11_16h29_39.png

続いて左上のハンバーガーメニューから、[Visualize]へ遷移し、「Create Visualize」をクリック。
以下3パターンのグラフを作成します。
2022-01-11_16h31_19.png

■Line(単位時間あたりの応答サイズ数合計) → Name:Apache_SizeChart
2022-01-11_16h41_59.png

■Vertical Bar(UA TOP30) → Name: Apache_StatusCode
2022-01-11_16h36_31.png

■Pie(UA TOP30) → Name:Apache_UA
2022-01-11_16h37_39.png

最後に左上のハンバーガーメニューから、[DashBoard]へ遷移し、今作成したVizualizeを並べます。
2022-01-11_16h44_31.png

簡単ですが、Apacheログの可視化が完了しました。

番外編

OpenSearchで用意されているダミーデータで遊んでみる

OpenSearchダッシュボードへ最初に接続した段階でお気づきだと思いますが、OpenSearchには予め用意されているダミーデータがあります。
2022-01-11_17h04_46.png

ECサイトのオーダーデータ、フライトデータ、WEB Logの3点セットになっていて、インデックスファイルのみではなくVizualizeやDashBoardも含まれています。
2022-01-11_18h18_14.png

OpenSearchでどんなことが出来るのか?や、ダッシュボード作成のコツをつかむ事が出来ると思います。
是非活用してみてください。
セッションの中でお話した通り、OpenSearchだけを試してみるアプローチならば、後編手順の #1とこの番外編のみでも触り始める事が出来ます。

全行程終了

もし、個人アカウントで試されている方おられましたら、Cloudwatch Logsのログ課金とOpenSearchクラスタの課金は高額になる可能性がありますので、その点だけ留意ください。

今回はApache2のダミーログを生成してテストをしてみましたが、ここまでの道のりをマスターすれば、あとはデータを本番に切り替えるだけです。
本番データと似た構造のダミーデータを使うなどして検証をさらに1歩2歩進める事で、どんなアウトプットが必要なのか、そのためにどういったデータが必要か、どんなダッシュボードが必要か等、どんどん明確になってくるはずです。
また一旦このような環境を作ってしまえば、例えば以下のように他のAWSサービスを利用して、検証を進めるのもずいぶんとハードルが低くなった事と思います。
2022-01-11_15h36_48.png

データ活用とは、初めから「おむつとビール」が得られるものではなく、そのような相関に気づく為の土台を作る事こそが本質です。
このエントリが、皆様のデータ活用の推進や、ログ分析基盤を構築する一助になれば幸いです。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0