1
1
記事投稿キャンペーン 「2024年!初アウトプットをしよう」

Firehoseのレコード出力の設定をCDKで構築する(改行の区切り文字/動的パーティショニング)

Last updated at Posted at 2024-01-26

背景

Firehoseでは、1行1レコードとして出力したり、Lambdaで1レコードごとに処理するためには、追加での設定が必要です。
画面からだと簡単に設定できますが、CloudFormationおよびCDKでは、設定方法が直感的とは言い難いです。

この記事では、CDKのみを使用して、こちらの画像の状態にする方法を紹介します。

スクリーンショット 2024-01-18 8.52.52.png

設定項目解説

New line delimiter/改行の区切り文字

設定することで、各レコードが1行ごとに出力されます。

{"id":1}
{"id":2}

JSONL形式になるため、後続の処理でS3から取り出した時に処理しやすくなります。

設定しない場合に困ること

これを設定しないと、いわゆるJSONL形式ではなく、全てのデータが1行として出力されます。

{"id":1}{"id":2}

Multi record deaggregation/マルチレコードのディスアグリゲーション

こちらは入力データに対する処理です。
Firehoseでは、S3など宛先に出力する前にLambdaで追加処理を行えます。
しかし、特に設定しない場合、Lambdaには以下のようなデータが渡される場合があります。

{"id":1}{"id":2}

設定しない場合に困ること

このデータ形式は、二つの点で扱いづらいです。

  • Lambdaでパース処理を書く必要がある
    入力されたデータは、JSONでもJSONLでもないため、}{を見つけて配列に分割するなどの追加処理が必要になります。
    単純に手間ですし、入力値が信頼できない場合は、データ分割攻撃を警戒しなければなりません。
  • レコードごとに動的パーティショニングを行えない
    FirehoseでLambdaを使う動機の一つに、入力データを参照して動的パーティショニングを行いたいというものがあります。
    複数レコードがまとめられてしまうと、意図しないパーティションにレコードが割り振られる可能性があります。

実際にそのようなデータが送られてくるサービスとして、CloudWatch Metricsがあります。
deaggregationを設定せずにLambda関数でデータを取り出すと、次のような形式になっていて、JSONとして直接扱えません。

{"metric_stream_name":"QuickFull-C2cPk2","account_id":"111111111111","region":"ap-northeast-1","namespace":"CloudWatchSynthetics","metric_name":"SuccessPercent","dimensions":{"CanaryName":"test"},"timestamp":1705539300000,"value":{"max":0.0,"min":0.0,"sum":0.0,"count":1.0},"unit":"Percent"}{"metric_stream_name":"QuickFull-C2cPk2","account_id":"111111111111","region":"ap-northeast-1","namespace":"CloudWatchSynthetics","metric_name":"Duration","dimensions":{"CanaryName":"test"},"timestamp":1705539300000,"value":{"max":32102.0,"min":32102.0,"sum":32102.0,"count":1.0},"unit":"Milliseconds"}

余談ですが、Metric stream機能を画面から操作すると、宛先にS3を指定することができます。
実際には直接S3に出力することはできず、内部ではFirehoseを使用しています。

結論

Processorを定義することで設定可能です。

ドキュメント

  • RecordDeAggregation
    マルチレコードのディスアグリゲーション項目を設定します。
  • Lambda
    Lambda関数のARNを指定するために使用します。
  • AppendDelimiterToRecord
    改行の区切り文字項目を設定します。

ソースコード全体

import { Construct } from 'constructs'
import { aws_iam as iam, aws_kinesisfirehose, aws_lambda, aws_s3 } from 'aws-cdk-lib'

export class Firehose extends Construct {
    constructor(scope: Construct, id: string, bucket: aws_s3.Bucket, lambda: aws_lambda.Function) {
        super(scope, id)

        const prefix = "AWSLogs/"
        const errorOutputPrefix = 'errors/'

        const role = new iam.Role(this, 'Role', {
            assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
        })
        bucket.grantPut(role, `${prefix}*`)
        bucket.grantPut(role, `${errorOutputPrefix}*`)
        lambda.grantInvoke(role)

        new aws_kinesisfirehose.CfnDeliveryStream(this, 'Default', {
            extendedS3DestinationConfiguration: {
                bucketArn: bucket.bucketArn,
                roleArn: role.roleArn,
                prefix: prefix + '!{partitionKeyFromLambda:key}/',
                errorOutputPrefix,
                dynamicPartitioningConfiguration: {
                    enabled: true,
                },
                processingConfiguration: {
                    enabled: true,
                    processors: [
                        {
                            type: 'RecordDeAggregation',
                            parameters: [
                                {
                                    parameterName: 'SubRecordType',
                                    parameterValue: 'JSON',
                                },
                            ],
                        },
                        {
                            type: 'Lambda',
                            parameters: [
                                {
                                    parameterName: 'LambdaArn',
                                    parameterValue: lambda.functionArn,
                                },
                            ],
                        },
                        {
                            type: 'AppendDelimiterToRecord',
                            parameters: [
                                {
                                    parameterName: 'Delimiter',
                                    parameterValue: '\\n',
                                },
                            ],
                        },
                    ],
                },
            },
        })
    }
}

解説

New line delimiter/改行の区切り文字

Processor内で指定します。
parameterValueに指定する改行コードは、エスケープが二つなことに注意してください。
一つだとエラーになります。

{
    type: 'AppendDelimiterToRecord',
    parameters: [
        {
            parameterName: 'Delimiter',
            parameterValue: '\\n',
        },
    ],
}

Multi record deaggregation/マルチレコードのディスアグリゲーション

同じくProcessor内でしています。
ドキュメントが発見できなかったのですが、parameterValueにはJSONもしくはDELIMITEDが指定可能です。

{
    type: 'RecordDeAggregation',
    parameters: [
        {
            parameterName: 'SubRecordType',
            parameterValue: 'JSON',
        },
    ],
}

Lambdaサンプルコード

上記設定を踏まえた、Lambdaのサンプルコードはこちらです。
CloudWatch Metricsを処理するものとなっています。

import base64
import json


def handler(event, context):
    result = []
    for record in event['records']:
        encoded_data = record['data']
        payload = base64.b64decode(encoded_data)

        json_value = json.loads(payload)
        namespace = json_value["namespace"]
        
        record_id = record["recordId"]

        result.append({
            "recordId": record_id,
            "result": "Ok",
            "metadata": {
                "partitionKeys": {
                    "key": namespace
                }
            },
            "data": encoded_data
        })

    return {
        "records": result
    }

まとめ

Lambdaでのデータ変換を伴うFirehoseを、CDKで構築する際に設定すると便利な項目を紹介しました。
画面上で構築するよりもわかりづらい部分がありますので、この記事が参考になれば幸いです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1