Load data in Databricks SQL | Databricks on AWS [2023/6/27時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
プレビュー
本機能はパブリックプレビューです。アクセスにサインアップするには、このフォームに記入してください。
Databricks SQLを用いてデータを取り込むためにストリーミングテーブルを使うことをお勧めします。ストリーミングテーブルは、ストリーミングやインクリメンタルなデータ処理のサポートが追加された、Unity Catalogのマネージドテーブルです。それぞれのストリーミングテーブルに対して自動でDLTテーブルが作成されます。Kafkaやクラウドオブジェクトストレージからのインクリメンタルなデータロードでストリーミングテーブルを活用することができます。
本書では、Unity Catalogの外部ロケーションとして設定されたクラウドオブジェクトストレージからのデータをロードするために、ストリーミングテーブルの活用をデモンストレーションします。
始める前に
始める前に、以下の点を確認してください:
- Databricksアカウントでサーバレスが有効化されている。詳細に関しては、Use serverless SQL warehousesをご覧ください。
- ワークスペースでUnity Catalogが有効化されている。詳細については、Unity Catalogを使い始めるをご覧ください。
- SQLウェアハウスで
Current
チャンネルを使用している。 - Unity Catalogの外部ロケーションに対する
READ FILES
権限がある、詳細は、Unity Catalogにおける外部ロケーションとストレージ資格情報の管理をご覧ください。 - ストリーミングテーブルを作成するカタログに対する
USE CATALOG
権限。 - ストリーミングテーブルを作成するスキーマに対する
USE SCHEMA
権限。 - ストリーミングテーブルを作成するスキーマにおける
CREATE TABLE
権限。 - クラウドオブジェクトストレージのソースデータのパス。例えば、
s3://myBucket/analysis
。
注意
このチュートリアルでは、あなたがアクセスできるUnity Catalogの外部ロケーションに対応するクラウドオブジェクトストレージのロケーションにロードしたいデータを想定しています。
ソースデータの特定とプレビュー
- ワークスペースのサイドバーで、Queryをクリックし、Create Queryをクリックします。
- クエリーエディタで、ドロップダウンリストから
current
チャンネルを使用しているSQLウェアハウスを選択します。 - 以下をエディタに貼り付け、カッコ(
<>
)内の値をあなたのソースデータを特定する情報で置き換えて、Runをクリックします。
注意
read_files
テーブル値関数を実行する際、関数のデフォルトがあなたのデータをパースできないばあい、スキーマ推定エラーに遭遇する場合があります。例えば、マルチラインのCSVやJSONファイルでマルチラインモードを設定する必要がある場合があります。パーサーのオプションのリストに関しては、read_files table-valued functionをご覧ください。
/* Discover your data in an external location */
LIST "s3://<bucket>/<folder>"
/* Preview your data */
SELECT * FROM read_files("s3://<bucket>/<folder>") LIMIT 10
ストリーミングテーブルへのデータのロード
クラウドオブジェクトストレージからストリーミングテーブルを作成するには、クエリーエディタに以下を貼り付けてRunをクリックします:
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<folder>')
DLTパイプラインを用いたストリーミングテーブルのリフレッシュ
このセクションでは、クエリーで定義されるソースから利用できる最新データでストリーミングテーブルをリフレッシュするパターンを説明します。
ストリーミングテーブルに対するCREATE
オペレーションは、初期の作成とストリーミングテーブルへのデータロードにDatabricks SQLを使用します。ストリーミングテーブルに対するREFRESH
オペレーションでは、Delta Live Tables(DLT)を使用します。ストリーミングテーブルごとにDLTパイプラインが自動で作成されます。ストリーミングテーブルがリフレッシュされると、リフレッシュの処理を行うために、DLTパイプラインのアップデートが起動されます。
REFRESH
コマンドを実行した後は、DLTパイプラインのリンクが返却されます。リフレッシュのステータスをチェックするためにDLTパイプラインのリンクを活用することができます。
注意
最新データを取得するためにテーブルのオーナーのみがストリーミングテーブルをリフレッシュすることができます。テーブルを作成したユーザーが所有者となり、所有者を変更することはできません。
What is Delta Live Tables?をご覧ください。
新規データのみを取り込み
デフォルトでは、read_files
関数はテーブル作成時にソースディレクトリにあるすべての既存データを読み込み、リフレッシュのたびに新規に到着するレコードを処理します。
テーブル作成時にソースディレクトリにある既存データの取り込みを避けるために、ignoreExistingFiles
オプションを使用します。これは、テーブル作成の処理の後にディレクトリに到着したデータのみを意味します。例えば:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', ignoreExistingFiles => false)
ストリーミングテーブルの完全リフレッシュ
フルリフレッシュは、最新の定義を用いて、ソースで利用できるすべてのデータを再処理します。フルリフレッシュは既存データを削除するので、Kafkaのようなデータのすべての履歴を保持しないソースや保持期間が短いソースに対して、フルリフレッシュを呼び出すことはお勧めしません。ソースでデータが利用できない場合、古いデータを復旧できない場合があります。
例えば:
REFRESH STREAMING TABLE my_bronze_table FULL
自動リフレッシュされるストリーミングのスケジュール
定義されたスケジュールをベースとしてストリーミングテーブルが自動でリフレッシュするように設定するために、クエリーエディタに以下を貼り付けてRunを実行します:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
リフレッシュスケジュールクエリーのサンプルは、ALTER STREAMING TABLEをご覧ください。
リフレッシュのステータス追跡
Delta Live Tables UIでストリーミングテーブルを管理するパイプラインを参照する、あるいは、ストリーミングテーブルにDESCRIBE EXTENDED
コマンドを実行することで返却されるRefresh Informationを確認することで、ストリーミングテーブルのリフレッシュのステータスを参照することができます。
DESCRIBE EXTENDED <table-name>
Kafkaからのストリーミング取り込み
Kafkaからのストリーミング取り込みのサンプルに関しては、read_kafkaをご覧ください。
ストリーミングテーブルへのアクセスの許可
他のユーザーがストリーミングテーブルをクエリーできるように、SELECT
権限を付与するには、以下をクエリーエディタに貼り付けてRunをクリックしてください:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Unity Catalogのセキュリティ保護可能オブジェクトに対する権限の付与の詳細については、Unity Catalogにおける権限およびセキュリティ保護可能オブジェクトをご覧ください。
その他のリソース
- Streaming table
- read_files table-valued function
- CREATE STREAMING TABLE
- ALTER STREAMING TABLE
- read_kafka table-valued function