Auto Loader | Databricks on AWS [2022/4/19時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Auto Loaderはクラウドストレージに到着する新規データファイルをインクリメンタルかつ効率的に処理します。Auto LoaderはAWS S3 (s3://
), Azure Data Lake Storage Gen2 (ADLS Gen2, abfss://
)、Google Cloud Storage (GCS, gs://
)、Azure Blob Storage (wasbs://
)、ADLS Gen1 (adl://
)、Databricksファイルシステム (DBFS, dbfs:/
)からデータファイルをロードすることができます。Auto Loaderでは、JSON
、CSV
、PARQUET
、AVRO
、ORC
、TEXT
、BINARYFILE
ファイルフォーマットを取り込むことができます。
Auto LoaderはcloudFiles
という構造化ストリーミングソースを提供します。クラウドファイルストレージの入力ディレクトリパスを提供することで、cloudFiles
ソースは、到着するファイル、そして、オプションによっては既存ファイルも自動で処理します。
Auto Loaderは、一時間あたり数百万のファイルがロードされるパイプラインで、数十億のファイルをバックフィルする必要があるようなストレージアカウントからデータをロードできるようにスケールすることができます。
Auto Loaderの動作原理
Auto Loaderでは新規ファイルを検知するために2つのモードをサポートします: ディレクトリ一覧とファイル通知です。
- ディレクトリ一覧: Auto Loaderは入力ディレクトリの一覧を生成することで新規ファイルを識別します。ディレクトリ一覧モードを用いることで、クラウドストレージのデータにアクセスする以外のアクセス権設定なしに、Auto Loaderのストリームをクイックに起動することができます。Databricksランタイム9.1以降では、お使いのクラウドストレージの語順でファイルが到着しているかどうかを自動で検知し、新規ファイルを検知するのに必要なAPI呼び出しの数を劇的に削減します。詳細はインクリメンタルな一覧を参照ください。
- ファイル通知: Auto Loaderでは通知サービスと入力ディレクトリのファイルイベントをサブスクライブするキューサービスを自動的にセットアップすることができます。ファイル通知モードはより高性能であり、大規模な入力ディレクトリや大規模ファイルにスケールすることができますが、セットアップには追加のアクセス権が必要となります。詳細はファイル通知の活用をご覧ください。
これらのモードの可用性を以下に示します。
クラウドストレージ | ディレクトリ一覧 | ファイル通知 |
---|---|---|
AWS S3 | 全バージョン | 全バージョン |
ADLS Gen2 | 全バージョン | 全バージョン |
GCS | 全バージョン | Databricksランタイム9.1以降 |
Azure Blob Storage | 全バージョン | 全バージョン |
ADLS Gen1 | Databricksランタイム7.3以降 | 未サポート |
DBFS | 全バージョン | マウントポイントのみ |
ファイルが検知されると、お使いのAuto Loaderパイプラインのチェックポイントの格納場所のスケーラブルなキーバリューストア(RocksDB)にそれらのメタデータが永続化されます。このキーバリューストアによって、確実に一度のみ処理(exactly-once)が行われることを保証します。ストリームを再起動する合間にファイル検知モードを切り替えることができ、依然として一度のみのデータ処理を保証することができます。実際、このようにして、Auto Loaderは既存ファイルを含むディレクトリのバックフィルの実行と、ファイル通知を通じて検知される新規ファイルの継続的処理の両方が可能となります。
処理の失敗時には、チェックポイントに格納されている情報を用いてAuto Loaderはその地点から処理を再開することができ、Delta Lakeに対してデータ書き込みを行う際に一度のみ書き込まれることを確実にし続けます。耐障害性や一度のみ実行するせまんティクスを実現するために、ご自身で状態を維持管理する必要はありません。
いつCOPY INTOを使用し、いつAuto Loaderを使用するのか
COPY INTOコマンドは、Delta Lakeに対する一度のみの書き込み保証を行うために、インクリメンタルにデータをロードするためのもう一つの便利な方法です。Auto LoaderかCOPY INTOを選択する際の検討事項をいくつか示します。
- 数千規模のファイルを取り込むのであれば、
COPY INTO
を使う方が望ましいです。長い間にわたる数百万のファイルが予想されるのであれば、Auto Loaderを使ってください。Auto LoaderはCOPY INTOよりもコスト低くファイルを検知でき、処理を複数のバッチに分割することができます。 - 頻繁にデータのスキーマが変更されるのであれば、Auto Loaderはスキーマ推定、スキーマ進化に関わる優れた機能を提供します。詳細はスキーマ推定とスキーマ進化をご覧ください。
- 再度アップロードされたファイルのサブセットのロードは、COPY INTOで管理した方が若干簡単になる場合があります。Auto Loaderでは、ファイルのサブセットをを再処理することが困難です。しかし、Auto Loaderのストリームが稼働しているのと同時に、ファイルのサブセットをリロードするためにCOPY INTOを使用することができます。
Apache SparkのFileStreamSource
に対するメリット
Apache Sparkでは、spark.readStream.format(fileFormat).load(directory)
を用いてファイルをインクリメンタルに読み込むことができます。Auto Loaderはファイルソースと比べて以下のメリットがあります。
- スケーラビリティ: Auto Loaderは数十億のファイルを効率的に検知することができます。計算リソースを無駄にすることなしに、非同期でバックフィルを実行することができます。
- パフォーマンス: Auto Loaderによるファイル検知のコストは、ファイルが到着するであろうディレクトリの数ではなく、取り込まれるファイルの数でスケールします。最適化されたディレクトリ一覧をご覧ください。
- スキーマ推定とスキーマ進化のサポート: Auto Loaderはスキーマのドリフトを検知することができ、スキーマの変更が起きた際に通知を行い、無視されるか失われるデータを救助します。スキーマ推定とスキーマ進化をご覧ください。
- コスト: Auto Loaderはストレージに存在するファイルの一覧を取得するためにネイティブのクラウドAPIを使用します。さらに、Auto Loaderのファイル通知モードは、ディレクトリ一覧を完全に回避するのでさらにクラウドコストの削減に役立ちます。Auto Loaderはファイル検知をより安価にできるように、ストレージ上のファイル通知を自動でセットアップすることができます。
クイックスタート
以下のコードサンプルでは、クラウドストレージに到着する新規データファイルをAuto Loaderがどのように検知するのかをデモします。
-
ファイルアップロード用のディレクトリを作成します。
Pythonuser_dir = '<my-name>@<my-organization.com>' upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload" dbutils.fs.mkdirs(upload_path)
Scalaval user_dir = "<my-name>@<my-organization.com>" val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload" dbutils.fs.mkdirs(upload_path)
-
以下のサンプルCSVファイルを作成し、DBFSファイルブラウザを用いてディレクトリにファイルをアップロードします。
WA.csv
:city,year,population Seattle metro,2019,3406000 Seattle metro,2020,3433000
OR.csv
:city,year,population Portland metro,2019,2127000 Portland metro,2020,2151000
-
Auto Loaderを起動するために以下のコードを実行します。
Pythoncheckpoint_path = '/tmp/delta/population_data/_checkpoints' write_path = '/tmp/delta/population_data' # Set up the stream to begin reading incoming files from the # upload_path location. df = spark.readStream.format('cloudFiles') \ .option('cloudFiles.format', 'csv') \ .option('header', 'true') \ .schema('city string, year int, population long') \ .load(upload_path) # Start the stream. # Use the checkpoint_path location to keep a record of all files that # have already been uploaded to the upload_path location. # For those that have been uploaded since the last check, # write the newly-uploaded files' data to the write_path location. df.writeStream.format('delta') \ .option('checkpointLocation', checkpoint_path) \ .start(write_path)
Scalaval checkpoint_path = "/tmp/delta/population_data/_checkpoints" val write_path = "/tmp/delta/population_data" // Set up the stream to begin reading incoming files from the // upload_path location. val df = spark.readStream.format("cloudFiles") .option("cloudFiles.format", "csv") .option("header", "true") .schema("city string, year int, population long") .load(upload_path) // Start the stream. // Use the checkpoint_path location to keep a record of all files that // have already been uploaded to the upload_path location. // For those that have been uploaded since the last check, // write the newly-uploaded files' data to the write_path location. df.writeStream.format("delta") .option("checkpointLocation", checkpoint_path) .start(write_path)
-
ステップ3のコードが実行している状態で、以下のコードでディレクトリに書き込まれたデータに対するクエリーを実行します。
Pythondf_population = spark.read.format('delta').load(write_path) display(df_population) ''' Result: +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ '''
Scalaval df_population = spark.read.format("delta").load(write_path) display(df_population) /* Result: +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ */
-
ステップ3のコードが実行している状態で、以下の追加のCSVファイルを作成し、DBFSファイルブラウザを用いてディレクトリにファイルをアップロードします。
ID.csv
:city,year,population Boise,2019,438000 Boise,2020,447000
MT.csv
:city,year,population Helena,2019,81653 Helena,2020,82590
Misc.csv
:city,year,population Seattle metro,2021,3461000 Portland metro,2021,2174000 Boise,2021,455000 Helena,2021,81653
-
ステップ3のコードが実行している状態で、以下のコードでディレクトリに書き込まれたデータ、そして、アップロード用ディレクトリでAuto Loaderが検知し、書き込み用ディレクトリに書き込まれたファイルからの新規データに対するクエリーを実行します。
Pythondf_population = spark.read.format('delta').load(write_path) display(df_population) ''' Result: +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Helena | 2019 | 81653 | +----------------+------+------------+ | Helena | 2020 | 82590 | +----------------+------+------------+ | Boise | 2019 | 438000 | +----------------+------+------------+ | Boise | 2020 | 447000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ | Seattle metro | 2021 | 3461000 | +----------------+------+------------+ | Portland metro | 2021 | 2174000 | +----------------+------+------------+ | Boise | 2021 | 455000 | +----------------+------+------------+ | Helena | 2021 | 81653 | +----------------+------+------------+ '''
Scalaval df_population = spark.read.format("delta").load(write_path) display(df_population) /* Result +----------------+------+------------+ | city | year | population | +================+======+============+ | Seattle metro | 2019 | 3406000 | +----------------+------+------------+ | Seattle metro | 2020 | 3433000 | +----------------+------+------------+ | Helena | 2019 | 81653 | +----------------+------+------------+ | Helena | 2020 | 82590 | +----------------+------+------------+ | Boise | 2019 | 438000 | +----------------+------+------------+ | Boise | 2020 | 447000 | +----------------+------+------------+ | Portland metro | 2019 | 2127000 | +----------------+------+------------+ | Portland metro | 2020 | 2151000 | +----------------+------+------------+ | Seattle metro | 2021 | 3461000 | +----------------+------+------------+ | Portland metro | 2021 | 2174000 | +----------------+------+------------+ | Boise | 2021 | 455000 | +----------------+------+------------+ | Helena | 2021 | 81653 | +----------------+------+------------+ */
-
クリーンアップするために、ステップ3の実行中のコードをキャンセルし、以下のコードを実行し、アップロード、チェックポイント、書き込み用ディレクトリを削除します。
Pythondbutils.fs.rm(write_path, True) dbutils.fs.rm(upload_path, True)
Scaladbutils.fs.rm(write_path, true) dbutils.fs.rm(upload_path, true)
チュートリアル:Auto LoaderによるDelta Lakeへの継続的データ取り込みもご覧ください。
スキーマ推定とスキーマ進化
注意
この機能はDatabricksランタイム8.2以降で利用できます。
Auto LoaderではCSV, JSON, バイナリー(binaryFile
), テキストファイルフォーマットのスキーマ推定、スキーマ進化にサポートしています。詳細はSchema inference and evolution in Auto Loaderをご覧ください。
大規模データに対するAuto Loaderのスケーリング
Trigger.AvailableNowとレート制限の使用
注意
この機能はDatabricksランタイム10.1以降のScalaで利用できます。
この機能はDatabricksランタイム10.2以降のPythonとScalaで利用できます。
Auto LoaderはTrigger.AvailableNow
を用いることで、バッチジョブとしてDatabricksジョブでスケジュールすることができます。AvailableNow
トリガーは、クエリーの開始時刻の前に到着した全てのファイルを処理するようにAuto Loaderに指示します。ストリームが開始した後にアップロードされた新規ファイルは、次のトリガーまで無視されます。
Trigger.AvailableNow
を用いることで、ファイル検知はデータ処理と非同期で実行され、データは例と制限を用いた複数のマイクロバッチで処理できるようになります。デフォルトではAuto Loaderはマイクロバッチごとに最大1000ファイルを処理します。マイクロバッチでいくつのファイルを処理するのか、何バイトを処理するのかを、cloudFiles.maxFilesPerTrigger
やcloudFiles.maxBytesPerTrigger
で設定することができます。ファイルの制限はハードリミットですが、バイト数の制限はソフトリミットであり、maxBytesPerTrigger
以上のバイト数が処理される場合があります。両方のオプションが設定された際には、Auto Loaderはこれらの制限に到達するまで可能な限り多くのファイルを処理します。
最適化されたディレクトリ一覧
注意
この機能はDatabricksランタイム9.0以降で利用できます。
Auto Loaderは、代替手段よりも効率的なディレクトリ一覧を用いることで、クラウドストレージシステムのファイルを検知することができます。例えば、/some/path/YYYY/MM/DD/HH/fileName
というように、5分間隔でファイルをアップロードしている場合、これらのディレクトリの全てのファイルを検索するためにApache Sparkのファイルソースは並列で全てのサブディレクトリの一覧を作成するので、ストレージに対して 1 (ベースディレクトリ) + 365 (日毎) * 24 (時間ごと) = 8761のLIST API呼び出しを行います。Auto Loaderはストレージからフラット化された結果を受け取ることで、APIの呼び出し数をストレージ上のファイル数
/ それぞれのAPI呼び出しで返却される結果の数
(S3は1000、ADLS Gen2は5000、GCSは1024)に削減し、お使いのクラウドコストを劇的に削減します。
インクリメンタルな一覧
注意
本機能はDatabricksランタイム9.1 LTS以降で利用できます。
名前順に作成されたファイルに対しては、Auto Loaderはファイル名の語順とディレクトリ全体の内容の一覧を作成するのではなく、最近追加されたファイルを一覧することでディレクトリ一覧の効率を改善する最適化一覧APIを活用します。
デフォルトでは、Auto Loaderは、以前に完了したディレクトリ一覧との比較とチェックを行うことで、特定のディレクトリにインクリメンタルな一覧を適用できるのかを自動で検知します。auto
モードでのデータの結果的完全性を保証するために、Auto Loaderは7回連続のインクリメンタル一覧を行なった後に完全なディレクトリ一覧を自動でトリガーします。完全ディレクトリ一覧の頻度はcloudFiles.backfillInterval
で制御でき、指定された周期で非同期のバックフィルをトリガーすることができます。
cloudFiles.useIncrementalListing
を"true"
か"false"
(デフォルトは"auto"
)に設定することで、インクリメンタルな一覧を明示的に有効、無効化することができます。明示的に有効化されると、Auto Loaderはバックフィル周期が指定されるまで完全ディレクトリ一覧をトリガーしません。AWS Kinesis Firehose、AWS DMS、Azure Data Factoryのようなサービスは、ストレージシステムに語順でファイルをアップロードするように設定できるサービスです。語順ディレクトリの構造のサンプルに関しては付録を参照ください。
ファイル通知の活用
バケットに対して名前順でファイルが到着しない場合、Auto Loaderが時間当たり数百万のファイルを取り込めるようにスケールできるようにファイル通知を活用することができます。cloudFiles.useNotifications
をtrue
に設定し、クラウドリソースを作成するのに必要な権限を与えることで、Auto Loaderは自動でファイル通知をセットアップすることができます。さらに、これらのリソースを作成するためにAuto Loaderの認証情報を提供するために以下の追加オプションが必要になる場合があります。以下の表ではAuto Loaderによってどのリソースが作成されるのかをまとめています。
クラウドストレージ | サブスクリプションサービス | キューサービス | プレフィックス(1) | 制限(2) |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | databricks-auto-ingest | S3バケットあたり100 |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | ストレージアカウントあたり500 |
GCS | Google Pub/Sub | Google Pub/Sub | databricks-auto-ingest | GCSバケットあたり100 |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | ストレージアカウントあたり500 |
- Auto Loaderがこのプロフィックスでリソースに名前をつけます。
- ファイル通知パイプラインの同時起動数です。
ファイル通知サービスを作成するのに必要なアクセス権をAuto Loaderに指定できない場合、次のセクションにあるようにファイル通知サービスを作成するためのDatabricks ScalaノートブックでsetUpNotificationServices
メソッドを使用するように皆様のクラウド管理者に依頼することができます。あるいは、クラウド管理者が手動でファイル通知サービスをセットアップし、ファイル通知を活用できるようにキューのIDを教えてもらうことができます。詳細については、ファイル通知のオプションをご覧ください。
任意のタイミングでファイル通知とディレクトリ一覧を切り替えることができ、依然として一度のみの処理を保証することができます。
注意
クラウドプロバイダーは、非常に稀ば状況においては全てのファイルイベントの100%のデリバリーを保証せず、ファイルイベントのレーテンシーに対する厳密なSLAを提供しません。データの完全性の要件があり、全てのファイルが特定のSLAで検知されることを保証するために、cloudFiles.backfillInterval
オプションを用いて、Auto Loaderによる定期的なバックフィルを行うことをお勧めします。定期的なバックフィルの起動は重複を引き起こしません。
特定のストレージアカウントに対する制限値以上に、ファイル通知パイプラインを実行する必要がある場合には、以下の方法が考えられます。
- ファイル通知ではなくインクリメンタルな一覧を活用できるように、ファイルのアップロード方法を再度設計し直すことを考えます。
- コンテナ全体あるいはディレクトリ固有のキューに対するバケットをサブスクライブする単一のキューからの通知を分散させるAWS LambdaやAzure Functions、Google Cloud Functionsを活用します。
ファイル通知のイベント
AWS S3では、putかマルチパートアップロードでアップロードされたのかに関係なく、ファイルがS3バケットにアップロードされた際にはObjectCreated
イベントを生成します。
ADLS Gen2は、お使いのGen2コンテナにファイルが出現すると異なるイベント通知を行います。
- ファイルを処理する際、Auto Loaderは
FlushWithClose
イベントをリッスンします。 - Databricksランタイム8.3以降で作成されたAuto Loaderのストリームはファイルの検知のために
RenameFile
アクションをサポートしています。RenameFile
アクションは、リネームされたファイルのサイズを取得するためにストレージシステムに対するAPIリクエストを必要とします。 - Databricksランタイム9.0以降で作成されたAuto Loaderのストリームはファイルの検知のために
RenameDirectory
アクションをサポートしています。RenameDirectory
アクションは、リネームされたディレクトリの内容一覧を取得するためにストレージシステムに対するAPIリクエストを必要とします。
Google Cloud Storageはファイルがアップロードされた際にOBJECT_FINALIZE
イベントを生成します。これには上書きとファイルコピーが含まれます。失敗したアップロードではこのイベントは生成されません。
ファイル通知リソースの管理
Auto Loaderによって作成された通知サービス、キューサービスを管理するためにScala APIを使用することができます。このAPIを使用する前にアクセス権で説明されている権限をセットアップするようにリソースを設定する必要があります。
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by Auto Loader
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
<prefix>-<resource-suffix>
の名前のキューとサブスクリプションを作成するためにsetUpNotificationServices(<resource-suffix>)
を使用します(プレフィックスはファイル通知のオプションにまとめているようにストレージシステムに依存します)。これによって、cloudFiles
ソースのユーザーは、リソースを作成するよりも少ない権限を持つことになります。アクセス権をご覧ください。
setUpNotificationServices
を呼び出す際にのみnewManager
に"path"
オプションを指定します。listNotificationServices
やtearDownNotificationServices
では不要です。これは、ストリーミングクエリーを実行するのと同じpath
となります。
クラウドストレージ | セットアップAPI | 一覧API | ティアダウンAPI |
---|---|---|---|
AWS S3 | 全バージョン | 全バージョン | 全バージョン |
ADLS Gen2 | 全バージョン | 全バージョン | 全バージョン |
GCS | Databricksランタイム9.1以降 | Databricksランタイム9.1以降 | Databricksランタイム9.1以降 |
Azure Blob Storage | 全バージョン | 全バージョン | 全バージョン |
ADLS Gen1 | 未サポート | 未サポート | 未サポート |
イベントの保持
注意
本機能はDatabricksランタイム8.4以降で利用できます。
Auto Loaderは一度のみの取り込み保証を行うために、RocksDBを用いてチェックポイントの中で検知したファイルを追跡し続けます。大容量のデータセットに対しては、ストレージコストとAuto Loaderの起動時間を削減するためにcloudFiles.maxFileAge
を用いてチェックポイントからイベントを期限切れにすることができます。cloudFiles.maxFileAge
に指定できる最小値は"14 days"
となります。RocksDBにおける削除はトゥームストーンのエントリとして表示され、定常状態になる前にイベントが期限切れになるため、一時的にはストレージの使用量は増加します。
警告!
cloudFiles.maxFileAge
は大規模データセットのコストコントロールの手段として提供しており、時間あたり数百万のファイルを取り込みます。不適切にcloudFiles.maxFileAge
をチューニングすると、データ品質問題を引き起こす場合があります。このため、絶対に必要でない場合を除きこれをチューニングすることはお勧めしません。
cloudFiles.maxFileAge
オプションをチューニングしようとすると、Auto Loaderによって未処理のファイルが無視されたり、処理済みのファイルが有効期限切れとなって再処理が行われ、重複データが生成される場合があります。cloudFiles.maxFileAge
を選択する際に検討すべき事項を以下に示します。
- 長い間を置いてからストリームを再起動する際、キューから取得される
cloudFiles.maxFileAge
より古いファイル通知イベントは無視されます。同様に、ディレクトリ一覧を使用している際、ダウンタイムの期間に出現したファイルでcloudFiles.maxFileAge
より古いファイルは無視されます。 - ディレクトリ一覧モードを使用しており、
cloudFiles.maxFileAge
を例えば"1 month"
に設定している場合、ストリームを停止してからcloudFiles.maxFileAge
を"2 months"
に設定して再起動すると、1ヶ月より古く、2ヶ月より新しい全てのファイルが再処理されます。
cloudFiles.maxFileAge
をチューニングするのにベストなアプローチは、ゆるい有効期限、例えば"1 year"
からスタートし、"9 months"
のように短縮していくというものです。このオプションを設定し、最初にストリームを起動した際にcloudFiles.maxFileAge
より古いデータは取り込まれませんので、古いデータを取り込みたい場合には、ストリームを起動する際にこのオプションを設定すべきではありません。
プロダクションでのAuto Loaderの実行
プロダクション環境でAuto Loaderを実行するためには、ストリーミングのベストプラクティスに従うことをお勧めします。
Auto Loaderのモニタリング
ストリーミングアプリケーションをモニタリングするためには、Apache SparkのStreaming Query Listener interfaceを使用することをお勧めします。
Auto Loaderはバッチごとにストリーミングクエリーリスナーにメトリクスを報告します。バックログにどれだけのファイルが存在するのか、バックログがどのくらいのサイズなのかは、ストリーミングクエリー進捗ダッシュボードのRaw Dataタブの下のnumFilesOutstanding
とnumBytesOutstanding
で確認することができます。
{
"sources" : [
{
"description" : "CloudFilesSource[/path/to/source]",
"metrics" : {
"numFilesOutstanding" : "238",
"numBytesOutstanding" : "163939124006"
}
}
]
}
Databricksランタイム10.1以降でファイル通知モードを使用している際は、AWSとAzureのメトリクスにはapproximateQueueSize
としてクラウドキューになるファイルイベント数の近似値が含まれます。
Delta Live TablesにおけるAuto Loaderの利用
Auto LoaderはDelta Live TablesでSQLやPythonから使用することができます。Delta Live TablesでAuto Loaderを使用する際、スキーマの場所やチェックポイントの場所を指定する必要はありません。これらは、ご自身のパイプラインのDelta Live Tablesによって管理されます。
以下の例では、CSVやJSONファイルからのデータセットを作成するためにAuto Loaderを使用しています。
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE INCREMENTAL LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE INCREMENTAL LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Auto Loaderとサポートされているフォーマットオプションを使用することができます。以下の例ではタブ区切りのCSVファイルからデータを読み込んでいます。
CREATE INCREMENTAL LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t"))
注意
Delta Live Tablesでファイルを読み込むためにAuto Loaderを使用すると、自動でスキーマとチェックポイントのディレイクトリを設定し管理します。しかし、これらのディレクトリを手動で設定した場合、フルリフレッシュの実行は設定されたディレクトリの内容に影響を与えません。処理の際の思いがけない副作用を避けるために、自動で設定されたディレクトリを使用することをお勧めします。
コストの検討
Auto Loaderを使用する際、主なコストの発生源は計算資源とファイル検知になることでしょう。
計算コストを削減するためには、低レーテンシーの要件がない限り連続的に実行するのではなく、Trigger.AvailableNow
(Databricksランタイム10.1以降)あるいはTrigger.Once
を用いたバッチジョブとしてAuto Loaderをスケジュール処理するようにDatabricksジョブを使用することをお勧めします。
ファイル検知のコストは、ディレクトリ一覧モードにおけるストレージアカウントに対するLISTオペレーション、ファイル通知モードにおけるサブスクリプションサービスとキューサービスのAPIリクエストという形で発生します。ファイル検知のコストを削減するためには以下のことをお勧めします。
- ディレクトリ一覧モードでAuto Loaderを継続的に実行する際に
ProcessingTime
トリガーを指定します。 - 可能であればインクリメンタル一覧を活用できるように、名前順でストレージアカウントにファイルをアップロードするように設計します。
- 特に深くネストされたディレクトリの場合には、ディレクトリ一覧モードでDatabricksランタイム9.0以降を使用します。
- コストを追跡するためにAuto Loaderによって作成されたリソースにタグ付けするためにリソースタグを使います。
Auto Loaderの設定
cloudFiles
ソース固有の設定オプションにはcloudFiles
のプレフィックスがついているので、他の構造化ストリーミングソースオプションと別の名前空間に存在することになります。
ファイルフォーマットのオプション
Auto Loaderを用いることで、JSON
、CSV
、PARQUET
、AVRO
、TEXT
、BINARYFILE
、ORC
ファイルを取り込むことができます。これらのファイルフォーマットに対するオプションについてはフォーマットオプションをご覧ください。
共通のAuto Loaderのオプション
ディレクトリ一覧、ファイル通知モードに対しては以下のオプションを設定することができます。
オプション |
---|
cloudFiles.allowOverwrites タイプ: Boolean 入力ディレクトリのファイル変更が既存データを上書きするかどうかを指定します。Databricksランタイム7.6以降で利用できます。 デフォルト値: false
|
cloudFiles.backfillInterval タイプ: Interval String Auto Loaderが指定された周期、例えば1日に1度バックフィルする場合には 1 day 、1週間に1度バックフィルする場合には1 week で非同期バックフィルを起動することができます。ファイルイベント通知システムはアップロードされた全てのファイルの100%のデリバリーを保証しないため、最終的に全てのファイルが処理されることを保証するために、Databricksランタイム8.4(未サポート)以降で利用できるバックフィルを使用することができます。インクリメンタル一覧を使用している場合には、Databricksランタイム 9.1 LTS以降で利用できる定期的なバックフィルを用いて結果的な完全性を保証することができます。デフォルト値: None |
cloudFiles.format タイプ: String ソースパスのデータファイルフォーマットを指定します。以下の値を指定できます。
|
cloudFiles.includeExistingFiles タイプ: Boolean 入力パスを処理するストリームに既存ファイルを含めるか、初期セットアップ後に到着する新規ファイルのみを処理するのかを指定します。このオプションは最初にストリームを起動した時のみ評価されます。ストリームを再起動した後にこのオプションを変更しても効果はありません。 デフォルト値: true
|
cloudFiles.inferColumnTypes タイプ: Boolean スキーマ推定を活用する際にカラムタイプを推定するかどうかを指定します。デフォルトでは、JSONデータセットを推定する際、カラムは文字列として推定されます。詳細はschema inferenceを参照ください。 デフォルト値: false
|
cloudFiles.maxBytesPerTrigger タイプ: Byte String トリガーごとに処理する新規バイトの最大数です。個々のバッチを10GBのデータに限定するには 10g のようなバイト文字列を指定することができます。これはソフトな最大値です。それぞれ3GBであるファイルが存在する場合、Databricksはマイクロバッチで12GBを処理します。cloudFiles.maxFilesPerTrigger と一緒に指定した場合、DatabricksはcloudFiles.maxFilesPerTrigger かcloudFiles.maxBytesPerTrigger どちらかが先に限界に達するまで処理を行います。Trigger.Once() を使用した際にはこのオプションの効果はありません。デフォルト値: None |
cloudFiles.maxFileAge タイプ: Interval String 重複排除のために、どれだけの期間ファイルイベントを追跡するのかを指定します。1時間あたり数百万ファイル規模のデータを取り込むのではない限り、このパラメーターをチューニングすることはお勧めしません。詳細はイベント保持のセクションをご覧ください。 デフォルト値: None |
cloudFiles.maxFilesPerTrigger タイプ: Integer トリガーごとに処理する新規ファイルの最大数を指定します。 cloudFiles.maxBytesPerTrigger と一緒に指定した場合、DatabricksはcloudFiles.maxFilesPerTrigger かcloudFiles.maxBytesPerTrigger どちらかが先に限界に達するまで処理を行います。Trigger.Once() を使用した際にはこのオプションは効果はありません。デフォルト値: 1000 |
cloudFiles.partitionColumns タイプ: String ファイルのディレクトリ構造から推論したいHiveスタイルのパーティションカラムをカンマ区切りで指定します。Hiveスタイルのパーティションカラムは、 base_path>/a=x/b=1/c=y/file.format のようにイコールで結合されるキーバリューのペアとなります。この例では、パーティションカラムはa 、b 、c となります。スキーマ推定を使用しており、データを読み込む<base_path> を指定している場合、デフォルトではこれらのカラムは自動でスキーマに追加されます。スキーマを指定した場合、Auto Loaderはスキーマにこれらのカラムが含まれていることを期待します。これらのカラムをスキーマに含めたくない場合には、これらのカラムを無視するために"" を指定することができます。さらに、以下の例のように複雑なディレクトリ構造のファイルパスからカラムを推定したい場合にはこのオプションを使用することができます。<base_path>/year=2022/week=1/file1.csv <base_path>/year=2022/month=2/day=3/file2.csv <base_path>/year=2022/month=2/day=4/file3.csv year,month,day としてcloudFiles.partitionColumns を指定することで、file1.csv はyear=2022 を返却しますが、month とday はnull となります。file2.csv とfile3.csv についてはmonth とday は適切にパースされます。デフォルト値: None |
cloudFiles.schemaEvolutionMode タイプ: String データで新規カラムが検知された際のスキーマ進化のモードを指定します。デフォルトでは、JSONデータセットを推定する際、カラムは文字列型として推定されます。詳細はschema evolutionをご覧ください。 デフォルト値:スキーマが指定されない場合には "addNewColumns" 、それ以外の場合はnone
|
cloudFiles.schemaHints タイプ: String スキーマ推定の際にAuto Loaderに指定するスキーマ情報です。詳細はschema hintsをご覧ください。 デフォルト値: None |
cloudFiles.schemaLocation タイプ: String 推定されたスキーマと以降の変更を保存する場所です。詳細はschema inferenceをご覧ください。 デフォルト値: None(スキーマを推定する際には必須です) |
cloudFiles.validateOptions タイプ: Boolean Auto Loaderのオプションを検証し、未知あるいは一貫性のないオプションでエラーを返すかどうかを指定します。 デフォルト値: true
|
ディレクトリ一覧のオプション
以下のオプションはディレクトリ一覧モードで使用します。
オプション |
---|
cloudFiles.useIncrementalListing タイプ: String ディレクトリ一覧モードで完全な一覧ではなくインクリメンタルな一覧を使用するのかどうかを指定します。デフォルトでは、Auto Loaderはベストエフォートで指定されたディレクトリにインクリメンタルな一覧を適用できるかどうかを自動で検知しようとします。 true かfalse を指定することで、明示的にインクリメンタルな一覧か完全なディレクトリ一覧を使うのかを指定することができます。Databricksランタイム9.1 LTS以降で利用できます。 デフォルト値: auto 指定できる値: auto, true, false |
ファイル通知のオプション
以下のオプションはファイル通知モードで使用します。
オプション |
---|
cloudFiles.fetchParallelism タイプ: Integer キューサービスからメッセージをフェッチする際のスレッド数です。 デフォルト値: 1 |
cloudFiles.pathRewrites タイプ: A JSON string 複数のS3バケットからファイル通知を受け取る queueUrl を指定し、これらのコンテナのデータにアクセスするように設定されたマウントポイントを活用した場合にのみ必要となります。マウントポイントでbucket/key のプレフィクスでリライトするためにこのオプションを使用します。プレフィクスのみがリライトされます。例えば、{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"} という設定では、パスs3://<databricks-mounted-bucket>/path/2017/08/fileA.json はdbfs:/mnt/data-warehouse/2017/08/fileA.json にリライトされます。デフォルト値: None |
cloudFiles.resourceTags タイプ: Map(String, String) 関連リソースの識別、紐付けに役立つキーバリューペアのシリーズです。例えば、 cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue") AWSにおける詳細については、Amazon SQS cost allocation tagsとConfiguring tags for an Amazon SNS topicをご覧ください。(1) Azureにおける詳細については、Naming Queues and MetadataとEvent Subscriptionsの properties.labels のカバレッジをご覧ください。Auto Loaderはラベルとして、これらJSON内のキーバリュータグのペアを格納します。(1)GCPにおける詳細については、Reporting usage with labelsをご覧ください。(1) デフォルト値: None |
cloudFiles.useNotifications タイプ: Boolean 新規ファイルがある際にファイル通知モードを使うかどうかを指定します。 false の場合、ディレクトリ一覧モードを使用します。Auto Loaderの動作原理をご覧ください。デフォルト値: false
|
(1) Auto Loaderはデフォルトではベストエフォートで以下のキーバリュータグのペアを追加します。
-
vendor
: Databricks -
path
: データがロードされるパス。GCPではラベルの制限のため利用不可。 -
checkpointLocation
: ストリームのチェックポイントのパス。GCPではラベルの制限のため利用不可。 -
streamId
: ストリームに対するグローバルでユニークなID。
これらのキーの名前は予約されており、これらの値を上書きしてはいけません。
AWS固有のオプション
オプション |
---|
cloudFiles.region タイプ: String ソースのS3バケットの存在するリージョンであり、ここにAWS SNSサービスとSQSサービスが作成されます。 デフォルト値: Databricksランタイム9.0以降ではEC2インスタンスのあるリージョンとなります。Databricksランタイム8.4以前ではリージョンを指定する必要があります。 |
loudFiles.useNotifications = true
を選択し、Auto Loaderですでにセットアップ済みのキューを使いたい場合にのみ以下のオプションを指定します。
オプション |
---|
cloudFiles.queueUrl タイプ: String SQSキューのURLです。指定した場合、Auto Loaderは自身のAWS SNS、SQSサービスをセットアップするのではなく、このキューからのイベントを直接消費します。 デフォルト値: None |
IAMロールが利用できない、異なるクラウドからデータを取り込む際にAWS SNS、SQSにアクセスするために以下のオプションで認証情報を指定することができます。
オプション |
---|
cloudFiles.awsAccessKey タイプ: String ユーザーに対するAWSアクセスキーのIDです。 cloudFiles.awsSecretKey と共に指定する必要があります。デフォルト値: None |
cloudFiles.awsSecretKey タイプ: String ユーザーに対するAWSのシークレットキーです。 cloudFiles.awsAccessKey と共に指定する必要があります。デフォルト値: None |
cloudFiles.roleArn タイプ: String 移譲を受けるIAMロールのARNです。このロールはお使いのクラスターのインスタンスプロファイル、あるいは cloudFiles.awsAccessKey とcloudFiles.awsSecretKey による認証情報から移譲を受けることができます。デフォルト値: None |
cloudFiles.roleExternalId タイプ: String cloudFiles.roleArn を用いてロールの移譲を受ける際に指定するIDです。デフォルト値: None |
cloudFiles.roleSessionName タイプ: String cloudFiles.roleArn`を用いてロールの移譲を受ける際に指定するオプションのセッション名です。 デフォルト値: None |
cloudFiles.stsEndpoint タイプ: String cloudFiles.roleArn`を用いてロールの移譲を受ける際にAWS STSにアクセスする際に指定するオプションのエンドポイントです。 デフォルト値: None |
Azure固有のオプション
loudFiles.useNotifications = true
を選択し、Auto Loaderで使用する通知サービスをセットアップするようにするには、以下のオプションを全て指定する必要があります。
オプション |
---|
cloudFiles.clientId タイプ: String クライアントIDあるいはサービスプリンシパルのアプリケーションIDです。 デフォルト値: None |
cloudFiles.clientSecret タイプ: String サービスプリンシパルのクライアントシークレットです。 デフォルト値: None |
cloudFiles.connectionString タイプ: String アカウントアクセスキー、あるいは共有アクセスシグネチャ(SAS)に基づくストレージアカウントに対する接続文字列です。 デフォルト値: None |
cloudFiles.resourceGroup タイプ: String ストレージアカウントが作成されるAzureリソースグループです。 デフォルト値: None |
cloudFiles.subscriptionId タイプ: String リソースグループが作成されるAzureサブスクリプションIDです。 デフォルト値: None |
cloudFiles.tenantId タイプ: String サービスプリンシパルが作成されるAzureテナントIDです。 デフォルト値: None |
重要!
Databricksランタイム9.1以降で、Azure Chinaとガバメントリージョンでは自動通知セットアップを利用することができます。古いDBRのバージョンでは、これらのリージョンにおけるファイル通知でAuto Loaderを使うにはqueueName
を指定する必要があります。
loudFiles.useNotifications = true
を選択し、Auto Loaderですでにセットアップ済みのキューを使いたい場合にのみ以下のオプションを指定します。
オプション |
---|
cloudFiles.queueName タイプ: String Azureキューの名称です。指定した場合、自身のAzure Event GridとQueue Storageサービスをセットアップするのではなく、クラウドファイルソースはこのキューからイベントを直接消費します。この場合、お使いの cloudFiles.connectionString ではキューに対する読み取り権限のみが必要となります。デフォルト値: None |
Google固有のオプション
注意
Google Cloudにおけるファイル通知モードはパブリックプレビューです。Databricks Runtime 9.1 LTS以降でサポートされています。
Googleサービスアカウントを使用することで、Auto Loaderは自動で通知サービスをセットアップすることができます。Google service setupに従うことでサービスアカウントの移譲を受けるようにクラスターを設定することができます。サービスアカウントで必要な権限は、ファイル通知リソースをセットアップする際に必要となるアクセス権で指定されます。そうでない場合、Auto loaderに通知サービスをセットアップさせたい場合には、以下のオプションで認証情報を指定することができます。
オプション |
---|
cloudFiles.client タイプ: String GoogleサービスアカウントのクライアントIDです。 デフォルト値: None |
cloudFiles.clientEmail タイプ: String Googleサービスアカウントのメールアドレスです。 デフォルト値: None |
cloudFiles.privateKey タイプ: String Googleサービスアカウントに対して生成されるプライベートキーです。 デフォルト値: None |
cloudFiles.privateKeyId タイプ: String Googleサービスアカウントに対して生成されるプライベートキーのIDです。 デフォルト値: None |
cloudFiles.projectId タイプ: String GCSバケットが存在するプロジェクトのIDです。Google Cloud Pub/Subのサブスクリプションもこのプロジェクト内に作成されます。 デフォルト値: None |
loudFiles.useNotifications = true
を選択し、Auto Loaderですでにセットアップ済みのキューを使いたい場合にのみ以下のオプションを指定します。
オプション |
---|
cloudFiles.subscription タイプ: String Google Cloud Pub/Subサブスクリプションの名前です。指定した場合、自身のGCS通知とGoogle Cloud Pub/Subサービスをセットアップするのではなく、クラウドファイルソースはこのキューからイベントを消費します。 デフォルト値: None |
リファレンス
Auto Loaderの概要とデモンストレーションについては、こちらのYouTube動画(59分)をご覧ください。
Auto Loaderの使い方詳細については以下をご覧ください。
- Ingest CSV data with Auto Loader
- Ingest JSON data with Auto Loader
- Ingest Avro data with Auto Loader
- Ingest image data with Auto Loader
- Schema inference and evolution in Auto Loader
一般的なデータロードのパターン
globパターンを用いたディレクトリやファイルのフィルタリング
パスにglobパターンを指定することで、ディレクトリやファイルをフィルタリングすることができます。
パターン | 説明 |
---|---|
? |
任意の一文字にマッチ |
* |
0文字以上の文字列にマッチ |
[abc] |
文字列セット{a,b,c}の1文字にマッチ |
[a-z] |
文字列レンジ{a...z}の1文字にマッチ |
[^a] |
文字列セット、レンジ{a}以外の1文字にマッチ。左のカッコの直後に^を指定する必要があることに注意してください。 |
{ab,cd} |
文字列セット{ab,cd}の文字列にマッチ |
{ab,c{de, fh}} |
文字列セットの{ab, cde, cfh}の文字列にマッチ |
以下のようにプレフィックスのパターンを指定するのにpath
を使用します。
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base_path>/*/files")
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base_path>/*/files")
重要!
明示的にサフィックスのパターンを指定するにはpathGlobFilter
オプションを指定する必要があります。path
はプレフィックスのフィルターのみを提供します。
例えば、異なるサフィックスを持つファイルを含みディレクトリでpng
ファイルのみをパースしたい場合には、以下のように行います。
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base_path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base_path>)
FAQ
ファイルに追記、上書きされた際に再度Auto Loaderは処理を行いますか?
cloudFiles.allowOverwrites
を有効化しない限り、ファイルは1度だけ処理されます。ファイルに追記、上書きが行われた場合、Databricksはどのバージョンのファイルが処理されたのかを保証しません。動作の一貫性を保つためには、不変のファイルの取り込みのみにAuto Loaderを使用することをお勧めします。これが要件に合わない場合には、Databricks担当にお問い合わせください。
データファイルが継続的に到着する場合、例えば一日に一度というように定期的な周期で到着する場合、このソースを使い続けるべきでしょうか?メリットはありますか?
両方YESです。この場合、Trigger.Once
かTrigger.AvailableNow
(Databricksランタイム10.2以降で利用できます)の構造化ストリーミングジョブをセットアップし、予期されるファイルの到着時刻の後に実行するようにスケジュールします。Auto Loaderは頻度が高い、頻度の低いアップデート両方に対してうまく動作します。最終的なアップデートが非常に大規模な場合、Auto Loaderは入力サイズに応じて適切にスケールします。Auto Loaderの効率的なファイル検知技術とスキーマ進化の能力は、Auto Loaderをインクリメンタルデータ取り込みに対して頼りのある手段にしています。
ストリームを再起動する際にチェックポイントの場所を変更したら何が起きますか?
チェックポイントの場所ではストリームの重要な識別情報を保持しています。チェックポイントの場所を意図を持って変更することは、以前のストリームを破棄し、新たなストリームを開始することを意味します。
事前にイベント通知サービスを作成する必要がありますか?
いいえ。ファイル通知モードを選択し、必要なアクセス権を与えれば、Auto Loaderはファイル通知サービスを作成します。ファイル通知の活用をご覧ください。
Auto Loaderによって作成されたイベント通知のリソースをどのようにクリーンアップしますか?
リソースの一覧、ティアダウンにクラウドリソースマネージャを使用することができます。クラウドプロバイダーのUIやAPIを用いて手動でこれらのリソースを削除することができます。
同じバケット・コンテナーで異なる入力ディレクトリから複数のストリーミングクエリーを実行することができますか?
親子関係のディレクトリでない限り可能です。例えば、prod-logs/
とprod-logs/usage/
は親子関係があるため動作しません。
バケット・コンテナーに既存のファイル通知がある場合に、この機能を使うことができますか?
入力ディレクトリが既存の通知プレフィクスと競合しない限り(例えば、上述の親子関係のディレクトリ)可能です。
トラブルシュート
エラー
java.lang.RuntimeException: Failed to create event grid subscription.
最初にAuto Loaderを実行する際にこのエラーに遭遇した場合、お使いのAzureサブスクリプションでEvent Gridがリソースプロバイダーとして登録されていません。Azureポータルで登録を行うには以下の手順を踏みます。
- お使いのサブスクリプションに移動します。
- SettingセクションのResource Providersをクリックします。
- プロバイダー
Microsoft.EventGrid
を登録します。
エラー
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
最初にAuto Loaderを実行する際にこのエラーに遭遇した場合、Event Gridとストレージアカウントに対するサービスプリンシパルにContributorロールが与えられていることを確認してください。
付録
ファイル通知リソースをセットアップする際に必要となるアクセス権
ADLS Gen2とAzure Blob Storage
入力ディレクトリに対する読み取り権限が必要です。Azure Blob StorageとAzure Data Lake Storage Gen2をご覧ください。
ファイル通知モードを使うためには、イベント通知サービスをセットアップしアクセスするために認証情報を提供する必要があります。Databricksランタイム8.1以降では、認証のためのサービスプリンシパルが必要となります。Databricksランタイム8.0以前では、サービスプリンシパルと接続文字列の両方を指定する必要があります。
-
Azureビルトインロールを用いたサービスプリンシパル
クライアントIDとクライアントシークレットのフォーマットでAzure Active Directoryアプリケーションとサービスプリンシパルを作成します。
このアプリケーションに対して、入力パスが含まれるストレージアカウントに対する以下のロールを割り当てます。- Contributor: このロールは、キューやイベントサブスクリプションのようなストレージアカウントに対するリソースのセットアップに使用されます。
-
Storage Queue Data Contributor: このロールは、キューからのメッセージの取得、削除のようなキューのオペレーションに使用されます。このロールは、Databricksランタイム8.1以降で、接続文字列なしにサービスプリンシパルを指定する際に必要となります。
このアプリケーションに関連リソースグループに対する以下のロールを割り当てます。 - EventGrid EventSubscription Contributor: このロールは、イベントサブスクリプションの作成、一覧のようなイベントグリッドサブスクリプションのオペレーションに使用されます。
詳細は、Assign Azure roles using the Azure portalをご覧ください。
-
カスタムロールを用いたサービスプリンシパル
上述のロールで必要となる過剰なアクセス権に懸念がある場合、以下のようにAzureロールのJSONフォーマットで最低限のカスタムロールを作成することができます。
"permissions": [
{
"actions": [
"Microsoft.EventGrid/eventSubscriptions/write",
"Microsoft.EventGrid/eventSubscriptions/read",
"Microsoft.EventGrid/eventSubscriptions/delete",
"Microsoft.EventGrid/locations/eventSubscriptions/read",
"Microsoft.Storage/storageAccounts/read",
"Microsoft.Storage/storageAccounts/write",
"Microsoft.Storage/storageAccounts/queueServices/read",
"Microsoft.Storage/storageAccounts/queueServices/write",
"Microsoft.Storage/storageAccounts/queueServices/queues/write",
"Microsoft.Storage/storageAccounts/queueServices/queues/read",
"Microsoft.Storage/storageAccounts/queueServices/queues/delete"
],
"notActions": [],
"dataActions": [
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
"Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
],
"notDataActions": []
}
]
そして、このカスタムロールをアプリケーションに割り当てることができます。
詳細は、[Assign Azure roles using the Azure portal](https://docs.microsoft.com/azure/role-based-access-control/role-assignments-portal)を参照ください。
-
接続文字列
Auto Loaderでは、キューからメッセージを取得、削除、キューの作成のようなAzure Queue Storageに対する認証を行うために接続文字列が必要となります。入力ディレクトリパスが存在するのと同じストレージアカウントにキューが作成されます。お使いのアカウントキーや共有アクセスシグネチャ(SAS)で接続文字列を見つけることができます。SASトークンを設定する際、以下のアクセス権を指定します。
AWS S3
入力ディレクトリに対する読み取り権限が必要となります。詳細はS3接続の詳細をご覧ください。
ファイル通知モードを使うには、お使いのIAMユーザーやロールに以下のJSONポリシーをアタッチします。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:DeleteMessageBatch",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:ChangeMessageVisibilityBatch"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
ここでは、
-
<bucket-name>
: 例えばauto-logs
のように、ストリームがファイルを読み込むS3バケット名を指定します。databricks-*-logs
のように、ワイルドカードとして*
を使うことができます。お使いのDBFSパスに対するS3バケットを見つけ出すには、%fs mounts
をノートブックで実行することで、全てのDBFSマウントポイントを一覧することができます。 -
<region>
:us-west-2
のようにS3バケットが存在するAWSリージョンを指定します。 -
<account-number>
: S3バケットを保有するAWSアカウントの番号です。例えば、123456789012
を指定します。アカウント番号を指定したくない場合には*
を使用します。
SQSとSNS ARNにおける文字列databricks-auto-ingest-*
の指定は、SQSとSNSサービスを作成する際にcloudFiles
ソースが使う名前のプレフィックスになります。Databricksはストリームの初回実行で通知サービスをセットアップするので、初回実行の後にポリシーの権限を縮小することができます(例えば、ストリームを停止して再起動)。
注意
上述のポリシーは、通知サービスであるS3バケット通知、SNS、SQSサービスのセットアップに必要となる権限のみに関するものであり、すでにS3バケットへの読み取りアクセス権は持っていることを前提としています。S3の読み取り専用権限を追加する必要がある場合には、JSONドキュメントのDatabricksAutoLoaderSetup
文のAction
リストに以下の内容を追加してください。
s3:ListBucket
s3:GetObject
初期セットアップ後のアクセス権の縮小
上述したリソースセットアップのアクセス権は、ストリームの初回実行にのみ必要となります。初回実行の後は権限を縮小した以下のIAMポリシーに切り替えることができます。
重要!
縮小された権限では、処理を失敗した際に新規ストリーミングクエリーを起動したり、リソースを再作成することはできません(例えば、誤ってSQSキューを削除した場合)。リソースを一覧したりティアダウンするために、クラウドリソースマネジメントAPIを使用することもできません。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:DeleteMessageBatch",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility",
"sqs:ChangeMessageVisibilityBatch"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
異なるAWSアカウントでセキュアにデータを取り込む
Auto LoaderはIAMロールの委任を受けることで、異なるAWSアカウントからデータをロードすることができます。AssumeRole
で作成された一時的なセキュリティクレディンシャルを設定した後は、クロスアカウントでAuto Loaderでクラウドファイルをロードすることができます。AWSクロスアカウントに対してAuto Loaderをセットアップするには、DatabricksにおけるAssumeRoleポリシーを用いたS3バケットに対するセキュアなクロスアカウントアクセスをご覧ください。以下のことを確認してください。
- AssumeRoleメタロールがクラスターに割り当てられていることを確認。
- 以下のプロパティを含むようにクラスターのSpark設定をセット。
fs.s3a.credentialsType AssumeRole
fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
fs.s3a.acl.default BucketOwnerFullControl
GCS
GCSバケット及びバケット上の全てのオブジェクトに対するlist
、get
の権限が必要です。詳細はGoogleのドキュメントのIAM permissionsをご覧ください。
ファイル通知モードを使うには、GCSサービスアカウントとGoogle Cloud Pub/Subのリソースにアクセスする際に使用されるアカウントに権限を追加する必要があります。
Google Cloud Pub/Subリソースに使用されるサービスアカウントに対しては、以下の権限を追加する必要があります。
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
これを行うためには、これらの権限を持つIAMカスタムロールを作成するか、これらの権限をカバーする既存GPCロールを割り当てます。
GCSサービスアカウントの特定
対応するプロジェクトのGoogle Cloud Consoleで、Cloud Storage > Settings
に移動します。このページで、GCSサービスアカウントのメールアドレスを含む「Cloud Storage Service Account」というタイトルのセクションを見つけることができます。
ファイル通知モード向けのカスタムGoogle Cloud IAMロールの作成
対応するプロジェクトのGoogle Cloud Consoleで、IAM & Admin > Roles
に移動します。次に、画面上部でロールを作成するか、既存ロールを更新します。ロールの作成、編集画面では、Add Permissions
をクリックします。ロールに対して権限を追加することができるメニューがポップアップします。
ファイルの名前順
名前順で並び替えられるファイルに対しては、アップロードされる新規ファイルは、既存ファイルよりも大きい名前順を持つプレフィックスが必要となります。以下に、名前順で並び替えられるディレクトリの例を示します。
バージョン管理されるファイル
Delta Lakeのテーブルは名前順でトランザクションログをコミットします。
<path_to_table>/_delta_log/00000000000000000000.json
<path_to_table>/_delta_log/00000000000000000001.json <- guaranteed to be written after version 0
<path_to_table>/_delta_log/00000000000000000002.json <- guaranteed to be written after version 1
...
AWS DMSはバージョン管理された形でAWS S3にCDCファイルをアップロードします。
database_schema_name/table_name/LOAD00000001.csv
database_schema_name/table_name/LOAD00000002.csv
...
日付でパーティションが作成されたファイル
日付でパーティショニングされた形でファイルをアップロードすることができ、インクリメンタル一覧を活用することができます。以下に例を示します。
// <base_path>/yyyy/MM/dd/HH:mm:ss-randomString
<base_path>/2021/12/01/10:11:23-b1662ecd-e05e-4bb7-a125-ad81f6e859b4.json
<base_path>/2021/12/01/10:11:23-b9794cf3-3f60-4b8d-ae11-8ea320fad9d1.json
...
// <base_path>/year=yyyy/month=MM/day=dd/hour=HH/minute=mm/randomString
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/442463e5-f6fe-458a-8f69-a06aa970fc69.csv
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/8f00988b-46be-4112-808d-6a35aead0d44.csv <- this may be uploaded before the file above as long as processing happens less frequently than a minute
日付のパーティショニングでファイルがアップロードされた際、いくつか注意すべきことがあります。
- 月、日、時、分は名前順を保証するためにゼロによる左側のパディングが必要です(
hour=3
ではなくhour=03
、2021/5/3
ではなく2021/05/03
)。 - 親ディレクトリにおける処理頻度よりも子供のディレクトリの処理頻度が少ない場合は、必ずしもファイルが名前順でアップロードされる必要はありません。
ファイルを日付のパーティショニングの名前順でアップロードできるサービスには以下のようなものがあります。
- Azure Data Factoryでは名前順でファイルをアップロードするように設定することができます。こちらのサンプルをご覧ください。
- Kinesis Firehose
フォーマットオプション
一般的なオプション
以下のオプションは全てのフォーマットに適用されます。
オプション |
---|
modifiedAfter タイプ: Timestamp String 例えば、2021-01-01 00:00:00.000000 UTC+0 指定されたタイムスタンプ以降の修正時のタイムスタンプを持つファイルを取り込むオプションの値です。 デフォルト値: None |
modifiedBefore タイプ: Timestamp String 例えば、2021-01-01 00:00:00.000000 UTC+0 指定されたタイムスタンプ以前の修正時のタイムスタンプを持つファイルを取り込むオプションの値です。 デフォルト値: None |
pathGlobFilter タイプ: String ファイルを選択するためのglobパターンを指定します。 COPY INTO におけるPATTERN と同じものです。デフォルト値: None |
recursiveFileLookup タイプ: Boolean ベースディレクトリ内のデータを再起的にロードし、パーティション推論をスキップするかどうかを指定します。 デフォルト値: false
|
JSON
オプション
オプション |
---|
allowBackslashEscapingAnyCharacter タイプ: Boolean 直後の文字をエスケープするためにバックスラッシュを許可するかどうかを指定します。無効化されている場合、JSON仕様で明示的に一覧されている文字のみがエスケープされます。 デフォルト値: false
|
allowComments タイプ: Boolean パースされたコンテンツ内でJava, C, C++スタイルのコメント( '/' , '*' , '//' )を許可するかどうかを指定します。デフォルト値: false
|
allowNonNumericNumbers タイプ: Boolean not-a-number( NaN )を適切な浮動小数点の値として許可するかどうかを指定します。デフォルト値: true
|
allowNumericLeadingZeros タイプ: Boolean ゼロで始まる整数値(00001など)を許可するかどうかを指定します。 デフォルト値: false
|
allowSingleQuotes タイプ: Boolean 文字列を引用(名前や文字列の値)するためにシングルクオート(アポストロフィ、 '\' )を許可するかどうかを指定します。デフォルト値: true
|
allowUnquotedControlChars タイプ: Boolean JSON文字列の中にエスケープされていない制御文字(32より小さい値のASCII文字、タブ、ラインフィード文字を含む)を含めて良いかどうかを指定します。 デフォルト値: false
|
allowUnquotedFieldNames タイプ: Boolean クオートされていないフィールド名(JavaScriptでは許可されますが、JSON仕様では許可されません)の利用を許可するかどうかを指定します。 デフォルト値: false
|
badRecordsPath タイプ: String 不正なJSONレコードに関する情報を記録するファイルのパスです。 デフォルト値: None |
columnNameOfCorruptRecord タイプ: String 不正でパースできないレコードを格納するカラム名です。パーシングの mode がDROPMALFORMED の場合、このカラムは空になります。デフォルト値: _corrupt_record
|
dateFormat タイプ: String 日付文字列をパースするフォーマットです。 デフォルト値: yyyy-MM-dd
|
dropFieldIfAllNull タイプ: Boolean スキーマ推定の際に、全てがnull値のカラム、空の配列、空のstructsのカラムを無視するかどうかを指定します。 デフォルト値: false
|
encodingあるいはcharset タイプ: String JSONファイルのエンコーディング名を指定します。オプションについては java.nio.charset.Charset をご覧ください。multiline がtrue の場合、UTF-16 やUTF-32 を使うことはできません。デフォルト値: UTF-8
|
inferTimestamp タイプ: Boolean タイムスタンプ文字列を TimestampType として推定するかどうかを指定します。true に設定すると、スキーマ推定の処理時間が長くなります。デフォルト値: false
|
lineSep タイプ: String 二つの連続するJSONレコードの区切り文字です。 デフォルト値: None、 \r , \r\n , \n をカバーします。 |
locale タイプ: String java.util.Locale のIDです。JSONのパーシングする際に、デフォルトの日付、タイムスタンプ、数値に影響を与えます。デフォルト値: US
|
mode タイプ: String 不正レコードのハンドリングにおけるパーサーのモードです。 'PERMISSIVE' , 'DROPMALFORMED' , 'FAILFAST' のいずれかとなります。デフォルト値: PERMISSIVE
|
multiLine タイプ: Boolean JSONレコードが複数行にわたるかどうかを指定します。 デフォルト値: false
|
prefersDecimal タイプ: Boolean スキーマ推定の際、floatやdoubleを DecimalType として推定するかどうかを指定します。デフォルト値: false
|
primitivesAsString タイプ: Boolean 数字やbooleanのようなプリミティブ型を StringType として推定するかどうかを指定します。デフォルト値: false
|
rescuedDataColumn タイプ: String データ型の不一致やスキーマミスマッチ(カラム名の大文字小文字を含む)でパーシングできなかった全てのデータを別のカラムで収集するかどうかを指定します。Auto Loaderを使用する際には、このカラムは自動で含まれます。詳細はRescued data columnを参照ください。 デフォルト値: None |
timestampFormat タイプ: String パースするタイムスタンプ文字列のフォーマットです。 デフォルト値: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
|
timeZone タイプ: String タイムスタンプと日付をパーシングする際に使用する java.time.ZoneId となります。デフォルト値: None |
CSV
オプション
オプション |
---|
badRecordsPath タイプ: String 不正なCSVレコードに関する情報を記録するファイルのパスです。 デフォルト値: None |
charToEscapeQuoteEscaping タイプ: Char クオートをエスケープするために使用する文字です。例えばレコード [ " a\\", b ] に対して
デフォルト値: '\0'
|
columnNameOfCorruptRecord タイプ: String 不正でパースできないレコードを格納するカラム名です。パーシングの mode がDROPMALFORMED の場合、このカラムは空になります。デフォルト値: _corrupt_record
|
comment タイプ: Char テキストの行の最初にある場合に行コメントと表現する文字を定義します。コメントのスキップを無効化するには '\0' を使用します。デフォルト値: '#'
|
dateFormat タイプ: String 日付文字列をパースするフォーマットです。 デフォルト値: yyyy-MM-dd
|
emptyValue タイプ: String 空の値を表現する文字列です。 デフォルト値: ""
|
encodingあるいはcharset タイプ: String CSVファイルのエンコーディング名を指定します。オプションについては java.nio.charset.Charset をご覧ください。multiline がtrue の場合、UTF-16 やUTF-32 を使うことはできません。デフォルト値: UTF-8
|
enforceSchema タイプ: Boolean CSVファイルに対して指定された、推定されたスキーマの適用を強制するかどうかを指定します。オプションが有効化された場合、CSVファイルのヘッダーは無視されます。Auto Loaderを使用する際、データを救助しスキーマ進化を実現するために、デフォルトでこのオプションは無視されます。 デフォルト値: true
|
escape タイプ: Char データをパーシングする際に使用するエスケープ文字です。 デフォルト値: '\'
|
header タイプ: Boolean CSVファイルにヘッダーがあるかどうかを指定します。スキーマを推定する際、Auto Loaderはファイルにヘッダーがあることを前提にします。 デフォルト値: false
|
ignoreLeadingWhiteSpace タイプ: Boolean パースされた値それぞれの頭の空白を無視するかどうかを指定します。 デフォルト値: false
|
ignoreTrailingWhiteSpace タイプ: Boolean パースされた値それぞれの後ろの空白を無視するかどうかを指定します。 デフォルト値: false
|
inferSchema タイプ: Boolean パースされたCSVレコードのデータ型を推定するか、全てのカラムが StringType であると仮定するかどうかを指定します。true に設定すると、データに対する追加の処理が必要となります。デフォルト値: false
|
lineSep タイプ: String 二つの連続するCSVレコードの区切り文字です。 デフォルト値: None、 \r , \r\n , \n をカバーします。 |
locale タイプ: String java.util.Locale のIDです。JSONのパーシングする際に、デフォルトの日付、タイムスタンプ、数値に影響を与えます。デフォルト値: US
|
maxCharsPerColumn タイプ: Int パースする値の最大文字数です。メモリーエラーの回避に使うことができます。デフォルトは -1 であり、無制限となります。デフォルト値: -1
|
maxColumns タイプ: Int レコードあたりの最大列数のハードリミットです。 デフォルト値: 20480
|
mergeSchema タイプ: Boolean 複数ファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。Auto Loaderでスキーマ推定を行う際にはデフォルトで有効化されます。 デフォルト値: false
|
mode タイプ: String 不正レコードのハンドリングにおけるパーサーのモードです。 'PERMISSIVE' , 'DROPMALFORMED' , 'FAILFAST' のいずれかとなります。デフォルト値: PERMISSIVE
|
multiLine タイプ: Boolean CSVレコードが複数行にわたるかどうかを指定します。 デフォルト値: false
|
nanValue タイプ: String FloatType やDoubleType をパーシングする際のnon-a-numberの値の文字列表現を指定します。デフォルト値: "NaN"
|
negativeInf タイプ: String FloatType やDoubleType をパーシングする際の負の無限大の文字列表現を指定します。デフォルト値: "-Inf"
|
nullValue タイプ: String ヌル値を示す文字列です。 デフォルト値: ""
|
parserCaseSensitive(非推奨) タイプ: Boolean ファイルを読み込んでいる間にヘッダーで宣言されているカラムをスキーマに割り当てる際に、大文字小文字を区別するかどうかを指定します。有効化されている場合、大文字小文字が異なるカラムは rescuedDataColumn にレスキューされます。このオプションはreaderCaseSensitive によって非推奨となりました。デフォルト値: false
|
positiveInf タイプ: String FloatType やDoubleType をパーシングする際の正の無限大の文字列表現を指定します。デフォルト値: "Inf"
|
quote タイプ: Char 値にフィールドの区切り文字が含まれている際に使用されるエスケープ文字です。 デフォルト値: '\'
|
rescuedDataColumn タイプ: String データ型の不一致やスキーマミスマッチ(カラム名の大文字小文字を含む)でパーシングできなかった全てのデータを別のカラムで収集するかどうかを指定します。Auto Loaderを使用する際には、このカラムは自動で含まれます。詳細はRescued data columnを参照ください。 デフォルト値: None |
sepあるいはdelimiter タイプ: String カラムの区切り文字です。 デフォルト値: ","
|
timestampFormat タイプ: String パースするタイムスタンプ文字列のフォーマットです。 デフォルト値: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
|
timeZone タイプ: String タイムスタンプと日付をパーシングする際に使用する java.time.ZoneId となります。デフォルト値: None |
unescapedQuoteHandling タイプ: String エスケープされていないクオート(引用符)の取り扱い戦略を指定します。オプションは以下の通りとなります。
デフォルト値: STOP_AT_DELIMITER
|
PARQUET
オプション
オプション |
---|
datetimeRebaseMode タイプ: String JulianとProleptic Gregorianカレンダーの間のDATEとTIMESTAMPのリベースを制御します。 EXCEPTION , LEGACY , CORRECTED を指定できます。デフォルト値: LEGACY
|
int96RebaseMode タイプ: String JulianとProleptic Gregorianカレンダーの間のINT96のリベースを制御します。 EXCEPTION , LEGACY , CORRECTED を指定できます。デフォルト値: LEGACY
|
mergeSchema タイプ: Boolean 複数のファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。 デフォルト値: false
|
AVRO
オプション
オプション |
---|
avroSchema タイプ: String ユーザーがAvroフォーマットで指定できるオプションのスキーマです。Avroを読み込んでいる際、このオプションに実際のAvroのスキーマと違うが互換性のある進化スキーマを設定することができます。 デフォルト値: None |
datetimeRebaseMode タイプ: String JulianとProleptic Gregorianカレンダーの間のDATEとTIMESTAMPのリベースを制御します。 EXCEPTION , LEGACY , CORRECTED を指定できます。デフォルト値: LEGACY
|
mergeSchema タイプ: Boolean 複数のファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。 デフォルト値: false
|
BINARYFILE
オプション
バイナリーファイルには追加の設定オプションはありません。
TEXT
オプション
オプション |
---|
encoding タイプ: String CSVファイルのエンコーディング名を指定します。オプションについては java.nio.charset.Charset をご覧ください。デフォルト値: UTF-8
|
lineSep タイプ: String 二つの連続するテキストレコードの区切り文字です。 デフォルト値: None、 \r , \r\n , \n をカバーします。 |
wholeText タイプ: Boolean ファイルを単一のレコードとして読み込むかどうかを指定します。 デフォルト値: false
|
ORC
オプション
オプション |
---|
mergeSchema タイプ: Boolean 複数のファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。 デフォルト値: false
|