0
1

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

firehose

Last updated at Posted at 2020-12-08
# Template header: CloudFormation format version, the SAM transform that
# expands the AWS::Serverless::* resources below, and a short description.
AWSTemplateFormatVersion: "2010-09-09"
Transform: "AWS::Serverless-2016-10-31"
Description: "Serverless Data Pipeline"

# Deploy-time inputs. Every parameter has a default, so the stack can be
# created without overrides.
Parameters:
  ApiStageName:
    Type: String
    Default: dev
    Description: "The API Gateway Stage name (e.g. dev, prod, etc.)"
  FirehoseS3Prefix:
    Type: String
    Default: firehose/
    Description: "The S3 Key prefix for Kinesis Firehose."
  FirehoseCompressionFormat:
    Type: String
    Default: GZIP
    AllowedValues: [UNCOMPRESSED, GZIP, Snappy]
    Description: "Compression format used by Kinesis Firehose"
  FirehoseBufferingInterval:
    Type: Number
    Default: 60
    MinValue: 60
    MaxValue: 900
    Description: "How long Firehose will wait before writing a new batch into S3"
  FirehoseBufferingSize:
    Type: Number
    Default: 10
    MinValue: 1
    MaxValue: 128
    Description: "Maximum batch size in MB"
  LambdaTimeout:
    Type: Number
    Default: 30
    MinValue: 5
    MaxValue: 300
    Description: "Maximum Lambda execution time in seconds"
  LambdaMemorySize:
    # Declared as Number (was String) so the type matches the numeric default
    # and allowed values, consistent with LambdaTimeout.
    Type: Number
    Default: 1024
    AllowedValues: [
      128, 192, 256, 320, 384, 448, 512, 576, 640, 704, 768, 832,
      896, 960, 1024, 1088, 1152, 1216, 1280, 1344, 1408, 1472, 1536
    ]
    Description: "Lambda memory size in MB"
  AthenaDatabaseName:
    Type: String
    Default: serverless_data_pipeline
    Description: "The Athena database name"
  AthenaTableName:
    Type: String
    Default: records
    Description: "The Athena table name"
  ProjectId:
    # Used as the name prefix for the bucket, function, policy and log group
    # referenced in Resources.
    Type: String
    Description: CodeStar projectId used to associate new resources to team members
    Default: atum

Resources:
  # S3 bucket named "<ProjectId>-firehose"; DeliveryStream and
  # DeliveryStreamPolicy refer to it by ARN.
  DeliveryBucket:
    Type: "AWS::S3::Bucket"
    Properties:
      BucketName: !Sub "${ProjectId}-firehose"

  # StreamProcessFunction:
  #   Type: AWS::Serverless::Function
  #   Properties:
  #     Handler: handler.lambda_handler
  #     Runtime: python2.7
  #     Timeout: !Ref LambdaTimeout
  #     MemorySize: !Ref LambdaMemorySize

  # Lambda function packaged from the local `function` directory. It is the
  # target of PostWeightSubscriptionFilter and the processor Lambda of
  # DeliveryStream.
  StreamProcessFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub '${ProjectId}-cw-subscription-filter'
      CodeUri: function
      Handler: lambda_function.lambda_handler
      Runtime: python3.8
      # Wire in the LambdaTimeout / LambdaMemorySize parameters, which were
      # declared but never referenced, so overrides actually take effect.
      Timeout: !Ref LambdaTimeout
      MemorySize: !Ref LambdaMemorySize
      Policies:
        - CloudWatchPutMetricPolicy: {}
      # NOTE(review): if the handler calls Firehose and reads a
      # DELIVERY_STREAM_NAME environment variable, neither that variable nor a
      # Firehose permission is configured here - confirm against the handler.
  # Streams the "/aws/lambda/<ProjectId>-post-weight" log group to
  # StreamProcessFunction.
  PostWeightSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    # The invoke permission must exist before the filter is created;
    # without an explicit dependency CloudFormation may create them in
    # either order.
    DependsOn: LambdaPermission
    Properties:
      DestinationArn: !GetAtt StreamProcessFunction.Arn
      # Empty filter pattern.
      FilterPattern: ""
      LogGroupName: !Sub '/aws/lambda/${ProjectId}-post-weight'
  # Lets CloudWatch Logs invoke StreamProcessFunction.
  LambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt StreamProcessFunction.Arn
      # Derive the regional service principal from the stack's region instead
      # of hard-coding ap-northeast-1, matching the SourceArn below.
      Principal: !Sub 'logs.${AWS::Region}.amazonaws.com'
      # Wildcard source: any log group in this account and region.
      SourceArn: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*:*'

  # DirectPut Firehose delivery stream writing to DeliveryBucket, with
  # StreamProcessFunction configured as its Lambda processor.
  DeliveryStream:
    Type: "AWS::KinesisFirehose::DeliveryStream"
    # Create the role's policy first.
    DependsOn: DeliveryStreamPolicy
    Properties:
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt DeliveryBucket.Arn
        RoleARN: !GetAtt DeliveryStreamRole.Arn
        Prefix: !Ref FirehoseS3Prefix
        CompressionFormat: !Ref FirehoseCompressionFormat
        # Buffering limits come from the FirehoseBuffering* parameters.
        BufferingHints:
          SizeInMBs: !Ref FirehoseBufferingSize
          IntervalInSeconds: !Ref FirehoseBufferingInterval
        # EncryptionConfiguration:
        #   KMSEncryptionConfig:
        #     AWSKMSKeyARN: ARN
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !GetAtt StreamProcessFunction.Arn

  # IAM role that firehose.amazonaws.com may assume, restricted to requests
  # whose sts:ExternalId equals this account's ID.
  DeliveryStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        # Quoted so YAML loaders keep the policy version a string instead of
        # resolving it to a date.
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: 'sts:AssumeRole'
            Condition:
              StringEquals:
                'sts:ExternalId': !Ref 'AWS::AccountId'

  # Permissions attached to DeliveryStreamRole: S3 access to DeliveryBucket
  # and invoke/describe access to StreamProcessFunction.
  DeliveryStreamPolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles:
        - !Ref DeliveryStreamRole
      PolicyName: !Sub '${ProjectId}-firehose-delivery-policy'
      PolicyDocument:
        # Quoted so YAML loaders keep the policy version a string instead of
        # resolving it to a date.
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - 's3:AbortMultipartUpload'
              - 's3:GetBucketLocation'
              - 's3:GetObject'
              - 's3:ListBucket'
              - 's3:ListBucketMultipartUploads'
              - 's3:PutObject'
            Resource:
              # The bucket itself (bucket-level actions).
              - !GetAtt DeliveryBucket.Arn
              # Objects inside the bucket only. The former "<bucket>*" pattern
              # also matched any other bucket whose name starts with this
              # bucket's name.
              - !Sub '${DeliveryBucket.Arn}/*'
          - Effect: Allow
            Action:
              - 'lambda:InvokeFunction'
              - 'lambda:GetFunctionConfiguration'
            Resource:
              - !GetAtt StreamProcessFunction.Arn

import boto3
import gzip
import json
import logging
import os
import base64
# from StringIO import StringIO
from io import StringIO

firehose = boto3.client('firehose')


def lambda_handler(event, context):
    """Forward the log events of a CloudWatch Logs subscription to Firehose.

    Args:
        event: Invocation payload; ``event['awslogs']['data']`` must hold the
            base64-encoded, gzip-compressed JSON document whose ``logEvents``
            list carries one ``message`` per log event.
        context: Lambda context object (unused).

    Returns:
        None. Each message is sent, newline-terminated, to the delivery
        stream named by the ``DELIVERY_STREAM_NAME`` environment variable.

    Raises:
        KeyError: If the payload lacks the expected keys, or if
            ``DELIVERY_STREAM_NAME`` is unset when a batch is sent.
    """
    # Flush threshold: a batch is sent as soon as it holds this many records.
    max_batch_size = 500

    def flush(batch):
        # Send one batch to the configured delivery stream.
        firehose.put_record_batch(
            DeliveryStreamName=os.environ['DELIVERY_STREAM_NAME'],
            Records=batch
        )

    print(event)

    # Undo the base64 + gzip wrapping to get at the JSON document.
    compressed = base64.b64decode(event['awslogs']['data'])
    json_data = json.loads(gzip.decompress(compressed))
    print(json_data)

    records = []

    # Iterate the decoded payload (the loop previously read an undefined
    # name and raised NameError on every invocation).
    for log_event in json_data['logEvents']:
        records.append({'Data': str(log_event['message']) + "\n"})

        if len(records) >= max_batch_size:
            flush(records)
            records = []

    # Send whatever is left over after the last full batch.
    if records:
        flush(records)
# Variant that subscribes the log group directly to the Firehose delivery
# stream. The three logical IDs share one indentation level so the snippet
# parses as a single mapping (the first key had lost its indent).
  PostWeightSubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    # The role's policy must be attached before the filter is created;
    # RoleArn alone only orders the filter after the role itself.
    DependsOn: CWLPolicy
    Properties:
      DestinationArn: !GetAtt DeliveryStream.Arn
      # Empty filter pattern.
      FilterPattern: ""
      LogGroupName: !Sub '/aws/lambda/${ProjectId}-post-weight'
      RoleArn: !GetAtt CWLRole.Arn

  # Role that CloudWatch Logs assumes to write into the delivery stream.
  CWLRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        # Quoted so YAML loaders keep the policy version a string instead of
        # resolving it to a date.
        Version: '2012-10-17'
        Statement:
          - Sid: AllowCloudWatchLogsAssumeRole
            Effect: Allow
            Principal:
              # Regional service principal derived from the stack's region
              # rather than hard-coded to ap-northeast-1.
              Service: !Sub 'logs.${AWS::Region}.amazonaws.com'
            Action: 'sts:AssumeRole'

  # Grants CWLRole write access to the delivery stream only.
  CWLPolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles:
        - !Ref CWLRole
      PolicyName: !Sub '${ProjectId}-cwl-subscription-policy'
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            # Narrowed from firehose:* / kinesis:* to the record-write
            # actions; kinesis:* cannot apply to a Firehose ARN.
            Action:
              - 'firehose:PutRecord'
              - 'firehose:PutRecordBatch'
            Resource: !GetAtt DeliveryStream.Arn
0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?