あらすじ
前回こちらの記事で、CloudwatchLogs を Kinesis Firehose 経由で S3 に保存する構成を CDK で構築した。
今回は、CDK ではありませんが次に説明することをやってみたいと思います。
やってみたいこと
S3 バケットに保存される際、S3 のプレフィックスやオブジェクト名に<ログストリーム名>を付与したい。
理由としては、S3 にはロググループ毎では保管できるが、どの<ログストリーム名>のものかわからないためである。
プレフィックスの場合
s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/
を
↓
s3://<S3バケット名>/<ロググループ名>/<ログストリーム名>/YYYY/MM/DD/hh/
に保存できるようする
オブジェクト名の場合
s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/<自動で作成されるオブジェクト名>
を
↓
s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/<ログストリーム名+作成されるオブジェクト名>
に保存できるようする
先に結論
動的パーティショニング を応用して実現してみます。
動かしてみた方が早いので、ここでは細かい説明は行いません。
前提
以下の通り、1 つのロググループに 001
と 002
の EC2 インスタンスをログストリーム毎に分けて配信可能済みであること。
必要な設定
必要な設定は 大まかに分けて 4 つです。
1. Lambda 関数の準備
次の通り Lambda を準備します。
import base64
import json
import gzip
from io import BytesIO
def lambda_handler(event, context):
output = []
for record in event['records']:
# 解凍
compressed_payload = base64.b64decode(record['data'])
with gzip.GzipFile(fileobj=BytesIO(compressed_payload)) as gzipfile:
payload = gzipfile.read().decode('utf-8')
log_data = json.loads(payload)
# ロググループ名とログストリーム名を取得
logGroup = log_data['logGroup']
logStream = log_data['logStream']
# partitionKeys としてロググループ名とログストリーム名を作成
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': record['data'],
'metadata': {
'partitionKeys': {
'log_Group': logGroup,
'log_Stream': logStream
}
}
}
output.append(output_record)
return {'records': output}
2. Kinesis Firehose で Lambda を指定
Kinesis Firehose で先程作成した Lambda を指定してあげます。
3. Kinesis Firehose IAM ロール
Kinesis Firehose の IAM ロールに Lamba へのアクセス許可を追加してあげます。
- "lambda:InvokeFunction",
- "lambda:GetFunctionConfiguration"
4. 動的パーティショニング
ここでは、「動的パーティショニング」を有効にするのと S3 バケットプレフィックスで、
!{partitionKeyFromLambda:log_Group}/!{partitionKeyFromLambda:log_Stream}/!{timestamp:yyyy/MM/dd/HH}
とするだけです。
partitionKeyFromLambda
を用いて、Lambda で作成した log_Group
と log_Stream
を value としています。詳しくは以下のドキュメントを参考にして下さい。
なお、確か「動的パーティショニング」は後から有効にできなかったはずですので、注意が必要です。
動作確認(プレフィックスで分ける場合)
s3://<S3バケット名>/<ロググループ名>/<ログストリーム名>/YYYY/MM/DD/hh/
のようにログストリーム名毎に分けることができたことを確認できたと思います。
001 側
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474848241227510895903041114565891810464664165267865600","timestamp":1725271864178,"message":"10.0.11.231 - - [02/Sep/2024:10:11:03 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474848280365318719324284727961077378962538608263495681","timestamp":1725271865933,"message":"10.0.11.231 - - [02/Sep/2024:10:11:04 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474848388256323989815439494678087929911863029822128128","timestamp":1725271870771,"message":"10.0.11.231 - - [02/Sep/2024:10:11:05 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474848476032057091231972179762675051055813917361045505","timestamp":1725271874707,"message":"10.0.11.231 - - [02/Sep/2024:10:11:13 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474848487204730435695814373672069905652643031857233922","timestamp":1725271875208,"message":"10.0.11.231 - - [02/Sep/2024:10:11:14 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
002 側
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474848643889766200571972579962037854602605850348748800","timestamp":1725271882234,"message":"172.31.41.216 - - [02/Sep/2024:10:11:17 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474850963167266847756779425402733212395435833158205440","timestamp":1725271986234,"message":"172.31.41.216 - - [02/Sep/2024:10:13:01 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"-\""}]}
一部設定変更
今度はオブジェクト名にログストリーム名が付与できることを確認したいので、一部設定変更します。
動的パーティショニングのところで、S3 バケットプレフィックスを、
!{partitionKeyFromLambda:log_Group}/!{timestamp:yyyy/MM/dd/HH}/!{partitionKeyFromLambda:log_Stream}_
にします。(最後は / を入れないでください)
動作確認 2 (オブジェクト名)
s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/<ログストリーム名+作成されるオブジェクト名>
で保存されたことが確認できたと思います。
001 側
{"messageType":"DATA_MESSAGE","owner":"1234567898012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474961460728237633567838194269239943259741304803688448","timestamp":1725276941116,"message":"10.0.11.231 - - [02/Sep/2024:11:35:40 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474961483118185812892583828371101088998696256808026113","timestamp":1725276942120,"message":"10.0.11.231 - - [02/Sep/2024:11:35:41 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"1234567898012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474961586861252476457042689602693140315831526028279808","timestamp":1725276946772,"message":"10.0.11.231 - - [02/Sep/2024:11:35:41 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
002 側
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474961557758779992374579488346432636298668496297066496","timestamp":1725276945467,"message":"172.31.41.216 - - [02/Sep/2024:11:35:44 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474961574528940381669608090781292777330236348794339329","timestamp":1725276946219,"message":"172.31.41.216 - - [02/Sep/2024:11:35:45 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474961664066432353770060004047201642013407795305709570","timestamp":1725276950234,"message":"172.31.41.216 - - [02/Sep/2024:11:35:46 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474962110059035579183992241553844889317396301754597376","timestamp":1725276970233,"message":"185.16.39.118 - - [02/Sep/2024:11:36:05 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36 Edg/90.0.818.46\""}]}
個人的に気になる点
- 動的パーティショニングのコストが気になる
- レコード単位で処理されるため、ログの量が膨大な場合は、Lambda が忙しくなる