以前こちらのマニュアルを訳していました。
ですが、動かしていなかったので、改めて作成から動作確認までをウォークスルーします。
マニュアルはこちら。
ストリーミングテーブルとは
Databricks では、Databricks SQL を使用してデータを取り込むためにストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルは、ストリーミングまたは増分データ処理の追加サポートを備えた Unity Catalog マネージドテーブルです。 Delta Live Tables パイプラインは、ストリーミング テーブルごとに自動的に作成されます。 ストリーミング テーブルを使用して、Kafka およびクラウド オブジェクト ストレージからの増分データ ロードを行うことができます。
データソース(オブジェクトストレージやメッセージングシステム)に新規データが到着した際に、増分のみを取り込むことができるテーブルです。すでにDelta Live Tablesで実装されていた機能が、Databricks SQLからも利用できるようになったという背景があります。
ストリーミングテーブルの作成
ストリーミングテーブルを作成する際にはデータソースを指定する必要がありますが、オブジェクトストレージを指定する際にはデフォルトではオブジェクトストレージにファイルが存在している必要があります。
ですので、以下のようなファイルをボリュームにアップロードしておきます。ここでのパスは、/Volumes/users/takaaki_yayoi/landing
となります。
id,name,age
1,taka,51
2,polka,8
スキーマ推定の際に数値がintに識別されるように、カンマの前後に空白を含めないようにします。
SQLノートブックあるいはSQLエディタで以下を実行します。
USE users.takaaki_yayoi;
CREATE
OR REFRESH STREAMING TABLE streaming_test AS
SELECT
*
FROM
STREAM(
read_files('/Volumes/users/takaaki_yayoi/landing')
)
処理が成功したらテーブルを確認します。
SELECT * FROM streaming_test;
_rescued_data
は、背後で動いているAuto Loaderによって作成されたものです。詳細はこちらをご覧ください。
ストリーミングテーブルの更新
二つ目のファイルをボリュームにアップロードします。
id,name,age
3,ume,9
4,yuki,5
ストリーミングテーブルを更新します。
REFRESH STREAMING TABLE streaming_test;
SELECT * FROM streaming_test;
ストリーミングテーブルを作成すると、そのテーブルを更新するためのDelta Live Tablesパイプラインが作成されます。カタログエクスプローラでストリーミングテーブルを表示すると、更新の詳細を参照からDLTパイプラインを確認することができます。
SQLリファレンス(英語)はこちらです。