Posted at

AWS認定Big Data勉強記 - 8.2: Kinesis Data Firehose

こんにちは、えいりんぐーです

今回は Amazon Kinesis Data Firehose についてまとめます。


参考資料


全般


  • ストリーミングデータをデータストアや分析ツールにロードするサービス


    • ストリーミングデータをキャプチャして変換し、Amazon S3、Amazon Redshift、Amazon Elasticsearch Service、Splunk にロードする



  • このプロセスを管理するのに、ハードウェアとソフトウェアのプロビジョニング、デプロイ、継続的なメンテナンスについて管理したり、その他のアプリケーションを記述する必要がない

  • 同じ AWS リージョン内にある 3 つの拠点でデータが同期的に複製されるため、データ転送時の高い可用性と耐久性を提供する

  • 各配信ストリームで、毎秒 2,000 件のトランザクション、毎秒 5,000 件および毎秒 5 MB のレコードの取り込みが可能


概念


  • ソース


    • ストリーミングデータが継続的に生成され、キャプチャされるところ

    • Amazon EC2 インスタンスで実行中のロギングサーバー、モバイルデバイスで実行中のアプリケーション、IoT デバイスのセンサー、Kinesis のストリームなど



  • 配信ストリーム


    • Kinesis Data Firehose の基盤となるエンティティで、そこにデータを送信することにより、Firehose を利用できる



  • レコード


    • データ生成元からストリームに送信される処理対象のデータで、最大サイズは Base64 エンコーディング前で、1,000 KB



  • デスティネーション


    • データが配信されるデータストアで、現在サポートされているデスティネーションは、Amazon S3、Amazon Redshift、Amazon Elasticsearch Service、Splunk




使い方


配信ストリーム


  • Amazon S3 への配信前にデータを圧縮できる。GZIP、ZIP、SNAPPY の圧縮形式がサポートされている。さらにデータを Amazon Redshift にロードする場合は、GZIP のみがサポートされている。

  • Amazon S3 バケットに送信されるデータを暗号化できる。配信ストリームを作成する際に、お客様が所有する AWS Key Management Service (KMS) キーによるデータの暗号化を選択できる。

  • CloudWatch Logs のサブスクリプション機能を使用すると、CloudWatch Logs から Kinesis Data Firehose にデータをストリーミングできる。CloudWatch Logs のすべてのログイベントは、既に gzip 形式で圧縮されている。

  • AWS Lambda 関数を呼び出すことで、受信データを変換してから送信先に送信することができる


    • Lambda 関数によるデータ変換を使用する場合、Firehose では Lambda 呼び出しとデータ配信のすべてのエラーを Amazon CloudWatch Logs のログに記録できる。Lambda 呼び出しまたはデータ配信が失敗した場合、ユーザーは特定のエラーログをそこで確認できる



  • ソースレコードバックアップ


    • Lambda 関数によるデータ変換を使用する場合、ソースレコードバックアップを有効にすると、Amazon Kinesis Data Firehose で未変換の受信データは別の S3 バケットに送信される



  • バッファサイズとバッファ間隔


    • 受信ストリーミングデータを送信先に配信する前に、一定のサイズにバッファするか、一定の時間にバッファできる。バッファサイズとバッファ間隔は、配信ストリームの作成時に設定する

    • バッファサイズは MB 単位で、送信先が S3 の場合は 1 MB~128 MB で、送信先が Elasticsearch Service である場合は 1 MB~100 MB

    • バッファサイズはデータ圧縮前に適用される。

    • バッファ間隔は、60 秒から 900 秒の間で 1 秒ごとに設定できる。送信先へのデータ配信が配信ストリームへのデータ書き込みより遅くなった場合、Firehose では、すべてのデータが送信先に配信されるように、バッファサイズを動的に拡大して遅れを取り戻す



  • Redshift


    • 送信先の Redshift クラスターが VPC 内にある場合は、VPC から Firehose の IP アドレスのブロックを解除して、Kinesis Data Firehose から Redshift クラスターにアクセスできるようにする必要がある

    • Redshift を送信先として選択した場合、Kinesis Data Firehose ではまず S3 バケットにデータが送信される。その後 Redshift COPY コマンドが実行され、S3 バケットから Redshift クラスターにデータがロードされる



  • Elasticsearch Service


    • Kinesis Data Firehose では、Amazon Elasticsearch Service インデックスを一定の期間でローテーションさせることができる。


      • なんのこっちゃ



    • Amazon Elasticsearch Service にデータをロードする際に、すべてまたは配信に失敗したデータのみをバックアップでき、この機能を利用してデータの損失を防ぐために、バックアップ用の Amazon S3 バケットを用意する必要がある




データ準備と時間


  • Lambda 関数で変換されたすべてのレコードは、以下の 3 つのパラメータと共に Firehose に戻す必要がある


    • recordId: Lambda 関数の呼び出し中、recordId は各レコードと共に Firehose から Lambda 関数に渡される

    • result: 各レコードの変換結果のステータス


      • "Ok": レコードが期待どおり正しく変換された場合

      • "Dropped": ユーザーの処理ロジックによりレコードが期待どおりドロップされた場合

      • "ProcessingFailed": レコードを期待どおりに変換することができなかった場合



    • data: based64 エンコード後の変換済みデータペイロード



  • ユーザーがデータ変換用の Lambda 関数を作成するために使用できる設計図


    • 一般的な Firehose 処理: 上記のデータ変換およびステータスモデルが含まれる。この設計図はあらゆるカスタム変換ロジックに使用

    • Apache ログから JSON: Apache ログ行を解析して JSON オブジェクトに変換し、事前定義された JSON フィールド名を使用

    • Apache ログから CSV: Apache ログ行を解析して CSV 形式に変換

    • Syslog から JSON: Syslog 行を解析して JSON オブジェクトに変換し、事前定義された JSON フィールド名を使用

    • Syslog から CSV: Syslog 行を解析して CSV 形式に変換




データの追加


  • 配信ストリームにデータを追加するには、Amazon Kinesis Agent や Firehose の PutRecord オペレーションや PutRecordBatch オペレーションを使用

  • Kinesis Agent


    • データを収集して配信ストリームに送信する機能を実現する Java アプリケーション。Linux 上で動く



  • AWS コンソールまたは Firehose API によって配信の作成や更新を行っている場合、Kinesis ストリームを配信ストリームのソースとして設定でき、設定が完了すると、Kinesis ストリームから Firehose によって自動的にデータが読み取られ、指定された送信先にロードされる


    • Kinesis Data Firehose では、各 Kinesis シャードにつき毎秒 1 回 Kinesis Data Streams GetRecords() が呼び出される

    • 配信ストリームのソースに Kinesis データストリームが設定されている場合、Kinesis Data Firehose では LATEST 位置からデータの読み取りを開始



  • Kinesis Data Streams が Firehose 配信ストリームのソースに設定されると、Firehose の PutRecord オペレーションと PutRecordBatch オペレーションが無効になる

  • CloudWatch Logs から Firehose 配信ストリームにデータを追加するには、配信ストリームにイベントを送信する CloudWatch Logs サブスクリプションフィルターを作成する


データ配信


  • UTC 時間のプレフィックスを YYYY/MM/DD/HH 形式で追加してから、Amazon S3 にオブジェクトが送信される

  • S3 オブジェクト名の命名規則は、DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString で、DeliveryStreamVersion は 1 から始まり、配信ストリームの設定が変更されるたびに 1 ずつ増加

  • 送信先が Redshift の場合、S3 オブジェクトを Redshift クラスターにまとめてロードするために、Kinesis Data Firehose にマニフェストファイルを作成

  • 単一の配信ストリームがデータを配信できるのは単一の Amazon S3 バケットに対してのみ

  • 単一の配信ストリームがデータを配信できるのは単一の Amazon Redshift クラスターまたはテーブルのみ

  • 単一の配信ストリームがデータを配信できるのは単一の Amazon Elasticsearch Service ドメインおよびインデックスのみ


トラブルシュート


  • S3 バケットへのデータ配信が失敗した場合、Amazon Kinesis Data Firehose では、最大 24 時間 5 秒ごとにデータの配信が再試行され、24 時間の保持期間が経過しても問題が継続する場合、データは破棄される

  • Redshift クラスターへのデータ配信が失敗した場合、Amazon Kinesis Data Firehose では、最大 60 分間 5 分ごとにデータの配信が再試行される。スキップされたオブジェクトに関する情報はエラーフォルダ内のマニフェストファイルとして S3 バケットに送信されるため、手動によるバックフィルに使用できる

  • 送信先が Amazon Elasticsearch Service である場合、デリバリーストリームの作成時に 0~7200 秒の間再試行期間を指定することができる。Amazon ES ドメインへのデータ配信が失敗した場合、Amazon Kinesis Data Firehose では、指定された期間にデータ配信が再試行される

  • Lambda


    • ネットワークのタイムアウトに到達して Lambda の呼び出し制限に達したなどの理由で関数の呼び出しが失敗する場合


      • この失敗シナリオでは、Firehose はデフォルトで呼び出しを 3 回再試行し、そのあと問題のレコードのバッチをスキップします。スキップされたレコードは処理失敗レコードとして扱われる

      • 呼び出しの再試行回数は、CreateDeliveryStream API および UpdateDeliveryStream API を使用して 0 から 300 の間で設定できる

      • このタイプの失敗では、Firehose のエラーログ機能を使用して、呼び出しエラーを CloudWatch Logs に書き出すことができる



    • レコードが Lambda 関数から返されるときにレコードの変換結果が "ProcessingFailed" に設定される場合


      • これらのレコードは処理失敗レコードとして扱われる

      • Lambda 関数のログ機能を使用して、エラーログを CloudWatch Logs に書き出すことができる



    • どちらのタイプの失敗シナリオでも、処理失敗レコードは S3 バケットの processing_failed フォルダに送信される