Simplifying Streaming Data Ingestion into Delta Lake - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
ほとんどのビジネス上の意思決定は時間にセンシティブなものであり、異なるタイプのソースからリアルタイムでデータを結びつける必要があります。適切なタイミングで適切なデータを取り込むことは、タイムリーな意思決定を実現する鍵となります。時間にセンシティブなデータソースは、IoTセンサー、ソーシャルメディア、クリックストリーム、データベースにおけるチェンジデータキャプチャなど多岐にわたっています。これらのデータからキーとなる洞察を導き出すには、まず始めにレイクハウスに取り込まれる必要があります。これらのデータのキーとなる特性は、ストリーミングとして知られる際限のない形で連続的に到着するというものです。この記事では、ストリーミングデータをどの様にレイクハウスに取り込むのかに関してフォーカスします。
ハイレベルのデータ取り込みフロー
様々なデータソースからのストリーミングデータは、レイクハウスに取り込まれる前にメッセージバスやオブジェクトストレージにステージングされます。ステージングエリアからのデータは、レイクハウスにデータを書き込むApache Sparkの構造化ストリーミング(SS)によって処理されます。ステージングの環境には大きく二つ、以下で議論するクラウドオブジェクトストレージとメッセージバスがあります。
- クラウドオブジェクトストレージは、クラウドにおけるセキュア、高信頼かつスケーラブルな永続化レイヤーです。Amazon S3、Azure ADLS/Blobストレージ、Google Cloud Storage(GCS)がクラウドで広く利用されているオブジェクトストレージです。通常、イベントはバッチにまとめられてクラウドオブジェクトストレージのファイルとして格納され、到着するたびにニアリアルタイムでこれらのファイルを取り込む必要があります。クラウドストレージからのニアリアルタイムのデータ取り込みを必要とするユースケースのサンプルとしては、通話データの記録、IoTイベントログなどがあります。
- メッセージバスシステムは、パブリッシャー/サブスクライバモデルで動作する疎結合のデータバッファーを提供します。Apache Kafka、Apache Pulsar、Azure EventHub、AWS Kinesis、GCP Pub/Subなどがオープンソース、クラウドにおけるメッセージバスシステムの例となります。メッセージバスシステムは低いパブリッシュのレーテンシーや複数のコンシューマーをサポートするための大規模なファンアウトを保証するので、リアルタイムイベントの捕捉には適しています。メッセージバスを使うアプリケーションのサンプルには、クリックストリーム、クレジットカード不正検知などが含まれます。これらのアプリケーションにおいては、後段の処理ですぐに洞察を提供できる様にリアルタイムでデータを取り込む必要があります。
これらの二つの主要なデータステージング環境からレイクハウスにストリーミングデータを取り込むハイレベルのアーキテクチャを図1に示します。
図1. Delta Lakeにストリーミングデータを取り込むハイレベルのビュー
図に示されている様に、様々なソースシステムからのデータは最初にオブジェクトストレージ、あるいはメッセージバスのステージングエリアに到着します。このデータは、メッセージバスのストリーミングコネクターあるいはオブジェクトストアのオートローダーによってレイクハウスに取り込まれます。Delta Live Tables(DLT)は、バッチデータやストリーミングデータ向けの高信頼データパイプラインを作成するためのシンプルな宣言型アプローチであり、大規模なインフラストラクチャを完全に管理します。また、Sparkの構造化ストリーミングをベースとしていますが、この記事ではカバーしません。以降のセクションでは、これらのソースからストリーミングデータを取り込む際の課題のいくつかに関して詳細を説明します。
オブジェクトストアからのデータ取り込み:Auto Loader
通常、ファイルはバッチのデータ取り込みに関係します。しかし、様々なソースからファイル形式でクラウドベースのオブジェクトストアに連続的にデータを取り込むことが多くの場合において共通的なパターンとなります。大抵は、期待されるレーテンシーが数分規模であることが許容されるニアリアルタイム処理を必要とするユースケースにおいてこのパターンが好まれます。さらに、確実に一度きりの処理(exactly once processing)、失敗した取り込みジョブの再処理、タイムトラベル、スキーマのドリフトのような非機能要件も必要となります。
クラウドオブジェクトストアからレイクハウスにロードする際の課題を説明するために、顧客体験を改善し、不正支払いを検知するために必要なリアルタイムでクレジットカードの支払いを処理するシステムを考えてみましょう。多くの場合、異なる支払いチャネルからのトランザクションは、オブジェクトストア上にファイルとしてまとめられます。更なる分析のために、これらのファイルをレイクハウスに取り込む必要があります。これらは支払いのトランザクションなので、確実に一度のみ処理を行い、失敗した際には重複なしに再処理することを保証する必要があります。AWSクラウドでこの様な処理を行う際には、以下の様に複雑なアーキテクチャが必要となります。
- Amazon SQS(Simple Queue Service)を用いて、スケーラブルな方法でAmazon S3に到着する支払いトランザクションファイルを追跡
- Amazon SQLからデータを取り出し、後段の処理をトリガーするAmazon Lambda Functions
- 制御テーブルを用いた支払いトランザクションファイルのステータスの監査
主要な課題は、オブジェクトストアに到着する大量のファイルの追跡、これらのファイルに対して一度のみの処理を保証すること、様々な支払いチャネルからの異なるスキーマを管理することです。
Auto Loaderはクラウドオブジェクトストレージに新規ファイルが到着するとインクリメンタルに処理を行い、ユーザーはカスタムアプリケーションを開発する必要がないので、ストリーミングデータの取り込みをシンプルなものにします。内部状態を保持することで、これまでに処理したファイルを追跡します。処理に失敗した場合には、最後に処理したファイルからスタートするためにこの状態情報を使用します。さらに、データを再実行、再処理する必要がある場合には、ディレクトリ内の既存ファイルを処理するオプションを提供します。Auto Loaderのメリットには以下の様なものがあります。
- 数十億のファイルを処理できる能力
- 計算リソースの最適利用を用いた非同期バックフィル
- パフォーマンスを改善するための最適化ディレクトリ一覧
- スキーマの推定とスキーマドリフトへの対応
- 自動ファイル通知サービスを活用することによるコスト効率の高いファイル通知
Auto Loaderはどの様に動作しますか?
Auto Loaderは新規ファイルを検知するための2つのモードがあります。ファイル通知とディレクトリ一覧です。
ファイル通知: Auto Loaderは入力ディレクトリからのイベントをサブスクライブする通知、キューサービスを自動でセットアップすることができます。ファイル通知モードは高性能であり、大量のファイルを格納する入力ディレクトリにスケールすることもできますが、追加のクラウドのアクセス権が必要となります。このオプションは、ファイルが語順に到着しない場合には好適なものとなり、キューや通知の明示的なセットアップを不要のものにします。このモードを有効化するには、オプションcloudFiles.useNotifications
をtrueに設定し、クラウドリソースを作成するのに必要なアクセス権を指定します。ファイル通知の詳細についてはこちらを参照ください。
ディレクトリ一覧: 新規ファイルを特定する別の方法は、Auto Loaderで設定された入力ディレクトリを一覧するというものです。ディレクトリ一覧モードを用いることで、データへのアクセス権以外の権限なしにAuto Loaderのストリームを起動することができます。Databricksランタイム9.1以降では、Auto Loaderはクラウドストレージにファイルが語順に到着しているかどうかを自動で検知し、新規ファイルを検知するのに必要なAPI呼び出しの総数を劇的に削減します。デフォルトモードでは、7回の連続したインクリメンタルなディレクトリ一覧の後に完全なディレクトリの一覧を行います。しかし、設定cloudFiles.backfillInterval
を用いることで、完全なディレクトリ一覧の頻度を調整することができます。設定cloudFiles.useIncrementalListing
を用いることで、インクリメンタルな一覧を明示的にオンオフすることができます。明示的にこの設定が有効化されると、Auto Loaderは完全な一覧をトリガーしません。ディレクトリ一覧の詳細に関しては、こちらをご覧ください。
新規ファイルが発見されると、それらのメタデータはAuto Loaderパイプラインのチェックポイントの格納場所にあるスケーラブルなキーバリューストア(RocksDB)に永続化されます。これは、これまでに処理したファイルの履歴を保持する状態情報として動作します。このパイプラインは、既存ファイルを格納するディレクトリに対するバックフィルとファイル通知を通じて発見される新規ファイルの同時処理の両方を実行します。
図3. Auto LoaderはRocksDBベースのファイル管理と構造化ストリーミングを用いることで、クラウドストレージからのデータ取り込みをシンプルにします。
メッセージバスからのデータ取り込み
ストリーミングデータは性質上、境界がありません。このデータはバッファーとして動作するメッセージバスにステージングされ、複数のプロデューサが書き込みを行い、大量のコンシューマが読み取りを行える非同期のコミュニケーション手段を提供します。メッセージバスは、不正検知、金融資産取引、ゲーミングのような低レーテンシーのユースケースで広く活用されています。著名なメッセージバスには、Apache Kafka、Apache Pulsar、Azure EventHubs、Amazon Kinesis、Google Cloud Pub/Subが含まれます。しかし、連続的なデータ取り込みは、スケーラビリティ、回復可能性、耐障害性の様な課題を突きつけます。
メッセージバスからレイクハウスへの取り込みにおいては、明示的なSpark構造化ストリーミング(SS)パイプラインがメッセージバスに対する適切なソースコネクターとレイクハウスのシンクに対するコネクターを用いてインスタンス化されます。このケースでの主要な課題はスループットと耐障害性です。
レーテンシー: 低レーテンシーを達成することが常に良いこととは限りません。適切なレーテンシーを選択することでコストを削減できますが、精度とコストはトレードオフの関係にあります。Spark構造化ストリーミングは、ストリーミングデータ処理のタイミングを定義するトリガーによって、インクリメンタルに制御することができます。Spark構造化ストリーミングジョブの低レーテンシーは、トリガーの周期を短くすることで達成することができます。レーテンシーの要件とソースにデータに到着する速度のバランスを見て、構造化ストリーミングのトリガーのインターバルを設定することをお勧めします。非常に小さいトリガー周期を設定すると、システムが不要なほど頻度で新規データの到着をチェックすることになる場合があります。
Sparkの構造化ストリーミングは3種類のトリガータイプがあります。
- デフォルト: デフォルトでは、Sparkの構造化ストリーミングは前回のバッチが完了するとすぐに次のバッチを処理します。多くのユースケースには、デフォルトのトリガーがみなさまの要件に適しています。
- 固定インターバル: 固定インターバルを用いることで、ユーザーが指定したインターバルでジョブを処理することができます。通常は、固定インターバルは特定の時間待ったのちに、大規模なマイクロバッチを実行します。
- 一回: データが固定の周期で到着するケースにおいて、一日中クラスターを起動し続けておくことがリソースの無駄になる場合があります。選択肢の一つとしてジョブをバッチモードで実行するというものがあります。しかし、バッチよりもSpark構造化ストリーミングジョブをOnceモードあるいはAvailableNowモードで実行する方がメリットがあります。これらの実行設定によって、クラスターを稼働し続ける必要はなく、定期的にクラスターを起動し、データを処理したのちにクラスターをシャットダウンすることで、劇的にコストを削減することができます。これはバッチジョブと似ていますが、処理したデータの記録、テーブルレベルの原子性を維持することによる耐障害性、処理にまたがるステートフルなオペレーションなど追加のメリットを提供します。
スループット: Spark構造化ストリーミングで高いスループットを達成するためにチューニングすることができる複数のパラメーターが存在します。トリガータイプを選択することとは別に、キーとなるパラメーターはデータ取り込みジョブの並列度です。高いスループットを達成するためには、メッセージバスのパーティションの数を増やすことができます。通常、SparkにはメッセージバスのパーティションとApache Kafkaに対するSpark構造化ストリーミングのパーティションの間に1:1のマッピングを持ちます。しかし、AWS Kinesisの場合、メッセージはメモリーにプレフェッチされるので、KinesisのシャードとSparkタスクの間には直接のマッピングは存在しません。
バッチサイズとパーティションの数をチューニングすることで高いスループットが達成される現実世界のサンプルを考えてみましょう。銀行のユースケースの一つとして、ストリーミングジョブを用いて1日を通じてリアルタイムのトランザクションが処理されているものとします。しかし、リアルタイムで受信したイベントのいくつかは不正確なものだったとします。このため、問題を修正するために1日の終わりに向けて修正バッチが実行されます。修正バッチは同じストリーミングコードで処理されますが、別のジョブインスタンスで実行されます。別のトピックを流れる連続的フローと比べて、高いスループットを達成するためにパーティションの数とバッチサイズが引き上げられました。
耐障害性: Spakr構造化ストリーミングは、マイクロバッチでジョブを実行するので、耐障害性を達成するために異なる2つの利点を提供します。
- 他のエグゼキューター上でタスクを再スケジューリングすることで、タスクは障害から効率的に復旧することができます。
- 決定論的タスクは、同じタスクを行う複数の処理実行が同じ出力を提供することで、一回のみ処理されることを保証します。
Spark構造化ストリーミングにおいては、失敗したジョブの復旧はクエリーごとのチェックポイント格納場所を用いて実施されます。チェックポイント格納場所にあるオフセットによって、失敗したポイントから正確にジョブを再開することができます。クエリーでチェックポイントの格納場所を指定するには、option("checkpointLocation", "dbfs://checkpointPath")
を使用します。
リプレイ可能なソースと冪等性のあるシンクによって、Spark構造化ストリーミングは、プロダクションレベルのアプリケーションで要件になることが多い一回のみの処理のセマンティックスを実現することができます。
まとめ
ストリーミングデータの取り込みは、レイクハウスで時間にセンシティブな意思決定を実現する最初の一歩となります。この記事では、ストリーミングデータソースを、ファイルの連続的なフローやメッセージバスサービスにカテゴリ分けしました。Auto Loaderは、Spark構造化ストリーミングを用いてファイルソースからニアリアルタイムでのデータ取り込みをシンプルにし、ファイル到着の自動検知、大規模データを処理するスケーラビリティ、スキーマ推定やコスト効率の高いデータ取り込みを提供します。メッセージバスサービスからのデータ取り込みにおいては、Spark構造化ストリーミングは、それぞれのクラウドプロバイダーで用いられるメッセージバスサービスの大部分とインテグレーションされた堅牢性のあるデータ取り込みフレームワークを実現します。大部分のプロダクションレベルのアプリケーションは、コストを最小化し、高い精度を達成するために、レーテンシーとスループットのトレードオフを必要とします。