背景
Firehoseでは、1行1レコードとして出力したり、Lambdaで1レコードごとに処理するためには、追加での設定が必要です。
画面からだと簡単に設定できますが、CloudFormationおよびCDKでは、設定方法が直感的とは言い難いです。
この記事では、CDKのみを使用して、こちらの画像の状態にする方法を紹介します。
設定項目解説
New line delimiter/改行の区切り文字
設定することで、各レコードが1行ごとに出力されます。
{"id":1}
{"id":2}
JSONL形式になるため、後続の処理でS3から取り出した時に処理しやすくなります。
設定しない場合に困ること
これを設定しないと、いわゆるJSONL形式ではなく、全てのデータが1行として出力されます。
{"id":1}{"id":2}
Multi record deaggregation/マルチレコードのディスアグリゲーション
こちらは入力データに対する処理です。
Firehoseでは、S3など宛先に出力する前にLambdaで追加処理を行えます。
しかし、特に設定しない場合、Lambdaには以下のようなデータが渡される場合があります。
{"id":1}{"id":2}
設定しない場合に困ること
このデータ形式は、二つの点で扱いづらいです。
- Lambdaでパース処理を書く必要がある
入力されたデータは、JSONでもJSONLでもないため、}{
を見つけて配列に分割するなどの追加処理が必要になります。
単純に手間ですし、入力値が信頼できない場合は、データ分割攻撃を警戒しなければなりません。 - レコードごとに動的パーティショニングを行えない
FirehoseでLambdaを使う動機の一つに、入力データを参照して動的パーティショニングを行いたいというものがあります。
複数レコードがまとめられてしまうと、意図しないパーティションにレコードが割り振られる可能性があります。
実際にそのようなデータが送られてくるサービスとして、CloudWatch Metricsがあります。
deaggregationを設定せずにLambda関数でデータを取り出すと、次のような形式になっていて、JSONとして直接扱えません。
{"metric_stream_name":"QuickFull-C2cPk2","account_id":"111111111111","region":"ap-northeast-1","namespace":"CloudWatchSynthetics","metric_name":"SuccessPercent","dimensions":{"CanaryName":"test"},"timestamp":1705539300000,"value":{"max":0.0,"min":0.0,"sum":0.0,"count":1.0},"unit":"Percent"}{"metric_stream_name":"QuickFull-C2cPk2","account_id":"111111111111","region":"ap-northeast-1","namespace":"CloudWatchSynthetics","metric_name":"Duration","dimensions":{"CanaryName":"test"},"timestamp":1705539300000,"value":{"max":32102.0,"min":32102.0,"sum":32102.0,"count":1.0},"unit":"Milliseconds"}
余談ですが、Metric stream機能を画面から操作すると、宛先にS3を指定することができます。
実際には直接S3に出力することはできず、内部ではFirehoseを使用しています。
結論
Processor
を定義することで設定可能です。
ドキュメント
- RecordDeAggregation
マルチレコードのディスアグリゲーション
項目を設定します。 - Lambda
Lambda関数のARNを指定するために使用します。 - AppendDelimiterToRecord
改行の区切り文字
項目を設定します。
ソースコード全体
import { Construct } from 'constructs'
import { aws_iam as iam, aws_kinesisfirehose, aws_lambda, aws_s3 } from 'aws-cdk-lib'
export class Firehose extends Construct {
constructor(scope: Construct, id: string, bucket: aws_s3.Bucket, lambda: aws_lambda.Function) {
super(scope, id)
const prefix = "AWSLogs/"
const errorOutputPrefix = 'errors/'
const role = new iam.Role(this, 'Role', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
})
bucket.grantPut(role, `${prefix}*`)
bucket.grantPut(role, `${errorOutputPrefix}*`)
lambda.grantInvoke(role)
new aws_kinesisfirehose.CfnDeliveryStream(this, 'Default', {
extendedS3DestinationConfiguration: {
bucketArn: bucket.bucketArn,
roleArn: role.roleArn,
prefix: prefix + '!{partitionKeyFromLambda:key}/',
errorOutputPrefix,
dynamicPartitioningConfiguration: {
enabled: true,
},
processingConfiguration: {
enabled: true,
processors: [
{
type: 'RecordDeAggregation',
parameters: [
{
parameterName: 'SubRecordType',
parameterValue: 'JSON',
},
],
},
{
type: 'Lambda',
parameters: [
{
parameterName: 'LambdaArn',
parameterValue: lambda.functionArn,
},
],
},
{
type: 'AppendDelimiterToRecord',
parameters: [
{
parameterName: 'Delimiter',
parameterValue: '\\n',
},
],
},
],
},
},
})
}
}
解説
New line delimiter/改行の区切り文字
Processor内で指定します。
parameterValue
に指定する改行コードは、エスケープが二つなことに注意してください。
一つだとエラーになります。
{
type: 'AppendDelimiterToRecord',
parameters: [
{
parameterName: 'Delimiter',
parameterValue: '\\n',
},
],
}
Multi record deaggregation/マルチレコードのディスアグリゲーション
同じくProcessor
内でしています。
ドキュメントが発見できなかったのですが、parameterValue
にはJSON
もしくはDELIMITED
が指定可能です。
{
type: 'RecordDeAggregation',
parameters: [
{
parameterName: 'SubRecordType',
parameterValue: 'JSON',
},
],
}
Lambdaサンプルコード
上記設定を踏まえた、Lambdaのサンプルコードはこちらです。
CloudWatch Metricsを処理するものとなっています。
import base64
import json
def handler(event, context):
result = []
for record in event['records']:
encoded_data = record['data']
payload = base64.b64decode(encoded_data)
json_value = json.loads(payload)
namespace = json_value["namespace"]
record_id = record["recordId"]
result.append({
"recordId": record_id,
"result": "Ok",
"metadata": {
"partitionKeys": {
"key": namespace
}
},
"data": encoded_data
})
return {
"records": result
}
まとめ
Lambdaでのデータ変換を伴うFirehoseを、CDKで構築する際に設定すると便利な項目を紹介しました。
画面上で構築するよりもわかりづらい部分がありますので、この記事が参考になれば幸いです。