Simplifying Data Ingestion with Auto Loader for Delta Lakeの翻訳です。
Databricksのユーザーが、様々なデータソースからDelta Lakeにデータをインクリメンタルに読み込める新機能、Auto Loader、および一連のパートナーインテグレーションをパブリックプレビュー(訳者注:現時点ではプレビューではありません)として発表できることを嬉しく思います。Auto Loaderは、クラウドストレージにデータが到着するたびに、継続的かつ効率的にデータを読み込む、Apache Sparkに最適化されたクラウドファイルソースです。パートナーインテグレーションによるデータ投入ネットワークによって、数百のデータソースを直接Delta Lakeに読み込むことが可能となります。
全てのデータを統合
企業にはサイロ化された様々なデータが数多く存在します。これは、データベース(Oracle、MySQL、Postgresなど)からアプリケーション(Salesforce、Marketo、HubSpot)まで多岐に渡ります。数多くの分析ユースケースにおいては、意味のあるレポートや予測を行うために多岐にわたるデータソースからのデータを必要とします。例えば、完全なファンネル分析レポートにおいては、hubspotにあるリードの情報からPostgresデータベースにある製品サインアップの情報まで多岐にわたる情報が必要となります。
データウェアハウスにはPython/Rの機械学習ライブラリが効率的にアクセスできないため、データウェアハウスにのみデータを集中させることはアンチパターンと言えます。分析ユースケースは、シンプルなSQLレポートの構築から、より高度な機械学習の予測まで含まれるので、様々なユースケースに対応できるようにオープンなフォーマットによる集中管理されたデータレイクを構築することが重要となります。
昨年(2019年)のDelta Lakeのオープンソース化以来、数千の企業が以前よりも高信頼、高効率なオープンフォーマットによる集中管理データレイクを構築しています。DatabricksのDelta LakeはACIDトランザクションを提供し、BIツールにおけるアドホックSQLクエリーからスケジュールされたオフライントレーニングジョブまで、様々なアクセスパターンに対応できるようデータを活用するためには重要な効率的なインデックスも提供します。このような、BIからMLのユースケースに対応し、計算資源とストレージを分離することで、オープンフォーマット、かつ、集中管理、高信頼、高効率な単一の真実となるデータ構築パターンを「レイクハウス」と呼びます。
図1. Delta Lakeにおける一般的なデータフロー。データはingestionテーブルにロードされ、以降のテーブルでクレンジングされたのち、ML、BIユースケースで活用されます。
レイクハウス構築における重要な課題は、様々なデータソースからのデータを統合することです。データジャーニーにおいては、データチームにおいて二つの一般的なシナリオが存在します:
- サードパーティのデータソースからのデータ投入: 多くの場合、HotspotやPorstgresデータベースなど様々な内部のデータソースに価値のあるユーザーデータを格納しています。それらのデータソースからデータを取り出し、Delta Lakeに格納するためには、それぞれのデータソースに特化したコネクターを開発する必要があります。
- クラウドストレージからのデータ投入: データソースからクラウドストレージにデータを投入するメカニズムは既に存在しています。クラウドストレージにデータが到着するたびに、新規のデータを特定し、さらなる処理のためにDelta Lakeに読み込む必要があります。
サードパーティデータソースからのデータ投入
内部のデータソースからデータを投入するためには、それぞれに特化したコネクターを開発する必要があります。データソースのAPIを用いて、ソーススキーマをDelta Lakeのスキーマ機能にマッピングするコネクターを構築するには多大なる時間と労力が必要となります。さらに、APIやソースのスキーマが変更するのに合わせてコネクターをメンテナンスする必要もあります。このメンテナンス問題は、データソースが追加されるたびにさらに大きなものとなります。
ユーザーの皆様が全てのデータをDelta Lakeで容易にアクセスできるようにするために、我々は一連のデータ投入製品(data ingestion product)とパートナーシップを結びました。このデータ投入製品パートナーのネットワークは、クラウドストレージのDelta Lakeに直接データを登録、格納することができるように、Databricksとネイティブのインテグレーションを構築しました。これによって、データサイエンティスト、データアナリストは、容易に様々なデータソースからのデータを活用し始められるようになります。
Azure Databricksの利用者は既に、様々なデータソースからクラウドストレージにデータを投入するために、Azure Data Factoryとのインテグレーションを活用しています。また、皆様が様々なデータソースからデータを投入できるように、新たにFivetran、Qlik、Infoworks、StreamSets、Syncsortとのパートナシップを発表できることを嬉しく思います。また、このデータ投入製品ネットワークを近いうちにInformatica、Segment、Stitchに拡大する予定です。
図2. データ登録製品パートナーと、パートナー製品からDelta Lakeにデータを取り込める人気のあるデータソースによるエコシステム
クラウドストレージからのデータ投入
クラウドのblogストレージに新たなデータが到着するたびにインクリメンタルにデータを処理し、分析に適した形に変換することは、ETLワークロードにおける一般的なワークフローです。しかし、低コストでクラウドblobストレージから継続的なデータの読み込み、および、「exactly-once」の保証、低レーテンシー、DevOps労力の最小化を達成することは困難です。
Delta Lakeにデータが格納されれば、Delta LakeのACIDトランザクションによって、データを信頼性高く読み込めるようになります。Deltaテーブルからストリームでデータを読み込むには、 新規にファイルが追加されたことを迅速に検知するテーブルのトランザクションログを活用するDeltaソース(Azure|AWS)を使用できます。
しかし、クラウドストレージに投入された生のファイルをDeltaテーブルにロードする部分に主要なボトルネックが存在しています。従来のファイルベースのストリーミングソース(Azure|AWS)は、繰り返しクラウドストレージのディレクトリの一覧を取得し、どのようなファイルがあるのかを追跡することで新たなファイルを識別します。繰り返し行われるファイル一覧取得処理によって、ディクレクトリにファイルが追加されるたびにコスト、レーテンシーは増加していきます。この問題を解決するために、データチームは以下のいずれかのワークアラウンドをとることになります。
- エンドツーエンドの高いデータレーテンシー: データは数分ごとに到着したとしても、ディレクトリのデータを一緒にまとめてスケジュールで処理を行います。日あるいは時間単位のパーティションディレクトリが一般的なテクニックとなります。これは後段のデータ利用者に対するデータ可用性のSLAを引き伸ばします。
- 手動のDevOpsアプローチ: SLAを低く保つために、新たなファイルがメッセージキューに到着したことを通知し、新たなファイルを処理できるように、クラウドの通知サービス、メッセージキューサービスを活用することもできます。このアプローチは、手動でのクラウドサービスのセットアッププロセスを必要とするだけではなく、データをロードする複数のETLジョブが必要となった際には容易に複雑なものになります。さらに、ディレクトリにある既存ファイルの再処理には、クラウド通知の設定に加え、ファイル一覧の取得、ファイル操作が必要となり、設定がさらに複雑になります。
低コスト、低レーテンシー、DevOps労力を最小化した状態で、データチームがシームレスに生データをロードできるように、Auto Loaderは上記の制限を解決する最適化されたファイルソースとなります。必要なことはソースディレクトリのパスを指定し、ストリーミングジョブをスタートするだけです。「cloudFiles」と呼ばれる新たな構造化ストリーミングソースは、入力ディレクトリにおけるファイルイベントをサブスクライブするファイル通知サービスを自動でセットアップし、新たなファイルが到着するたびに処理を行います。オプションでディレクトリ内の既存ファイルを処理することが可能です。
図3. 低いSLAでexactly-onceのデータ投入を実現するためには、複数のクラウドサービスの手動セットアップが必要となります。Auto Loaderはこれらの複雑な作業をアウトボックスで利用できるようにします。
Auto Loaderを使用するメリット
- ファイル状態管理が不要: クラウドストレージに新規ファイルが到着するたびにインクリメントに処理します。何のファイルが到着したのかに関する状態管理は不要です。
- スケーラブル: ディレクトリ内のファイルの一覧を作成することなしに、クラウドサービスとRocksDBを用いることで効率的に新規ファイルの到着を追跡します。このアプローチはディレクトリに数百万のファイルがあったとしてもスケールします。
- 使いやすい: ファイルにインクリメンタル処理に必要な通知、メッセージキューサービスを自動でセットアップします。ユーザー側でのセットアップは不要です。
Auto Loaderによるストリーミングロード
Apache Sparkで馴染みのあるload APIを活用して、ストリーミングジョブに対する最小限のコード変更で利用をスタートできます。
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/input/path")
Auto Loaderによるスケジュールバッチ読み込み
データが数時間ごとに投入されるのであれば、構造化ストリーミングのTrigger.Onceモードを用いて、スケジュールジョブでauto loaderを活用できます。
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/input/path")
df.writeStream.trigger(Trigger.Once)
.format("delta")
.start("/output/path")
Databricksのジョブスケジューラ(Azure|AWS)を用いて、上のコードを時間単位、日次で実行することで、新たなデータをインクリメントにロードすることができます。上のアプローチではデータ到着が遅延するシナリオを心配する必要はありません。
COPYコマンドによるスケジュールバッチ読み込み
宣言的文法を好むユーザーは、SQL COPYコマンドを用いることで、スケジュールベースでDelta Lakeにデータをロードすることができます。COPYコマンドは冪等性があるので、失敗した場合にも安全に再実行できます。このコマンドは前回ロードされたファイルを自動的に無視し、exactly-onceセマンティクスを保証します。これによって、データチームは容易に頑健性のあるデータパイプラインを構築できます。
コマンドの文法は以下の通りとなっています。詳細に関しては、COPYコマンドのドキュメント(Azure|AWS)を参照ください。
COPY INTO tableIdentifier
FROM { location | (SELECT identifierList FROM location) }
FILEFORMAT = { CSV | JSON | AVRO | ORC | PARQUET }
[ FILES = ( '' [ , '' ] [ , ... ] ) ]
[ PATTERN = '' ]
[ FORMAT_OPTIONS ('dataSourceReaderOption' = 'value', ...) ]
[ COPY_OPTIONS ('force' = {'false', 'true'}) ]
図4: 新機能によるDelta Lakeへのデータ投入。Auto Loaderによるストリーミングロードはexactly-onceのデータ投入を保証します。COPYコマンドによるバッチロードはリトライしても冪等性があります。
データ投入機能を使ってみる
全てのデータをデータレイクに統合することは、機械学習、ビジネスアナリティクスのユースケースが成功するために重要なことであり、多くの企業で取り組まれています。効率的なデータレイクを構築するための数千のユーザージャーニーを支援するために、Auto Loaderとパートナーインテグレーションを発表できることを嬉しく思います。現時点ではプレビュー(訳者注:現状はプレビューではありません)です。ドキュメントでは、Delta Lakeにデータをロードするために、パートナーインテグレーションの使用法(Azure|AWS)、Auto Loader(Azure|AWS)、COPYコマンド(Azure|AWS)が説明されています。