How to Extract Large Query Results Through Cloud Object Stores - The Databricks Blogの翻訳です。
TableauやMicrosoft Power BIのようなビジネスインテリジェンス(BI)ツールは、データ転送のボトルネックとなるSQLエンドポイントに対して、多くの場合シングルスレッドでデータを取得するため、従来型のデータウェアハウスから大規模なクエリー結果を取得する際には遅くなることで有名です。データアナリストは使っているBIツールをDatabricks SQLのエンドポイントに接続し、我々のSimbarドライバーに統合されているODBC/JDBCプロトコルを通じて、テーブルのデータに対するクエリーを実行することができます。Databricksランタイム8.3とSimba ODBC 2.6.17ドライバーでリリースされたCloud Fetchを用いることで、BIツールで高速なデータ転送を実現するために、AWS S3やAzure Data Lake Storageのようなクラウドストレージ経由で並列でデータを取り出す新たな機構を活用することができます。Cloud Fetchを用いた我々の実験では、並列性によって抽出性能の10倍もの高速化を達成しています。
動機付けおよび課題
BIツールは、クエリー実行処理の詳細を隠蔽しながらも、分析アプリケーションを実行することでデータアナリストに素晴らしい可視化を提供できるため、大企業での利用が増加しています。BIツールはクエリーを実行し結果を取得するために、標準的なODBC/JDBCプロトコルを用いてSQLエンドポイントと通信を行います。Cloud Fetchの導入前は、DatabricksでもApache SparkTMを用いて同様のアプローチを採用していました。この場合、エンドツーエンドの抽出性能は通常、シングルスレッドのSQLエンドポイントからお使いのBIツールに結果を返却するまでの時間が支配的な要因となります。
Cloud Fetch以前は、図1に表現されているデータフローは比較的シンプルなものでした。BIツールは、クエリーが計算スロット上で並列で実行されるクラスターによって構成されるSQLエンドポイントに接続します。クエリーの結果は、クライアントとクラスター間の通信を調整するノードとして振る舞うSQLエンドポイント上に収集されます。SQLエンドポイントのリソースの上限を超えずに大規模データを提供するために、100MB以上のデータがローカルディスクに保存されるように、SQLエンドポイント上でdisk-spilling(ディスクへの溢れ)を有効にします。結果が収集され、時にはdisk-spillが起きた後で、リクエストしたBIツールに結果を返却することができます。サーバーはすべてのデータを一括で返却せず、代わりに複数の小さなchunkに分割します。
図1 典型的なデータウェアハウスからシングルスレッドでデータを抽出するBIのデータフローの概要
我々は、データフローを非効率的なものにするスケーラビリティーの問題と、数百MBのデータを抽出する際にSQLエンドポイントがボトルネックとなるリスクを特定しました。
- マルチテナント 限られたegress帯域は、同じSQLエンドポイントにアクセスする複数ユーザーで共有されます。同時実行ユーザーの数が増えると、それぞれのデータ抽出性能は劣化します。
- 並列性の欠如 クラスターはクエリーを並列実行しますが、エグゼキューターからクエリー結果を収集しBIツールに渡す部分はシングルスレッドとなります。クライアントは数MBのchunkを逐次取り出す間、SQLエンドポイントでの結果の格納、提供がボトルネックとなります。
Cloud Fetchのアーキテクチャ
これらの制限に対応するために、結果の読み書きの両方が並列で行われるようにデータ抽出アーキテクチャを見直しました。ハイレベルにおいては、それぞれのクエリーは利用可能な計算資源で実行する複数のタスクに分割され、これらのタスクは Azure Data Lake Storage、AWS S3、Google Cloud Storageに結果を書き込みます。SQLエンドポイントはクライアントにサイン済みのURLとしてファイルの一覧を送付し、クライアントはクラウドストレージから直接並列でデータをダウンロードすることができます。
図2 Cloud Fetchアーキテクチャによる並列データ抽出の概要
データレイアウト クエリータスクは入力データセットの個々のパーティションを処理し、Arrowでシリアライズされた結果を生成します。Apache Arrowは列指向インメモリデータ分析のデファクト標準となっており、多くのオープンソースプロジェクトで既に活用されています。それぞれのクエリータスクは、Arrowのストリーミングフォーマットで20MBのchunkでクラウドストレージにデータを書き込みます。それぞれのファイルの中は、固定数の行、バイトで構成される複数のArrowバッチとなります。アップロードされたチャンクに対してさらにLZ4圧縮をかけて、帯域の狭い環境のユーザーのデータ取得をサポートしています。
結果の収集 SQLエンドポイントはMB規模、GB規模のクエリー結果を収集するのではなく、クラウドストレージへのリンクを格納するので、メモリーフットプリントとdisk-spillingのオーバーヘッドが劇的に削減されます。我々の実験では、Cloud Fetchを用いることで、1MB以上のサイズのクエリーにおいては2倍の改善が認められました。しかし、1MBより小さい結果をクラウドストレージにアップロードする際には、無視できないレーテンシーに苦しめられる傾向がありました。このため、結果を制御し小規模なクエリー結果のレーテンシーを回避するか、結果をアップロードし大規模なクエリー結果のスループットを改善するかを選択できるハイブリッドのフェッチ機構を設計しました。SQLエンドポイントに結果を収集する際、3つのシナリオが考えられます。
- すべてのタスクがArrowバッチを返却し、トータルのサイズが1MB以下の場合。これはレーテンシーが問題となり、クラウドストレージからのデータ取得が理想的でない非常に短いクエリーのケースです。上述したシングルスレッドの機構で結果を直接クライアントに返却します。
- すべてのタスクがArrowバッチを返却し、トータルのサイズが1MB以上の場合、あるいは、タスクがArrowバッチとクラウドファイルの両方を返却する場合。この場合、タスクと同じデータレイアウトを用いて、SQLエンドポイントから残りのArrowバッチをクラウドストレージにアップロードし、結果のファイル一覧を格納します。
- すべてのタスクがクラウドファイルへのリンクを返却する。この場合、クラウドへのリンクをメモリに保存し、フェッチのリクエストに対してリンクを返却します。
フェッチリクエスト SQLエンドポイントでデータが利用できる場合、BIツールは小さなchunkをリクエストして逐次でフェッチを行います。フェッチリクエストを受けて、SQLエンドポイントは現在のオフセットに対応するファイルを取得し、一連のサイン済みURLをクライアントに返却します。これらのURLはクラウドプロバイダーを認識する必要がなく、基本的なHTTPクライアントを用いてダウンロードすることができるため、BIクライアントに対しては便利なものとなっています。BIツールは、並列で返却されたファイルをダウンロードし、中身を解凍し、Arrowバッチからそれぞれの行を抽出します。
実験結果 合計3.42GBの400万行の20カラムから構成される合成データを用いてデータ抽出の実験を行いました。Cloud Fetchを用いることで、シングルスレッドのベースラインと比較して、12倍の抽出スループットの改善を観測しました。
使い始めてみる
データ抽出のスピードアップをしたいですか?最新のODBCドライバーをダウンロードすることでCloud Fetchを活用してみてください。この機能は、AWS、Azure上のDatabricksランタイム8.3以降がデプロイされているインタラクティブDatabricksクラスター、あるいはDatabricks SQLで利用することができます。Simba ODBCドライバー2.6.17、間も無く公開されるSimba JDBCドライバー2.6.18に、Cloud Fetchの機能を組み込んでいます。