やりたい事
2022/01/13開催の SB Tech Festival で登壇した内容、「ビッグデータ活用の第一歩AWS環境での大容量ログ可視化」の詳細手順(後半)ページです。
このエントリで解説するのは以下のイメージ
前半部分のFargate+Firelens+CloudwatchLogsについてはこちらの記事を参照してください。
手順
2022/01/11時点でECSコンソールは新しいエクスペリエンスが提供されていますが、本エントリでは旧UIをベースに解説しています。
1.OpenSearchクラスターを作成する
AWSコンソールからOpenSearchダッシュボードを開き、新しいドメインを作成します。
項目名 | 設定内容 |
---|---|
ドメイン名 | <任意の名前> |
カスタムエンドポイント | チェック無し |
デプロイタイプ | 開発及びテスト |
バージョン | 1.0 ※より新しいバージョンがリリースされていたとしても、Lambdaコードの後方互換が切られる可能性があります。自身でコード改修出来ない場合は、1.0選択をお願いします |
自動調整 | 無効化 |
データノード | t3.small.search ※インスタンスサイズを大きくするのは任意ですが、課金に直結するのでご注意ください。t3.smallは無料利用枠有り |
ノードの数 | 1 |
ストレージタイプ | EBS |
EBS ボリュームタイプ | 汎用(SSD) |
ノード当たりのEBSストレージサイズ | 20GiB |
きめ細かなアクセスコントロール | チェック無し |
アクセスポリシー | ドメインレベルのアクセスポリシーの設定(ビジュアルエディタでダッシュボードへのアクセスを許可するIPv4/v6アドレスを指定します) |
暗号化 | 全てチェック |
AWS KMSキーを選択する | AWS 所有キーを使用する |
作成後、10分ほど時間をおいてデプロイが完了する事を確認してください。
(クラスターのヘルスは黄色or緑のいずれかであれば、次の手順へ進んで問題ありません。)
2.CloudWatch Logs用IAMロールを作成する
以下のJSONをコピペする。これはLambdaからCloudwatchLogs及びElasticSearchの必要な操作が出来るように権限を付与する内容。
より権限を絞ったり、リソースを指定するのがより良いですが、今回はその他リソースが無い前提で広く権限を持たせます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "logs:*",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "es:*",
"Resource": "*"
}
]
}
次の画面で、作成済みのIAM管理ポリシーを指定しLambda向けロール作成完了。
IAM管理ポリシー、IAMロールのタグや名前は任意の内容でOK。
3.CloudWatch Logsサブスクリプションフィルターを作成する
前半で作成したCloudWatch Logグループ /apache-dummy-logを選択し、図示の通りOpenSearch向けのサブスクリプションフィルターを作成
項目名 | 設定内容 |
---|---|
アカウントを選択 | This account |
Amazon OpenSearch Service cluster | <手順1で作成したOpenSearchドメインを指定> |
ログの形式 | JSON |
サブスクリプションフィルターのパターン | <未記入のまま> |
サブスクリプションフィルター名 | <任意の名前を記載> |
以下のようにサブスクリプションフィルターが作成された事を確認してください。
4.Lambdaで軽いETL処理を実装する
今回はApache2のダミーログを発生させていますが、ご存じの通りApache2のログはスペース区切りでフィールド名がありません。
OpenSearchへ投入する前に、Lambda上でパースしてフィールド名を割り当てましょう。
Lambdaコンソールを開き、関数一覧の中にLogsToElasticsearch_apachedummylog-opensearchがある事を確認し、クリックします。
手順#2で作成したIAMロールを指定し、保存をクリックします。
続いてコードタブを開き、関数の76行目以降に以下を追加します。
var messageConvJson = logEvent.message.replace(/"/g,'\"');
messageConvJson = messageConvJson.replace(/{\\"/,'{"');
messageConvJson = messageConvJson.replace(/\\"\s*:\s*\\"/g,'":"');
messageConvJson = messageConvJson.replace(/\\"\s*,\s*\\"/g,'","');
messageConvJson = messageConvJson.replace(/\\"}/,'"}');
var messageJson = JSON.parse(messageConvJson);
let [ remote_host, remote_logname, remote_user, ts, tz, method, resource, protocol, statusCode, size, referer, ...userAgent ] = messageJson.log.split(' ');
statusCode = +statusCode;
size = +size;
ts = ts.substring(1);
tz = tz.substring(0, tz.length - 1);
const accesstime = ts + tz;
userAgent = userAgent.join('');
source['@remote_host'] = remote_host;
source['@accesstime'] = accesstime;
source['@method'] = method;
source['@resource'] = resource;
source['@protocol'] = protocol;
source['@statusCode'] = statusCode;
source['@size'] = size;
source['@referer'] = referer;
source['@userAgent'] = userAgent;
CloudWatch Logsから自動デプロイされるコードに、上記を含めたコード全体も載せておきます。
こちらをコピペしていただくのが簡単です。
コード全体
// v1.1.2
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
var endpoint = 'search-apachedummylog-opensearch-amepwsh24gynwv63reszss5u4e.ap-northeast-1.es.amazonaws.com';
// Set this to true if you want to debug why data isn't making it to
// your Elasticsearch cluster. This will enable logging of failed items
// to CloudWatch Logs.
var logFailedResponses = false;
exports.handler = function(input, context) {
// decode input from base64
var zippedInput = new Buffer.from(input.awslogs.data, 'base64');
// decompress the input
zlib.gunzip(zippedInput, function(error, buffer) {
if (error) { context.fail(error); return; }
// parse the input from JSON
var awslogsData = JSON.parse(buffer.toString('utf8'));
// transform the input to Elasticsearch documents
var elasticsearchBulkData = transform(awslogsData);
// skip control messages
if (!elasticsearchBulkData) {
console.log('Received a control message');
context.succeed('Control message handled successfully');
return;
}
// post documents to the Amazon Elasticsearch Service
post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
console.log('Response: ' + JSON.stringify({
"statusCode": statusCode
}));
if (error) {
logFailure(error, failedItems);
context.fail(JSON.stringify(error));
} else {
console.log('Success: ' + JSON.stringify(success));
context.succeed('Success');
}
});
});
};
function transform(payload) {
if (payload.messageType === 'CONTROL_MESSAGE') {
return null;
}
var bulkRequestBody = '';
payload.logEvents.forEach(function(logEvent) {
var timestamp = new Date(1 * logEvent.timestamp);
// index name format: cwl-YYYY.MM.DD
var indexName = [
'cwl-' + timestamp.getUTCFullYear(), // year
('0' + (timestamp.getUTCMonth() + 1)).slice(-2), // month
('0' + timestamp.getUTCDate()).slice(-2) // day
].join('.');
var source = buildSource(logEvent.message, logEvent.extractedFields);
source['@id'] = logEvent.id;
source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
source['@message'] = logEvent.message;
source['@owner'] = payload.owner;
source['@log_group'] = payload.logGroup;
source['@log_stream'] = payload.logStream;
//console.log("logEvent.message = " + logEvent.message);
var messageConvJson = logEvent.message.replace(/"/g,'\"');
messageConvJson = messageConvJson.replace(/{\\"/,'{"');
messageConvJson = messageConvJson.replace(/\\"\s*:\s*\\"/g,'":"');
messageConvJson = messageConvJson.replace(/\\"\s*,\s*\\"/g,'","');
messageConvJson = messageConvJson.replace(/\\"}/,'"}');
//console.log("messageConvJson = " + messageConvJson);
var messageJson = JSON.parse(messageConvJson);
//console.log("messageJson.log = " + messageJson.log);
let [ remote_host, remote_logname, remote_user, ts, tz, method, resource, protocol, statusCode, size, referer, ...userAgent ] = messageJson.log.split(' ');
statusCode = +statusCode;
size = +size;
ts = ts.substring(1);
tz = tz.substring(0, tz.length - 1);
const accesstime = ts + tz;
userAgent = userAgent.join('');
source['@remote_host'] = remote_host;
source['@accesstime'] = accesstime;
source['@method'] = method;
source['@resource'] = resource;
source['@protocol'] = protocol;
source['@statusCode'] = statusCode;
source['@size'] = size;
source['@referer'] = referer;
source['@userAgent'] = userAgent;
console.log(source)
var action = { "index": {} };
action.index._index = indexName;
action.index._type = payload.logGroup;
action.index._id = logEvent.id;
bulkRequestBody += [
JSON.stringify(action),
JSON.stringify(source),
].join('\n') + '\n';
});
return bulkRequestBody;
}
function buildSource(message, extractedFields) {
if (extractedFields) {
var source = {};
for (var key in extractedFields) {
if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
var value = extractedFields[key];
if (isNumeric(value)) {
source[key] = 1 * value;
continue;
}
var jsonSubString = extractJson(value);
if (jsonSubString !== null) {
source['$' + key] = JSON.parse(jsonSubString);
}
source[key] = value;
}
}
return source;
}
var jsonSubString = extractJson(message);
if (jsonSubString !== null) {
return JSON.parse(jsonSubString);
}
return {};
}
function extractJson(message) {
var jsonStart = message.indexOf('{');
if (jsonStart < 0) return null;
var jsonSubString = message.substring(jsonStart);
return isValidJson(jsonSubString) ? jsonSubString : null;
}
function isValidJson(message) {
try {
JSON.parse(message);
} catch (e) { return false; }
return true;
}
function isNumeric(n) {
return !isNaN(parseFloat(n)) && isFinite(n);
}
function post(body, callback) {
var requestParams = buildRequest(endpoint, body);
var request = https.request(requestParams, function(response) {
var responseBody = '';
response.on('data', function(chunk) {
responseBody += chunk;
});
response.on('end', function() {
var info = JSON.parse(responseBody);
var failedItems;
var success;
var error;
if (response.statusCode >= 200 && response.statusCode < 299) {
failedItems = info.items.filter(function(x) {
return x.index.status >= 300;
});
success = {
"attemptedItems": info.items.length,
"successfulItems": info.items.length - failedItems.length,
"failedItems": failedItems.length
};
}
if (response.statusCode !== 200 || info.errors === true) {
// prevents logging of failed entries, but allows logging
// of other errors such as access restrictions
delete info.items;
error = {
statusCode: response.statusCode,
responseBody: info
};
}
callback(error, success, response.statusCode, failedItems);
});
}).on('error', function(e) {
callback(e);
});
request.end(requestParams.body);
}
function buildRequest(endpoint, body) {
var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
var region = endpointParts[2];
var service = endpointParts[3];
var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
var date = datetime.substr(0, 8);
var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
var kRegion = hmac(kDate, region);
var kService = hmac(kRegion, service);
var kSigning = hmac(kService, 'aws4_request');
var request = {
host: endpoint,
method: 'POST',
path: '/_bulk',
body: body,
headers: {
'Content-Type': 'application/json',
'Host': endpoint,
'Content-Length': Buffer.byteLength(body),
'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
'X-Amz-Date': datetime
}
};
var canonicalHeaders = Object.keys(request.headers)
.sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
.map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
.join('\n');
var signedHeaders = Object.keys(request.headers)
.map(function(k) { return k.toLowerCase(); })
.sort()
.join(';');
var canonicalString = [
request.method,
request.path, '',
canonicalHeaders, '',
signedHeaders,
hash(request.body, 'hex'),
].join('\n');
var credentialString = [ date, region, service, 'aws4_request' ].join('/');
var stringToSign = [
'AWS4-HMAC-SHA256',
datetime,
credentialString,
hash(canonicalString, 'hex')
] .join('\n');
request.headers.Authorization = [
'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
'SignedHeaders=' + signedHeaders,
'Signature=' + hmac(kSigning, stringToSign, 'hex')
].join(', ');
return request;
}
function hmac(key, str, encoding) {
return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
function hash(str, encoding) {
return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
function logFailure(error, failedItems) {
if (logFailedResponses) {
console.log('Error: ' + JSON.stringify(error, null, 2));
if (failedItems && failedItems.length > 0) {
console.log("Failed Items: " +
JSON.stringify(failedItems, null, 2));
}
}
}
最後にCloudWatch Logsコンソール上に、本Lambda関数のログストリームが作成され、エラー発生が無い事を確認します。
5.OpenSearchの初期設定を実行する
OpenSearchコンソールから、OpenSearch Dashboards の URLを確認し、アクセスします。
※正常にアクセスできない場合、アクセス元IPアドレスの指定が不完全な可能性が大きいです。
ドメインの「セキュリティ設定>編集」 からアクセスポリシーを開き、ビジュアルエディタから許可したいIPアドレスを設定してください。
OpenSearchにアクセスできたら、左上のハンバーガーメニューから、[Index Management>Indices]と遷移し、「cwl-yyyy.mm.dd」という名前のインデックスが作成されている事を確認します。
確認が出来たら、Index Patternsを登録します。
左上のハンバーガーメニューから、[Stack Management>Index patterns]と遷移し、「Create index pattern」をクリックします。
「Index pattern name」に「cwl-*」を指定し、Next Step。
「Time Field」に「@timestamp」を指定し、「Create index pattern」をクリック。
46のFieldを持つインデックスパターンが作成された事を確認します。
6.OpenSearchでダッシュボードを作る
OpenSearch左上のハンバーガーメニューから、[Discover]へ遷移し、CHANGE INDEX PATTERNメニューから手順5で作成した、[cwl-*]を選択し、正しく転送されたログが表示される事を確認してください。
続いて左上のハンバーガーメニューから、[Visualize]へ遷移し、「Create Visualize」をクリック。
以下3パターンのグラフを作成します。
■Line(単位時間あたりの応答サイズ数合計) → Name:Apache_SizeChart
■Vertical Bar(UA TOP30) → Name: Apache_StatusCode
■Pie(UA TOP30) → Name:Apache_UA
最後に左上のハンバーガーメニューから、[DashBoard]へ遷移し、今作成したVizualizeを並べます。
簡単ですが、Apacheログの可視化が完了しました。
番外編
OpenSearchで用意されているダミーデータで遊んでみる
OpenSearchダッシュボードへ最初に接続した段階でお気づきだと思いますが、OpenSearchには予め用意されているダミーデータがあります。
ECサイトのオーダーデータ、フライトデータ、WEB Logの3点セットになっていて、インデックスファイルのみではなくVizualizeやDashBoardも含まれています。
OpenSearchでどんなことが出来るのか?や、ダッシュボード作成のコツをつかむ事が出来ると思います。
是非活用してみてください。
セッションの中でお話した通り、OpenSearchだけを試してみるアプローチならば、後編手順の #1とこの番外編のみでも触り始める事が出来ます。
全行程終了
もし、個人アカウントで試されている方おられましたら、Cloudwatch Logsのログ課金とOpenSearchクラスタの課金は高額になる可能性がありますので、その点だけ留意ください。
今回はApache2のダミーログを生成してテストをしてみましたが、ここまでの道のりをマスターすれば、あとはデータを本番に切り替えるだけです。
本番データと似た構造のダミーデータを使うなどして検証をさらに1歩2歩進める事で、どんなアウトプットが必要なのか、そのためにどういったデータが必要か、どんなダッシュボードが必要か等、どんどん明確になってくるはずです。
また一旦このような環境を作ってしまえば、例えば以下のように他のAWSサービスを利用して、検証を進めるのもずいぶんとハードルが低くなった事と思います。
データ活用とは、初めから「おむつとビール」が得られるものではなく、そのような相関に気づく為の土台を作る事こそが本質です。
このエントリが、皆様のデータ活用の推進や、ログ分析基盤を構築する一助になれば幸いです。