この記事はアイスタイル Advent Calendar 2022の17日目の記事です。
はじめまして、アイスタイルのデータ分析システム部でデータエンジニアをしている新卒三年目のshirakihです。
普段は、SaaS 型マーケティング支援サービス【ブランドオフィシャル】のデータ基盤の開発、保守運用に携わっています。
今回は、@grassyさんが9日目の記事で書いていた、データ基盤のクラウド移行に関連したストリーミングデータ収集システムをAWSに移行したお話をしようと思います。 構築するにあたってデータの欠損が無い構成にするために実際に行ったことをいくつかご紹介します。
ブランドオフィシャルの詳細やデータ基盤のクラウド移行について気になる方は、@grassyさんの下記記事に詳しく記載されてるので御覧ください〜
移行前の構成について
さて、どのようなシステムを移行したのかざっくりと説明します。
簡単な構成図、処理の流れは下記になります。
■処理の流れ
- RabbitMQからのメッセージをGoで構築されたConsumerが受け取る
- メッセージをKafkaへpush
- Kafka streamsの処理内でメッセージに足りない情報をAPIで補完する
- HDFS Sink ConnectorでHDFSへシンク
メッセージを受け取る→足りない情報をAPIで補完する→保存するといった流れになります。
先述のとおり、この度ブランドオフィシャルのデータ基盤をオンプレからクラウド(GCP)へ移行しました。そのため、データ収集先がHDFSからBigQueryに変更する必要があり、それに伴ってこのシステムも移行する事になりました。
AWS移行後の構成について
システム移行先にGCPではなくAWSを選択した理由ですが、社内の多くのシステムがAWSへ移行する予定(データ基盤のみGCP)で、今回関連するRabbitMQ、社内APIともにAWSに移行する想定でした。そのため、GCP上に構築するとクラウド間(AWS⇔GCP)での往復転送が必要になるので、クラウド間での転送を最小限するためにAWSを選択しました。
言語については、ブランドオフィシャルのデータ基盤のクラウド移行で選択されたのがPythonだったため、本システムでもPythonを選択しました。
■処理の流れ
- RabbitMQからのメッセージをECS・Fargateで構築されたConsumerが受け取る
- メッセージをKinesis Data Streamsが受け取りLambdaへ転送
- Lambdaの処理内でメッセージに足りない情報をAPIで補完する
- Kinesis Data Firehoseが受け取りS3へ転送する
- BigQuery Data Transfer ServiceでBigQueryへ転送する
ConsumerをECS・Fargateで構築、移行前システムでKafkaが行っていた役割をKinesis+Lambdaで構築し、最終的にS3へ転送する構成にしました。 BigQueryへの転送は日次でData Transfer Serviceを用いて転送してます。(※リトライ機構については後述します。)
本システムは最終的にブランドにデータを提供するサービスとなるので、データの品質担保は重要な課題でした。データの欠損はNGであり、重複については転送先のBigQueryで排除を行うこととし、At least once(重複は許容・欠損は許さない)を満たせるように設計・構築を進めました。
データの欠損がない構成にするために
データの欠損が起きうる箇所として下記でデータの欠損や重複が起きる可能性があります。
そこでそれぞれに対してどのような対策を行ったのか、実際に行った4点をご紹介します。
RabbitMQとFargateの通信でack応答の利用
1つめがRabbitMQとの通信の対策で、RabbitMQとFargateの通信でack応答の利用を行いました。
RabbitMQとFargate間では様々な影響により、メッセージの紛失や重複が発生する可能性があります。
そこで、RabbitMQはクライアントからack応答があるまでメッセージを削除しない仕組みを用いて、Consumerの実装を行いました。
今回の実装では、メッセージをKinesis Data Streamsへ転送ができたらack応答をRabbitMQへ返す仕組みになっています。
以下サンプルです。Consumerの実装にpika、AWSとの疎通にboto3を使用しています。
# boto3を用いてData Streamsへ転送を行い、HTTPStatusCodeを返す
status = datastreams_handler(json.loads(body.decode()), self._queue_name)
if status == 200:
# Kinesis Data Streamsへの転送が成功すれば、Rabbit MQにack応答を返す
basic_ack(basic_deliver.delivery_tag)
else:
logger.error('Exception Data Streams: %s Error', status)
# コンシューミングの停止、再接続
# 省略
Kinesis Data Streamsへの転送が成功すれば、basic_ackによりRabbit MQへackを返すことができます。また、転送が成功しなければ、コンシューミングを停止し再試行を行う仕組みにしています。
このような実装により、Rabbit MQとFargate間でのデータ欠損をなくすことができました。
リトライ機構の構築
2つ目に、APIとの通信の対策でリトライ機構の構築を行いました。
構成図の赤枠で囲っている箇所がリトライ機構になります。
LambdaからAPI通信を行った際、レスポンスエラーやレスポンスが空で返ってくる場合などに、このリトライ機構にメッセージが流れるようになっています。
■処理の流れ
- API通信のレスポンスがエラー
- SQSを介して一時保存用S3に対象メッセージを転送
- リトライ用Lambdaを1時間単位で起動させ、再度API通信を行う
- 成功時:Kinesis Data Firehoseへ転送
- 失敗時:一時保存用S3に対象メッセージを転送(最大3回)
これにより、エラーが起きた時点ではAPIとの疎通ができない場合でも、最大3回までリトライを行うようにすることで、API通信において欠損が限りなく少なくなります。
Kinesis Data Streams、Kinesis Data Firehose、SQSの導入
3つ目にAWSサービス同士の通信の対策として、Kinesis Data Streams、Kinesis Data Firehose、SQSの導入をおこないました。
FargateからLambda、LambdaからS3など直接データを転送せず、At least onceを担保しているKinesis Data Streams、Kinesis Data Firehose、SQSを介す事によって、データの欠損がなく、再試行がしやすい構成にすることができます。
Lambdaでエラーが起きた際のハンドリング
4つ目にLambdaでのエラーハンドリングについてです。
Lambdaは15分で処理がタイムアウトするため、タイムアウトが起きてしまった際のハンドリングを考慮する必要があり、考慮しなければ処理途中のデータなどが欠損してしまう可能性があります。
そこで、今回はタイムアウトの1分前つまり、処理が始まってから14分が経過した時点で、リトライ機構へメッセージを転送する仕組みにするようにしました。これにより、前もって処理の切断を回避でき、データ欠損を回避することができます。
また、その他の例外エラー時のハンドリングとして下記をLambdaのレスポンスに含めるようにしています。
return {
'batchItemFailures': [
{
'itemIdentifier': record['kinesis']['sequenceNumber']
}
]
}
Lambdaはバッチの成功・失敗をレスポンスで判断しています。batchItemFailuresにレコードのシーケンス番号を含めることで、バッチが失敗したと判断し、シーケンス番号をもとにLambdaの再試行を行ってくれます。
これにより、Lambdaで例外によりバッチが失敗した際にも欠損がない構成にできます。
まとめ
今回はストリーミング収集システムをAWS上に構築する上で、データ欠損をどのように防ぐのかを中心にご紹介しました。
同じようなシステムを構築する際に少しでも参考になれば幸いです!
それでは、メリークリスマス&良いお年を〜。