はじめに
チーム内で当初、ALBのアクセスログがS3のみに出力されており、「調査しずらいな」との声がちらほらあり、Cloudwatchに出力してよりログの調査がしやすくなるよう、対応しました。
しかしながら、AWS CDKかつType Scriptを使って実装している記事が調べた限りではなかなか見当たらず苦戦したので記事に残すことにしました!
僕と同じような状況で困ってる方への助けになれば幸いです!
アーキテクチャ図
調べていくと、上記の図のように、ALBのアクセスログがS3にPUTされたことをトリガーとしてlamda関数を起動し、Cloud Watchに出力するといった形をとっているパターンが多く僕もこちらの構成で行うことにしました。
いざ実装!
何か参考に慣れればと思ったものの、AWS CDKかつType Scriptを使って実装している記事が僕が調べた限りでは見当たらず、ドキュメントを読んだり、生成AIツールを駆使したりしつつ、取り組んでいくことにしました。(本当に生成AI様様でございます、、、)
実装内容
まずは呼び出し元です。
ロググループやIAMロールなどの設定は必要になると思います。
const logGroup = new LogGroup(stack, ${ID}, {
logGroupName: ${ロググループ名},
retention: RetentionDays.INFINITE
});
const lambdaRole = new Role(stack, ${ID}, {
assumedBy: new ServicePrincipal('lambda.amazonaws.com'), // Lambdaにロールを引き受けるための許可をする
roleName: ${IAMロール名}
});
logBucket.grantRead(lambdaRole);
// CloudWatchへのアクセス権限を付与
const logPolicyStatement = new PolicyStatement({
actions: ['logs:CreateLogStream', 'logs:PutLogEvents'],
resources: [logGroup.logGroupArn],
effect: Effect.ALLOW
});
lambdaRole.addToPolicy(logPolicyStatement);
// ClouWatchへの書き込み権限を付与
lambdaRole.addManagedPolicy(
ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole')
);
// Lambda関数を作成
const albLogProcessor = new NodejsFunction(stack, ${ID}, {
entry: ${Lambda関数を定義したファイルのパス},
functionName: ${Lambda関数名},
runtime: Runtime.NODEJS_22_X,
handler: 'handler',
role: ${Lambdaに付与するロール},
timeout: Duration.seconds(300),
environment: {
LOG_GROUP_NAME: ${出力先のロググループ名}
}
});
// LambdaにS3バケットへの読み取り権限を付与
${アクセスログを出力しているS3バケット}.grantRead(albLogProcessor);
// S3オブジェクトの作成をトリガーとしてLambda関数を呼び出す
albLogProcessor.addEventSource(
new S3EventSource(${アクセスログを出力しているS3バケット}, {
events: [EventType.OBJECT_CREATED]
})
);
以下がlambda関数です
import { S3Event, S3Handler } from 'aws-lambda';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import {
CloudWatchLogsClient,
CreateLogStreamCommand,
PutLogEventsCommand
} from '@aws-sdk/client-cloudwatch-logs';
const s3 = new S3Client();
const logs = new CloudWatchLogsClient();
const LOG_GROUP_NAME = process.env.LOG_GROUP_NAME || ''; // 呼び出し元のenvironmentで指定したロググループ名が入る
const MAX_LOG_PAYLOAD_SIZE = 1048576; // 1MB
const MAX_LOG_EVENTS = 10000;
async function downloadFile(bucket: string, key: string): Promise<string> {
// S3ファイルのダウンロードと解凍を行い、文字列に変換
// (略)
return ${解凍したファイル内容};
}
async function processLogEvents(logEvents: LogEvent[], logStreamName: string, objectKey: string) {
let batchCount = 1;
// 1万件ごとにループ
for (let i = 0; i < logEvents.length; i += MAX_LOG_EVENTS) {
const batchLogEvents = logEvents.slice(i, i + MAX_LOG_EVENTS);
const batchSize = batchLogEvents.reduce((acc, event) => acc + calculateEventSize(event), 0);
// サイズが1MB超えていないか判定
if (batchSize <= MAX_LOG_PAYLOAD_SIZE) {
await sendBatch(logStreamName, batchLogEvents, objectKey, batchCount);
batchCount++;
} else {
let currentBatch: LogEvent[] = [];
let currentEventSize = 0;
for (const event of batchLogEvents) {
const eventSize = calculateEventSize(event);
if (currentEventSize + eventSize > MAX_LOG_PAYLOAD_SIZE) {
await sendBatch(logStreamName, currentBatch, objectKey, batchCount);
batchCount++;
currentBatch = [];
currentEventSize = 0;
}
currentBatch.push(event);
currentEventSize += eventSize;
}
if (currentBatch.length > 0) {
await sendBatch(logStreamName, currentBatch, objectKey, batchCount);
batchCount++;
}
}
}
}
// アクセスログのサイズを計算
function calculateEventSize(logEvent: LogEvent) {
return Buffer.byteLength(logEvent.message, 'utf-8') + 26;
}
// CloudWatch Logsに送信
async function sendBatch(logStreamName: string, batchLogEvents: LogEvent[], objectKey: string) {
// ここでPutLogEventsCommandを使用。
const putLogEventsCommand = new PutLogEventsCommand({
logGroupName: LOG_GROUP_NAME,
logStreamName,
logEvents: batchLogEvents
});
try {
await logs.send(putLogEventsCommand);
console.info(`成功: ${objectKey}`);
} catch (error) {
console.error(`失敗: ${objectKey}`, error);
}
}
export const handler: S3Handler = async (event: S3Event) => {
const timestampRegex = /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/;
for (const record of event.Records) {
const bucketName = record.s3.bucket.name;
const objectKey = decodeURIComponent(record.s3.object.key.replace(/\+/g, ' '));
console.info(`ファイル処理開始: ${bucketName}, key: ${objectKey}`);
try {
const fileContent = await downloadFile(bucketName, objectKey);
const logEvents: LogEvent[] = fileContent
.split('\n')
.reduce<LogEvent[]>((acc, line) => {
// S3のログから正規表現でタイムスタンプを取得
const matchedTimeStamp = line.match(timestampRegex);
if (matchedTimeStamp) {
acc.push({
timestamp: new Date(matchedTimeStamp[0]).getTime(),
message: line
});
}
return acc;
}, []);
if (logEvents.length > 0) {
const createLogStreamCommand = new CreateLogStreamCommand({
logGroupName: LOG_GROUP_NAME,
logStreamName
});
await logs.send(createLogStreamCommand);
await processLogEvents(logEvents, logStreamName, objectKey);
} else {
console.warn(`ファイルにログがありません: ${objectKey}`);
}
} catch (error) {
console.error('ファイル処理に失敗しました', error);
}
}
};
工夫したこと
タイムスタンプ
以下の処理でタイムスタンプを詰める処理がありますが、単にnew Date()
してしまうとLambdaの実行日時が入ってしまうので、ログが出力された日時でCloudWatchに出力されるよう、正規表現でタイムスタンプを取得しました。
const logEvents: LogEvent[] = fileContent
.split('\n')
.reduce<LogEvent[]>((acc, line) => {
// S3のログから正規表現でタイムスタンプを取得
const matchedTimeStamp = line.match(timestampRegex);
if (matchedTimeStamp) {
acc.push({
timestamp: new Date(matchedTimeStamp[0]).getTime(),
message: line
});
}
return acc;
}, []);
ログを分割して処理
sendBatch
関数内にあるPutLogEventsCommand
ですが、以下のドキュメントを見るといくつか制限がある見たいです。
- 最大バッチサイズは1,048,576バイト(1MB)。
※ UTF-8のすべてのイベントメッセージの合計に、各ログイベントの26バイトを加えたものとして計算 - 最大イベント数は10000
- 14日以上過去のログはNG
etc...
調査した際に1万件、1MBを超えることは可能性として考えられたため、processLogEvents
関数では
- ログを1万件単位で切り出す
- その中で1MB以下になるようにログを分割して送る
という形でログを送るようにしました。
生成AIを駆使するとはいえど、このあたりの処理を考えるのは一筋縄ではいかなかったですし、
最初、その辺をちゃんと調べてなかったので開発の締め日直前にちょっと焦ってしまいました、、、
おわりに
記事を読んでいただきありがとうございます!
「こういう書き方したほうがいいよ」、「ここ間違えてるよ」などあればご指摘いただけるととても嬉しいです。
少しでも呼んでくださった方の役に立てる情報であれば幸いです。
参考資料