AWSTemplateFormatVersion : '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Serverless Data Pipeline
Parameters:
ApiStageName:
Type: String
Default: dev
Description: "The API Gateway Stage name (e.g. dev, prod, etc.)"
FirehoseS3Prefix:
Type: String
Default: firehose/
Description: "The S3 Key prefix for Kinesis Firehose."
FirehoseCompressionFormat:
Type: String
Default: GZIP
AllowedValues: [UNCOMPRESSED, GZIP, Snappy]
Description: "Compression format used by Kinesis Firehose"
FirehoseBufferingInterval:
Type: Number
Default: 60
MinValue: 60
MaxValue: 900
Description: "How long Firehose will wait before writing a new batch into S3"
FirehoseBufferingSize:
Type: Number
Default: 10
MinValue: 1
MaxValue: 128
Description: "Maximum batch size in MB"
LambdaTimeout:
Type: Number
Default: 30
MinValue: 5
MaxValue: 300
Description: "Maximum Lambda execution time in seconds"
LambdaMemorySize:
Type: String
Default: 1024
AllowedValues: [128,192,256,320,384,448,512,576,640,704,768,832,896,960,1024,1088,1152,1216,1280,1344,1408,1472,1536]
AthenaDatabaseName:
Type: String
Default: serverless_data_pipeline
Description: "The Athena database name"
AthenaTableName:
Type: String
Default: records
Description: "The Athena table name"
ProjectId:
Type: String
Description: CodeStar projectId used to associate new resources to team members
Default: atum
Resources:
DeliveryBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub '${ProjectId}-firehose'
# StreamProcessFunction:
# Type: AWS::Serverless::Function
# Properties:
# Handler: handler.lambda_handler
# Runtime: python2.7
# Timeout: !Ref LambdaTimeout
# MemorySize: !Ref LambdaMemorySize
StreamProcessFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub '${ProjectId}-cw-subscription-filter'
CodeUri: function
Handler: lambda_function.lambda_handler
Runtime: python3.8
Policies:
- CloudWatchPutMetricPolicy: {}
PostWeightSubscriptionFilter:
Type: AWS::Logs::SubscriptionFilter
Properties:
DestinationArn: !GetAtt StreamProcessFunction.Arn
# !Join
# - ''
# - - !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:
# - - !Ref StreamProcessFunction
FilterPattern: ""
LogGroupName: !Sub /aws/lambda/${ProjectId}-post-weight
# !Join
# - ''
# - - /aws/lambda/
# - !Ref StreamProcessFunction
LambdaPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt StreamProcessFunction.Arn
Principal: logs.ap-northeast-1.amazonaws.com
SourceArn:
!Join
- ''
- - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:*:* #/aws/lambda/${ProjectId}-post-weight
# - :live
DeliveryStream:
Type: AWS::KinesisFirehose::DeliveryStream
DependsOn:
- DeliveryStreamPolicy
Properties:
DeliveryStreamType: DirectPut
ExtendedS3DestinationConfiguration:
Prefix: !Ref FirehoseS3Prefix
BucketARN: !GetAtt DeliveryBucket.Arn
BufferingHints:
IntervalInSeconds: !Ref FirehoseBufferingInterval
SizeInMBs: !Ref FirehoseBufferingSize
CompressionFormat: !Ref FirehoseCompressionFormat
#EncryptionConfiguration:
# KMSEncryptionConfig:
# AWSKMSKeyARN: ARN
RoleARN: !GetAtt DeliveryStreamRole.Arn
ProcessingConfiguration:
Enabled: true
Processors:
- Type: Lambda
Parameters:
- ParameterName: LambdaArn
ParameterValue: !GetAtt StreamProcessFunction.Arn
DeliveryStreamRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: 'sts:AssumeRole'
Condition:
StringEquals:
'sts:ExternalId': !Ref 'AWS::AccountId'
DeliveryStreamPolicy:
Type: AWS::IAM::Policy
Properties:
Roles:
- !Ref DeliveryStreamRole
PolicyName: !Sub ${ProjectId}-firehose-delivery-policy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 's3:AbortMultipartUpload'
- 's3:GetBucketLocation'
- 's3:GetObject'
- 's3:ListBucket'
- 's3:ListBucketMultipartUploads'
- 's3:PutObject'
Resource:
- !GetAtt DeliveryBucket.Arn
- !Join
- ''
- - 'arn:aws:s3:::'
- !Ref DeliveryBucket
- '*'
- Effect: Allow
Action:
- 'lambda:InvokeFunction'
- 'lambda:GetFunctionConfiguration'
Resource:
- !GetAtt StreamProcessFunction.Arn