0
1

Databricksのストリーミングテーブルを動かしてみる

Posted at

以前こちらのマニュアルを訳していました。

ですが、動かしていなかったので、改めて作成から動作確認までをウォークスルーします。

マニュアルはこちら。

ストリーミングテーブルとは

Databricks では、Databricks SQL を使用してデータを取り込むためにストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルは、ストリーミングまたは増分データ処理の追加サポートを備えた Unity Catalog マネージドテーブルです。 Delta Live Tables パイプラインは、ストリーミング テーブルごとに自動的に作成されます。 ストリーミング テーブルを使用して、Kafka およびクラウド オブジェクト ストレージからの増分データ ロードを行うことができます。

データソース(オブジェクトストレージやメッセージングシステム)に新規データが到着した際に、増分のみを取り込むことができるテーブルです。すでにDelta Live Tablesで実装されていた機能が、Databricks SQLからも利用できるようになったという背景があります。

ストリーミングテーブルの作成

ストリーミングテーブルを作成する際にはデータソースを指定する必要がありますが、オブジェクトストレージを指定する際にはデフォルトではオブジェクトストレージにファイルが存在している必要があります。

ですので、以下のようなファイルをボリュームにアップロードしておきます。ここでのパスは、/Volumes/users/takaaki_yayoi/landingとなります。

one.csv
id,name,age
1,taka,51
2,polka,8

スキーマ推定の際に数値がintに識別されるように、カンマの前後に空白を含めないようにします。

SQLノートブックあるいはSQLエディタで以下を実行します。

USE users.takaaki_yayoi;
CREATE
OR REFRESH STREAMING TABLE streaming_test AS
SELECT
  *
FROM
  STREAM(
    read_files('/Volumes/users/takaaki_yayoi/landing')
  )

処理が成功したらテーブルを確認します。

SELECT * FROM streaming_test;

Screenshot 2024-08-05 at 12.55.11.png

_rescued_dataは、背後で動いているAuto Loaderによって作成されたものです。詳細はこちらをご覧ください。

ストリーミングテーブルの更新

二つ目のファイルをボリュームにアップロードします。

two.csv
id,name,age
3,ume,9
4,yuki,5

ストリーミングテーブルを更新します。

REFRESH STREAMING TABLE streaming_test;
SELECT * FROM streaming_test;

Screenshot 2024-08-05 at 12.57.51.png

ストリーミングテーブルを作成すると、そのテーブルを更新するためのDelta Live Tablesパイプラインが作成されます。カタログエクスプローラでストリーミングテーブルを表示すると、更新の詳細を参照からDLTパイプラインを確認することができます。
Screenshot 2024-08-05 at 12.45.00.png
Screenshot 2024-08-05 at 12.45.25.png

SQLリファレンス(英語)はこちらです。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1