CloudWatch Logsに出力されたログはJSON形式に限らず、さまざまなフォーマットが存在します。Lambda関数でログインサイトAPIを直接呼び出してログを抽出し処理する方法はありますが、ログインサイトAPIの使い方は通常のSQLと異なり、慣れていないところがあります。
代わりに、Lambda関数からCloudWatch Logsへ直接アクセスして処理するのではなく、CloudWatch LogsのログデータをS3に出力し、Athenaを使用してSQL文を実行して検索する方が使いやすいと考えます。
CloudWatch LogsからKinesis Data FirehoseもしくはKinesis Data Streamへの出力には、CloudWatch Logsのサブスクリプションフィルタの設定が必要です。Kinesis Data Streamは直接S3に連携できない仕様となり、そのため、Kinesis Data Stream → Kinesis Data Firehose → S3という流れになります。
※リアルタイムでデータ処理を行いたい場合、Kinesis Data Stream → Kinesis Data Firehose → S3の形で対応する必要があります。
■構成図
①②両方とも実現できますが、KinesisFirehoseの設定で、Sourceは違います。
①の場合、KinesisDataStream、②の場合、Direct PUTという違いがあります。
S3への出力元となるCloudWatchLogs(ロググループ)の作成については割愛とさせていただきますので、任意のロググループを利用してください。
■作成方法
★ Amazon Kinesis → データストリーム → asp-apachelog-kinesis ★
- 名前
- asp-apachelog-kinesis
- 容量モード
- プロビジョンド
- プロビジョニングされたシャード
- 1
- 書き込み容量
- 最大1 MiB/秒/1,000 レコード/秒
- 読み込み容量
- 最大2 MiB/秒
- 暗号化
- KMS のカスタマーマスターキー (CMK):alias/aws/kinesis
- データ保持
- 24 時間 (1 日)
- 拡張 (シャードレベル) メトリクス
- 無効
★ Amazon Kinesis → Data Firehose → asp-apachelog-kinesisfirehose ★
- ソースの設定
- Kinesis データストリーム(asp-apachelog-kinesis)
- レコードを変換および転換
- ※AWS Lambda でソースレコードを変換(Lambda関数asp-log-data-transform-func)
- ランタイム
- nodejs18.x
- バッファサイズ
- 1 MiB
- タイムアウト
- 1 分
- バッファ間隔
- 60 秒
- 送信先の設定
- S3 バケット(asp-apachelog-bucket)
※KinesisDataFirehoseは、S3、Lambdaへアクセスする権限が必要ですので、以下の通り作成してください。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"s3:Abort*",
"s3:DeleteObject*",
"s3:GetBucket*",
"s3:GetObject*",
"s3:List*",
"s3:PutObject",
"s3:PutObjectLegalHold",
"s3:PutObjectRetention",
"s3:PutObjectTagging",
"s3:PutObjectVersionTagging"
],
"Resource": [
"arn:aws:s3:::asp-apachelog-bucket",
"arn:aws:s3:::asp-apachelog-bucket/*"
],
"Effect": "Allow"
},
{
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:ap-northeast-1:××:log-group:monitorApacheLogGroup-stack-DeliveryStreamLogGroup9D8FA3BB-LaERqMLkDrFu:*",
"Effect": "Allow"
},
{
"Action": "lambda:InvokeFunction",
"Resource": [
"arn:aws:lambda:ap-northeast-1:000000000000:function:asp-log-data-transform-func",
"arn:aws:lambda:ap-northeast-1:000000000000:function:asp-log-data-transform-func:*"
],
"Effect": "Allow"
}
]
}
■Lambda(asp-log-data-transform-func)
今回はEC2から吐き出したApache access logsを処理するため、Lambdaを作成しました。
var import_buffer = require("buffer");
var zlib = __toESM(require("zlib"));
console.log("Loading function");
var parser = /(.+) - - (\[.+\]) (\".+\") (.+) (.+) (\".+\") (\".+\")$/;
exports.handler = (event, context, callback) => {
let success = 0;
let failure = 0;
const output = event.records.map((record) => {
const entrys = import_buffer.Buffer.from(record.data, "base64");
const entry = zlib.gunzipSync(entrys).toString("utf-8");
const logDataObject = JSON.parse(entry);
console.log(logDataObject.logEvents[0].message);
const match = parser.exec(logDataObject.logEvents[0].message);
console.log(match);
if (match) {
const result = `${match[1]},${match[2]},${match[3]},${match[4]},${match[5]},${match[6]},${match[7]}`;
const payload = import_buffer.Buffer.from(result, "utf8").toString("base64");
success++;
return {
recordId: record.recordId,
result: "Ok",
data: payload
};
} else {
failure++;
return {
recordId: record.recordId,
result: "ProcessingFailed",
data: record.data
};
}
});
console.log(`Processing completed. Successful records ${success}, Failed records ${failure}.`);
callback(null, {records: output});
};
CSV Formatへ変換してS3へ溜める形となります。
Lambdaの権限は、処理結果を分析するため、以下の通り、CloudWatchLogsへアクセス権限のみで良いです。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
■Athena
CREATE EXTERNAL TABLE `asp_apache_logs`(
`client_ip` string COMMENT 'from deserializer',
`request_received_time` string COMMENT 'from deserializer',
`client_request` string COMMENT 'from deserializer',
`server_status` string COMMENT 'from deserializer',
`returned_obj_size` string COMMENT 'from deserializer',
`flg` string COMMENT 'from deserializer',
`agent` string COMMENT 'from deserializer')
ROW FORMAT SERDE
'com.amazonaws.glue.serde.GrokSerDe'
WITH SERDEPROPERTIES (
'input.format'='^%{IPV4:client_ip},%{GREEDYDATA:request_received_time},%{QUOTEDSTRING:client_request},%{DATA:server_status},%{DATA: returned_obj_size},%{QUOTEDSTRING:flg},%{QUOTEDSTRING:agent}$')
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
's3://asp-apachelog-bucket/environment/data'
TBLPROPERTIES (
'transient_lastDdlTime'='1698454305')
■Athena SQL文(DML)実行イメージ
結果を監視したい場合、別途Athenaへクエリを発行するLambdaを作成すれば良いです。
■まとめ
CloudWatch Logsに出力されたログの処理について、Kinesis Data Stream → Kinesis Data Firehose → S3 → Athenaを例に挙げましたが、Kinesisの利用には料金がかかり、コスト面には十分に注意が必要です。