0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

CloudWatch Logs -> Lambda -> Firehose -> S3

Last updated at Posted at 2025-04-06

lambda role

AWSTemplateFormatVersion: "2010-09-09"
# This template only defines the Lambda invoke permission and the Lambda
# execution role; the description says so instead of naming Firehose/S3.
Description: "Lambda invoke permission and execution role for the log-processing function"

Resources:
  # Resource-based permission that lets the CloudWatch Logs service invoke
  # LogProcessingFunction, restricted to a single source log group.
  # NOTE(review): the function is referenced by name and is not defined in
  # this template, so it must already exist when this stack is deployed.
  LambdaInvokePermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      FunctionName: LogProcessingFunction
      Action: "lambda:InvokeFunction"
      Principal: "logs.amazonaws.com"
      # Only log groups owned by this account may act as the source.
      SourceAccount: !Ref "AWS::AccountId"
      # Region and account ID come from pseudo parameters so no account ID is
      # hard-coded; the log group name itself is still fixed here.
      SourceArn: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/ttt-bk-OAuthFunction-3wfxLYADV3Fl:*"

  # Execution role assumed by the Lambda service. Permissions are scoped to
  # the function's own log group and to the one Firehose delivery stream it
  # writes to, instead of Resource "*".
  # NOTE(review): assumes this role is used only by LogProcessingFunction;
  # widen the resources if other functions share it.
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: LambdaExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Creating the log group is allowed anywhere in this
              # account/region.
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
              # Writing log streams/events is limited to the function's own
              # log group.
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/LogProcessingFunction:*"
              # Firehose writes are limited to the named delivery stream.
              - Effect: Allow
                Action:
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Resource: !Sub "arn:aws:firehose:${AWS::Region}:${AWS::AccountId}:deliverystream/logprocessing-firehose-stream"

# Exports the execution role ARN under a fixed name for cross-stack import.
Outputs:
  LambdaExecutionRoleArn:
    Description: 'ARN of the Lambda Execution Role'
    Value:
      Fn::GetAtt:
        - LambdaExecutionRole
        - Arn
    Export:
      Name: 'LambdaExecutionRoleArn'

service role

AWSTemplateFormatVersion: "2010-09-09"
Description: "Kinesis Firehose to S3"

Resources:
  # SubscriptionExecutionRole:
  #   Type: AWS::IAM::Role
  #   Properties:
  #     AssumeRolePolicyDocument:
  #       Version: "2012-10-17"
  #       Statement:
  #         - Effect: Allow
  #           Principal:
  #             Service:
  #               - logs.amazonaws.com
  #           Action:
  #             - sts:AssumeRole
  #     Policies:
  #       - PolicyName: InvokeLambdaPolicy
  #         PolicyDocument:
  #           Version: "2012-10-17"
  #           Statement:
  #             - Effect: Allow
  #               Action:
  #                 - lambda:InvokeFunction
  #               Resource: "arn:aws:lambda:ap-northeast-1:123456789012:function:your-lambda-function-name"

  # Role assumed by the Firehose service to deliver objects into the log
  # bucket. The policy targets the bucket the delivery stream is configured
  # with (logprocessingfunctionttt) and includes the bucket-level actions
  # Firehose needs in addition to object writes.
  FirehoseExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - firehose.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: FirehoseAccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                # Bucket-level actions need the bucket ARN; object-level
                # actions need the "/*" ARN. No !Sub is required because
                # neither string contains a substitution variable.
                Resource:
                  - "arn:aws:s3:::logprocessingfunctionttt"
                  - "arn:aws:s3:::logprocessingfunctionttt/*"


# Exports the Firehose role ARN under a fixed name for cross-stack import.
Outputs:
  # SubscriptionExecutionRoleArn:
  #   Description: ARN of the subscription execution role
  #   Value: !GetAtt SubscriptionExecutionRole.Arn
  #   Export:
  #     Name: SubscriptionExecutionRoleArn

  FirehoseExecutionRoleArn:
    Description: "ARN of the Firehose execution role"
    Value:
      Fn::GetAtt:
        - FirehoseExecutionRole
        - Arn
    Export:
      Name: "FirehoseExecutionRoleArn"

lambda

AWSTemplateFormatVersion: '2010-09-09'
Resources:
  # Lambda function that receives CloudWatch Logs subscription events and
  # forwards every log event to the Firehose delivery stream as one
  # newline-terminated JSON document.
  LogProcessingFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: "LogProcessingFunction"
      Handler: index.lambda_handler
      Runtime: python3.9
      Role: !ImportValue LambdaExecutionRoleArn
      Code:
        # Inline code is limited to 4096 characters; keep the handler small.
        ZipFile: |
          import base64
          import gzip
          import json
          import time

          import boto3

          firehose_client = boto3.client('firehose')
          FIREHOSE_STREAM_NAME = 'logprocessing-firehose-stream'
          MAX_BATCH_RECORDS = 500  # PutRecordBatch limit per call
          MAX_ATTEMPTS = 3

          def put_batch(records):
              """Send one batch, retrying only the records Firehose rejected."""
              pending = records
              for attempt in range(MAX_ATTEMPTS):
                  response = firehose_client.put_record_batch(
                      DeliveryStreamName=FIREHOSE_STREAM_NAME,
                      Records=pending
                  )
                  print("Firehose Response:", response)
                  if response.get('FailedPutCount', 0) == 0:
                      return
                  # Responses are in request order; failures carry ErrorCode.
                  pending = [
                      rec for rec, res
                      in zip(pending, response['RequestResponses'])
                      if 'ErrorCode' in res
                  ]
                  if attempt + 1 < MAX_ATTEMPTS:
                      time.sleep(0.2 * (attempt + 1))
              raise RuntimeError(
                  "Firehose rejected %d record(s) after %d attempts"
                  % (len(pending), MAX_ATTEMPTS)
              )

          def lambda_handler(event, context):
              """Decode a CloudWatch Logs event and forward it to Firehose."""
              try:
                  # The payload is base64-encoded, gzip-compressed JSON.
                  payload = gzip.decompress(
                      base64.b64decode(event['awslogs']['data'])
                  )
                  log_data = json.loads(payload)

                  # Control messages carry no application log data.
                  if log_data.get('messageType') == 'CONTROL_MESSAGE':
                      return

                  # One newline-terminated JSON document per log event.
                  records = [
                      {
                          'Data': (json.dumps({
                              'timestamp': log_event['timestamp'],
                              'message': log_event['message'],
                              'logStream': log_data['logStream'],
                          }) + '\n').encode('utf-8')
                      }
                      for log_event in log_data['logEvents']
                  ]

                  # Chunking also skips the call when there are no records.
                  for start in range(0, len(records), MAX_BATCH_RECORDS):
                      put_batch(records[start:start + MAX_BATCH_RECORDS])

              except Exception as e:
                  print("Error:", e)
                  # Bare raise keeps the original traceback.
                  raise

stateful

AWSTemplateFormatVersion: '2010-09-09'
Resources:
  # Streams every event from the source log group to the log-processing
  # Lambda function.
  # NOTE(review): CloudWatch Logs must already be allowed to invoke the
  # function (an AWS::Lambda::Permission whose SourceArn matches this log
  # group) before this resource is created — confirm deployment order.
  SubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    Properties:
      # Name of the CloudWatch Logs log group to subscribe to.
      LogGroupName: "/aws/lambda/ttt-bk-OAuthFunction-xxx"
      # An empty pattern forwards every log event.
      FilterPattern: ""
      # ARN of the destination Lambda function, built from pseudo parameters
      # instead of a hard-coded region and account ID.
      DestinationArn: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:LogProcessingFunction"
      # RoleArn: !ImportValue SubscriptionExecutionRoleArn # import the role exported by another stack

  # Firehose delivery stream that accepts direct puts, buffers them, and
  # writes GZIP-compressed objects under the logs/ prefix of the log bucket.
  FirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      # Explicit name for the delivery stream.
      DeliveryStreamName: logprocessing-firehose-stream
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        # Referencing the bucket resource keeps the ARN in sync with the
        # bucket name and creates the dependency implicitly, so a separate
        # DependsOn is not needed.
        BucketARN: !GetAtt LogDataBucket.Arn
        RoleARN: !ImportValue FirehoseExecutionRoleArn
        Prefix: "logs/"
        # Flush when 5 MB are buffered or 60 seconds elapse, whichever
        # comes first.
        BufferingHints:
          SizeInMBs: 5
          IntervalInSeconds: 60
        CompressionFormat: GZIP


  # S3 bucket with a fixed name; the delivery stream in this template
  # targets a bucket of the same name.
  LogDataBucket:
    Type: "AWS::S3::Bucket"
    Properties:
      BucketName: "logprocessingfunctionttt"


# Exports the value of Ref on the delivery stream under a fixed name.
Outputs:
  FirehoseStreamName:
    Description: "Name of the Firehose delivery stream"
    Value:
      Ref: FirehoseDeliveryStream
    Export:
      Name: "FirehoseStreamName"

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do by signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?