0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CloudwatchLogs を Kinesis Firehose 経由で S3 にログストリーム毎に保存する

Last updated at Posted at 2024-09-02

あらすじ

前回こちらの記事で、CloudwatchLogs を Kinesis Firehose 経由で S3 に保存する構成を CDK で構築した。

今回は、CDK ではありませんが次に説明することをやってみたいと思います。

やってみたいこと

S3 バケットに保存される際、S3 のプレフィックスやオブジェクト名に<ログストリーム名>を付与したい。

理由としては、S3 にはロググループ毎では保管できるが、どの<ログストリーム名>のものかわからないためである。

プレフィックスの場合

s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/

s3://<S3バケット名>/<ロググループ名>/<ログストリーム名>/YYYY/MM/DD/hh/に保存できるようする

オブジェクト名の場合

s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/<自動で作成されるオブジェクト名>

s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/<ログストリーム名+作成されるオブジェクト名>に保存できるようする

先に結論

動的パーティショニング を応用して実現してみます。

動かしてみた方が早いので、ここでは細かい説明は行いません。

前提

以下の通り、1 つのロググループに 001002 の EC2 インスタンスをログストリーム毎に分けて配信可能済みであること。

スクリーンショット 2024-09-02 18.51.26.png

必要な設定

必要な設定は 大まかに分けて 4 つです。

1. Lambda 関数の準備

次の通り Lambda を準備します。

import base64
import json
import gzip
from io import BytesIO

def lambda_handler(event, context):
    output = []
    
    for record in event['records']:
        # 解凍
        compressed_payload = base64.b64decode(record['data'])
        with gzip.GzipFile(fileobj=BytesIO(compressed_payload)) as gzipfile:
            payload = gzipfile.read().decode('utf-8')
        
        log_data = json.loads(payload)
        
        # ロググループ名とログストリーム名を取得
        logGroup = log_data['logGroup']
        logStream = log_data['logStream']

        # partitionKeys としてロググループ名とログストリーム名を作成
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': record['data'],
            'metadata': {
                'partitionKeys': {
                    'log_Group': logGroup,                    
                    'log_Stream': logStream
                }
            }
        }
        
        output.append(output_record)
    
    return {'records': output}

2. Kinesis Firehose で Lambda を指定

Kinesis Firehose で先程作成した Lambda を指定してあげます。

スクリーンショット 2024-09-02 20.13.38.png

3. Kinesis Firehose IAM ロール

Kinesis Firehose の IAM ロールに Lamba へのアクセス許可を追加してあげます。

  • "lambda:InvokeFunction",
  • "lambda:GetFunctionConfiguration"

4. 動的パーティショニング

ここでは、「動的パーティショニング」を有効にするのと S3 バケットプレフィックスで、
!{partitionKeyFromLambda:log_Group}/!{partitionKeyFromLambda:log_Stream}/!{timestamp:yyyy/MM/dd/HH}
とするだけです。

スクリーンショット 2024-09-02 20.05.12.png

partitionKeyFromLambdaを用いて、Lambda で作成した log_Grouplog_Stream を value としています。詳しくは以下のドキュメントを参考にして下さい。

なお、確か「動的パーティショニング」は後から有効にできなかったはずですので、注意が必要です。

動作確認(プレフィックスで分ける場合)

s3://<S3バケット名>/<ロググループ名>/<ログストリーム名>/YYYY/MM/DD/hh/
のようにログストリーム名毎に分けることができたことを確認できたと思います。

001 側

スクリーンショット 2024-09-02 20.21.43.png

{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474848241227510895903041114565891810464664165267865600","timestamp":1725271864178,"message":"10.0.11.231 - - [02/Sep/2024:10:11:03 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474848280365318719324284727961077378962538608263495681","timestamp":1725271865933,"message":"10.0.11.231 - - [02/Sep/2024:10:11:04 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474848388256323989815439494678087929911863029822128128","timestamp":1725271870771,"message":"10.0.11.231 - - [02/Sep/2024:10:11:05 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474848476032057091231972179762675051055813917361045505","timestamp":1725271874707,"message":"10.0.11.231 - - [02/Sep/2024:10:11:13 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474848487204730435695814373672069905652643031857233922","timestamp":1725271875208,"message":"10.0.11.231 - - [02/Sep/2024:10:11:14 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}

002 側

スクリーンショット 2024-09-02 20.28.37.png

{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474848643889766200571972579962037854602605850348748800","timestamp":1725271882234,"message":"172.31.41.216 - - [02/Sep/2024:10:11:17 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474850963167266847756779425402733212395435833158205440","timestamp":1725271986234,"message":"172.31.41.216 - - [02/Sep/2024:10:13:01 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"-\""}]}

一部設定変更

今度はオブジェクト名にログストリーム名が付与できることを確認したいので、一部設定変更します。

動的パーティショニングのところで、S3 バケットプレフィックスを、
!{partitionKeyFromLambda:log_Group}/!{timestamp:yyyy/MM/dd/HH}/!{partitionKeyFromLambda:log_Stream}_
にします。(最後は / を入れないでください)

スクリーンショット 2024-09-02 20.36.48.png

動作確認 2 (オブジェクト名)

s3://<S3バケット名>/<ロググループ名>YYYY/MM/DD/hh/<ログストリーム名+作成されるオブジェクト名>
で保存されたことが確認できたと思います。

スクリーンショット 2024-09-02 20.47.19.png

001 側

{"messageType":"DATA_MESSAGE","owner":"1234567898012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474961460728237633567838194269239943259741304803688448","timestamp":1725276941116,"message":"10.0.11.231 - - [02/Sep/2024:11:35:40 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474961483118185812892583828371101088998696256808026113","timestamp":1725276942120,"message":"10.0.11.231 - - [02/Sep/2024:11:35:41 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"1234567898012","logGroup":"access_log/A001","logStream":"001_i-02045a1975e488eaa","subscriptionFilters":["test"],"logEvents":[{"id":"38474961586861252476457042689602693140315831526028279808","timestamp":1725276946772,"message":"10.0.11.231 - - [02/Sep/2024:11:35:41 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}

002 側

{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474961557758779992374579488346432636298668496297066496","timestamp":1725276945467,"message":"172.31.41.216 - - [02/Sep/2024:11:35:44 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474961574528940381669608090781292777330236348794339329","timestamp":1725276946219,"message":"172.31.41.216 - - [02/Sep/2024:11:35:45 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""},{"id":"38474961664066432353770060004047201642013407795305709570","timestamp":1725276950234,"message":"172.31.41.216 - - [02/Sep/2024:11:35:46 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"curl/8.5.0\""}]}
{"messageType":"DATA_MESSAGE","owner":"123456789012","logGroup":"access_log/A001","logStream":"002_i-0a3943350e034d12d","subscriptionFilters":["test"],"logEvents":[{"id":"38474962110059035579183992241553844889317396301754597376","timestamp":1725276970233,"message":"185.16.39.118 - - [02/Sep/2024:11:36:05 +0000] \"GET / HTTP/1.1\" 200 727 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.85 Safari/537.36 Edg/90.0.818.46\""}]}

個人的に気になる点

  • 動的パーティショニングのコストが気になる
  • レコード単位で処理されるため、ログの量が膨大な場合は、Lambda が忙しくなる
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?