11
9

More than 1 year has passed since last update.

DatabricksのAuto Loader

Last updated at Posted at 2021-12-21

Auto Loader | Databricks on AWS [2022/4/19時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Auto Loaderはクラウドストレージに到着する新規データファイルをインクリメンタルかつ効率的に処理します。Auto LoaderはAWS S3 (s3://), Azure Data Lake Storage Gen2 (ADLS Gen2, abfss://)、Google Cloud Storage (GCS, gs://)、Azure Blob Storage (wasbs://)、ADLS Gen1 (adl://)、Databricksファイルシステム (DBFS, dbfs:/)からデータファイルをロードすることができます。Auto Loaderでは、JSONCSVPARQUETAVROORCTEXTBINARYFILEファイルフォーマットを取り込むことができます。

Auto LoaderはcloudFilesという構造化ストリーミングソースを提供します。クラウドファイルストレージの入力ディレクトリパスを提供することで、cloudFilesソースは、到着するファイル、そして、オプションによっては既存ファイルも自動で処理します。

Auto Loaderは、一時間あたり数百万のファイルがロードされるパイプラインで、数十億のファイルをバックフィルする必要があるようなストレージアカウントからデータをロードできるようにスケールすることができます。

Auto Loaderの動作原理

Auto Loaderでは新規ファイルを検知するために2つのモードをサポートします: ディレクトリ一覧とファイル通知です。

  • ディレクトリ一覧: Auto Loaderは入力ディレクトリの一覧を生成することで新規ファイルを識別します。ディレクトリ一覧モードを用いることで、クラウドストレージのデータにアクセスする以外のアクセス権設定なしに、Auto Loaderのストリームをクイックに起動することができます。Databricksランタイム9.1以降では、お使いのクラウドストレージの語順でファイルが到着しているかどうかを自動で検知し、新規ファイルを検知するのに必要なAPI呼び出しの数を劇的に削減します。詳細はインクリメンタルな一覧を参照ください。
  • ファイル通知: Auto Loaderでは通知サービスと入力ディレクトリのファイルイベントをサブスクライブするキューサービスを自動的にセットアップすることができます。ファイル通知モードはより高性能であり、大規模な入力ディレクトリや大規模ファイルにスケールすることができますが、セットアップには追加のアクセス権が必要となります。詳細はファイル通知の活用をご覧ください。

これらのモードの可用性を以下に示します。

クラウドストレージ ディレクトリ一覧 ファイル通知
AWS S3 全バージョン 全バージョン
ADLS Gen2 全バージョン 全バージョン
GCS 全バージョン Databricksランタイム9.1以降
Azure Blob Storage 全バージョン 全バージョン
ADLS Gen1 Databricksランタイム7.3以降 未サポート
DBFS 全バージョン マウントポイントのみ

ファイルが検知されると、お使いのAuto Loaderパイプラインのチェックポイントの格納場所のスケーラブルなキーバリューストア(RocksDB)にそれらのメタデータが永続化されます。このキーバリューストアによって、確実に一度のみ処理(exactly-once)が行われることを保証します。ストリームを再起動する合間にファイル検知モードを切り替えることができ、依然として一度のみのデータ処理を保証することができます。実際、このようにして、Auto Loaderは既存ファイルを含むディレクトリのバックフィルの実行と、ファイル通知を通じて検知される新規ファイルの継続的処理の両方が可能となります。

処理の失敗時には、チェックポイントに格納されている情報を用いてAuto Loaderはその地点から処理を再開することができ、Delta Lakeに対してデータ書き込みを行う際に一度のみ書き込まれることを確実にし続けます。耐障害性や一度のみ実行するせまんティクスを実現するために、ご自身で状態を維持管理する必要はありません。

いつCOPY INTOを使用し、いつAuto Loaderを使用するのか

COPY INTOコマンドは、Delta Lakeに対する一度のみの書き込み保証を行うために、インクリメンタルにデータをロードするためのもう一つの便利な方法です。Auto LoaderかCOPY INTOを選択する際の検討事項をいくつか示します。

  • 数千規模のファイルを取り込むのであれば、COPY INTOを使う方が望ましいです。長い間にわたる数百万のファイルが予想されるのであれば、Auto Loaderを使ってください。Auto LoaderはCOPY INTOよりもコスト低くファイルを検知でき、処理を複数のバッチに分割することができます。
  • 頻繁にデータのスキーマが変更されるのであれば、Auto Loaderはスキーマ推定、スキーマ進化に関わる優れた機能を提供します。詳細はスキーマ推定とスキーマ進化をご覧ください。
  • 再度アップロードされたファイルのサブセットのロードは、COPY INTOで管理した方が若干簡単になる場合があります。Auto Loaderでは、ファイルのサブセットをを再処理することが困難です。しかし、Auto Loaderのストリームが稼働しているのと同時に、ファイルのサブセットをリロードするためにCOPY INTOを使用することができます。

Apache SparkのFileStreamSourceに対するメリット

Apache Sparkでは、spark.readStream.format(fileFormat).load(directory)を用いてファイルをインクリメンタルに読み込むことができます。Auto Loaderはファイルソースと比べて以下のメリットがあります。

  • スケーラビリティ: Auto Loaderは数十億のファイルを効率的に検知することができます。計算リソースを無駄にすることなしに、非同期でバックフィルを実行することができます。
  • パフォーマンス: Auto Loaderによるファイル検知のコストは、ファイルが到着するであろうディレクトリの数ではなく、取り込まれるファイルの数でスケールします。最適化されたディレクトリ一覧をご覧ください。
  • スキーマ推定とスキーマ進化のサポート: Auto Loaderはスキーマのドリフトを検知することができ、スキーマの変更が起きた際に通知を行い、無視されるか失われるデータを救助します。スキーマ推定とスキーマ進化をご覧ください。
  • コスト: Auto Loaderはストレージに存在するファイルの一覧を取得するためにネイティブのクラウドAPIを使用します。さらに、Auto Loaderのファイル通知モードは、ディレクトリ一覧を完全に回避するのでさらにクラウドコストの削減に役立ちます。Auto Loaderはファイル検知をより安価にできるように、ストレージ上のファイル通知を自動でセットアップすることができます。

クイックスタート

以下のコードサンプルでは、クラウドストレージに到着する新規データファイルをAuto Loaderがどのように検知するのかをデモします。

  1. ファイルアップロード用のディレクトリを作成します。

    Python
    user_dir = '<my-name>@<my-organization.com>'
    upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    dbutils.fs.mkdirs(upload_path)
    
    Scala
    val user_dir = "<my-name>@<my-organization.com>"
    val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"
    dbutils.fs.mkdirs(upload_path)
    
  2. 以下のサンプルCSVファイルを作成し、DBFSファイルブラウザを用いてディレクトリにファイルをアップロードします。

    WA.csv:

    city,year,population
    Seattle metro,2019,3406000
    Seattle metro,2020,3433000
    

    OR.csv:

    city,year,population
    Portland metro,2019,2127000
    Portland metro,2020,2151000
    
  3. Auto Loaderを起動するために以下のコードを実行します。

    Python
    checkpoint_path = '/tmp/delta/population_data/_checkpoints'
    write_path = '/tmp/delta/population_data'
    
    # Set up the stream to begin reading incoming files from the
    # upload_path location.
    df = spark.readStream.format('cloudFiles') \
      .option('cloudFiles.format', 'csv') \
      .option('header', 'true') \
      .schema('city string, year int, population long') \
      .load(upload_path)
    
        # Start the stream.
    # Use the checkpoint_path location to keep a record of all files that
    # have already been uploaded to the upload_path location.
    # For those that have been uploaded since the last check,
    # write the newly-uploaded files' data to the write_path location.
    df.writeStream.format('delta') \
      .option('checkpointLocation', checkpoint_path) \
      .start(write_path)
    
    Scala
    val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
    val write_path = "/tmp/delta/population_data"
    
    // Set up the stream to begin reading incoming files from the
    // upload_path location.
    val df = spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .schema("city string, year int, population long")
      .load(upload_path)
    
    // Start the stream.
    // Use the checkpoint_path location to keep a record of all files that
    // have already been uploaded to the upload_path location.
    // For those that have been uploaded since the last check,
    // write the newly-uploaded files' data to the write_path location.
    df.writeStream.format("delta")
      .option("checkpointLocation", checkpoint_path)
      .start(write_path)
    
  4. ステップ3のコードが実行している状態で、以下のコードでディレクトリに書き込まれたデータに対するクエリーを実行します。

    Python
    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    '''
    
    Scala
    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    */
    
  5. ステップ3のコードが実行している状態で、以下の追加のCSVファイルを作成し、DBFSファイルブラウザを用いてディレクトリにファイルをアップロードします。

    ID.csv:

    city,year,population
    Boise,2019,438000
    Boise,2020,447000
    

    MT.csv:

    city,year,population
    Helena,2019,81653
    Helena,2020,82590
    

    Misc.csv:

    city,year,population
    Seattle metro,2021,3461000
    Portland metro,2021,2174000
    Boise,2021,455000
    Helena,2021,81653
    
  6. ステップ3のコードが実行している状態で、以下のコードでディレクトリに書き込まれたデータ、そして、アップロード用ディレクトリでAuto Loaderが検知し、書き込み用ディレクトリに書き込まれたファイルからの新規データに対するクエリーを実行します。

    Python
    df_population = spark.read.format('delta').load(write_path)
    
    display(df_population)
    
    '''
    Result:
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    '''
    
    Scala
    val df_population = spark.read.format("delta").load(write_path)
    
    display(df_population)
    
    /* Result
    +----------------+------+------------+
    | city           | year | population |
    +================+======+============+
    | Seattle metro  | 2019 | 3406000    |
    +----------------+------+------------+
    | Seattle metro  | 2020 | 3433000    |
    +----------------+------+------------+
    | Helena         | 2019 | 81653      |
    +----------------+------+------------+
    | Helena         | 2020 | 82590      |
    +----------------+------+------------+
    | Boise          | 2019 | 438000     |
    +----------------+------+------------+
    | Boise          | 2020 | 447000     |
    +----------------+------+------------+
    | Portland metro | 2019 | 2127000    |
    +----------------+------+------------+
    | Portland metro | 2020 | 2151000    |
    +----------------+------+------------+
    | Seattle metro  | 2021 | 3461000    |
    +----------------+------+------------+
    | Portland metro | 2021 | 2174000    |
    +----------------+------+------------+
    | Boise          | 2021 | 455000     |
    +----------------+------+------------+
    | Helena         | 2021 | 81653      |
    +----------------+------+------------+
    */
    
  7. クリーンアップするために、ステップ3の実行中のコードをキャンセルし、以下のコードを実行し、アップロード、チェックポイント、書き込み用ディレクトリを削除します。

    Python
    dbutils.fs.rm(write_path, True)
    dbutils.fs.rm(upload_path, True)
    
    Scala
    dbutils.fs.rm(write_path, true)
    dbutils.fs.rm(upload_path, true)
    

チュートリアル:Auto LoaderによるDelta Lakeへの継続的データ取り込みもご覧ください。

スキーマ推定とスキーマ進化

注意
この機能はDatabricksランタイム8.2以降で利用できます。

Auto LoaderではCSV, JSON, バイナリー(binaryFile), テキストファイルフォーマットのスキーマ推定、スキーマ進化にサポートしています。詳細はSchema inference and evolution in Auto Loaderをご覧ください。

大規模データに対するAuto Loaderのスケーリング

Trigger.AvailableNowとレート制限の使用

注意
この機能はDatabricksランタイム10.1以降のScalaで利用できます。
この機能はDatabricksランタイム10.2以降のPythonとScalaで利用できます。

Auto LoaderはTrigger.AvailableNowを用いることで、バッチジョブとしてDatabricksジョブでスケジュールすることができます。AvailableNowトリガーは、クエリーの開始時刻の前に到着した全てのファイルを処理するようにAuto Loaderに指示します。ストリームが開始した後にアップロードされた新規ファイルは、次のトリガーまで無視されます。

Trigger.AvailableNowを用いることで、ファイル検知はデータ処理と非同期で実行され、データは例と制限を用いた複数のマイクロバッチで処理できるようになります。デフォルトではAuto Loaderはマイクロバッチごとに最大1000ファイルを処理します。マイクロバッチでいくつのファイルを処理するのか、何バイトを処理するのかを、cloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTriggerで設定することができます。ファイルの制限はハードリミットですが、バイト数の制限はソフトリミットであり、maxBytesPerTrigger以上のバイト数が処理される場合があります。両方のオプションが設定された際には、Auto Loaderはこれらの制限に到達するまで可能な限り多くのファイルを処理します。

最適化されたディレクトリ一覧

注意
この機能はDatabricksランタイム9.0以降で利用できます。

Auto Loaderは、代替手段よりも効率的なディレクトリ一覧を用いることで、クラウドストレージシステムのファイルを検知することができます。例えば、/some/path/YYYY/MM/DD/HH/fileNameというように、5分間隔でファイルをアップロードしている場合、これらのディレクトリの全てのファイルを検索するためにApache Sparkのファイルソースは並列で全てのサブディレクトリの一覧を作成するので、ストレージに対して 1 (ベースディレクトリ) + 365 (日毎) * 24 (時間ごと) = 8761のLIST API呼び出しを行います。Auto Loaderはストレージからフラット化された結果を受け取ることで、APIの呼び出し数をストレージ上のファイル数 / それぞれのAPI呼び出しで返却される結果の数(S3は1000、ADLS Gen2は5000、GCSは1024)に削減し、お使いのクラウドコストを劇的に削減します。

インクリメンタルな一覧

注意
本機能はDatabricksランタイム9.1 LTS以降で利用できます。

名前順に作成されたファイルに対しては、Auto Loaderはファイル名の語順とディレクトリ全体の内容の一覧を作成するのではなく、最近追加されたファイルを一覧することでディレクトリ一覧の効率を改善する最適化一覧APIを活用します。

デフォルトでは、Auto Loaderは、以前に完了したディレクトリ一覧との比較とチェックを行うことで、特定のディレクトリにインクリメンタルな一覧を適用できるのかを自動で検知します。autoモードでのデータの結果的完全性を保証するために、Auto Loaderは7回連続のインクリメンタル一覧を行なった後に完全なディレクトリ一覧を自動でトリガーします。完全ディレクトリ一覧の頻度はcloudFiles.backfillIntervalで制御でき、指定された周期で非同期のバックフィルをトリガーすることができます。

cloudFiles.useIncrementalListing"true""false"(デフォルトは"auto")に設定することで、インクリメンタルな一覧を明示的に有効、無効化することができます。明示的に有効化されると、Auto Loaderはバックフィル周期が指定されるまで完全ディレクトリ一覧をトリガーしません。AWS Kinesis Firehose、AWS DMS、Azure Data Factoryのようなサービスは、ストレージシステムに語順でファイルをアップロードするように設定できるサービスです。語順ディレクトリの構造のサンプルに関しては付録を参照ください。

ファイル通知の活用

バケットに対して名前順でファイルが到着しない場合、Auto Loaderが時間当たり数百万のファイルを取り込めるようにスケールできるようにファイル通知を活用することができます。cloudFiles.useNotificationstrueに設定し、クラウドリソースを作成するのに必要な権限を与えることで、Auto Loaderは自動でファイル通知をセットアップすることができます。さらに、これらのリソースを作成するためにAuto Loaderの認証情報を提供するために以下の追加オプションが必要になる場合があります。以下の表ではAuto Loaderによってどのリソースが作成されるのかをまとめています。

クラウドストレージ サブスクリプションサービス キューサービス プレフィックス(1) 制限(2)
AWS S3 AWS SNS AWS SQS databricks-auto-ingest S3バケットあたり100
ADLS Gen2 Azure Event Grid Azure Queue Storage databricks ストレージアカウントあたり500
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingest GCSバケットあたり100
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks ストレージアカウントあたり500
  1. Auto Loaderがこのプロフィックスでリソースに名前をつけます。
  2. ファイル通知パイプラインの同時起動数です。

ファイル通知サービスを作成するのに必要なアクセス権をAuto Loaderに指定できない場合、次のセクションにあるようにファイル通知サービスを作成するためのDatabricks ScalaノートブックでsetUpNotificationServicesメソッドを使用するように皆様のクラウド管理者に依頼することができます。あるいは、クラウド管理者が手動でファイル通知サービスをセットアップし、ファイル通知を活用できるようにキューのIDを教えてもらうことができます。詳細については、ファイル通知のオプションをご覧ください。

任意のタイミングでファイル通知とディレクトリ一覧を切り替えることができ、依然として一度のみの処理を保証することができます。

注意
クラウドプロバイダーは、非常に稀ば状況においては全てのファイルイベントの100%のデリバリーを保証せず、ファイルイベントのレーテンシーに対する厳密なSLAを提供しません。データの完全性の要件があり、全てのファイルが特定のSLAで検知されることを保証するために、cloudFiles.backfillIntervalオプションを用いて、Auto Loaderによる定期的なバックフィルを行うことをお勧めします。定期的なバックフィルの起動は重複を引き起こしません。

特定のストレージアカウントに対する制限値以上に、ファイル通知パイプラインを実行する必要がある場合には、以下の方法が考えられます。

  • ファイル通知ではなくインクリメンタルな一覧を活用できるように、ファイルのアップロード方法を再度設計し直すことを考えます。
  • コンテナ全体あるいはディレクトリ固有のキューに対するバケットをサブスクライブする単一のキューからの通知を分散させるAWS LambdaやAzure Functions、Google Cloud Functionsを活用します。

ファイル通知のイベント

AWS S3では、putかマルチパートアップロードでアップロードされたのかに関係なく、ファイルがS3バケットにアップロードされた際にはObjectCreatedイベントを生成します。

ADLS Gen2は、お使いのGen2コンテナにファイルが出現すると異なるイベント通知を行います。

  • ファイルを処理する際、Auto LoaderはFlushWithCloseイベントをリッスンします。
  • Databricksランタイム8.3以降で作成されたAuto Loaderのストリームはファイルの検知のためにRenameFileアクションをサポートしています。RenameFileアクションは、リネームされたファイルのサイズを取得するためにストレージシステムに対するAPIリクエストを必要とします。
  • Databricksランタイム9.0以降で作成されたAuto Loaderのストリームはファイルの検知のためにRenameDirectoryアクションをサポートしています。RenameDirectoryアクションは、リネームされたディレクトリの内容一覧を取得するためにストレージシステムに対するAPIリクエストを必要とします。

Google Cloud Storageはファイルがアップロードされた際にOBJECT_FINALIZEイベントを生成します。これには上書きとファイルコピーが含まれます。失敗したアップロードではこのイベントは生成されません。

ファイル通知リソースの管理

Auto Loaderによって作成された通知サービス、キューサービスを管理するためにScala APIを使用することができます。このAPIを使用する前にアクセス権で説明されている権限をセットアップするようにリソースを設定する必要があります。

Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

<prefix>-<resource-suffix>の名前のキューとサブスクリプションを作成するためにsetUpNotificationServices(<resource-suffix>)を使用します(プレフィックスはファイル通知のオプションにまとめているようにストレージシステムに依存します)。これによって、cloudFilesソースのユーザーは、リソースを作成するよりも少ない権限を持つことになります。アクセス権をご覧ください。

setUpNotificationServicesを呼び出す際にのみnewManager"path"オプションを指定します。listNotificationServicestearDownNotificationServicesでは不要です。これは、ストリーミングクエリーを実行するのと同じpathとなります。

クラウドストレージ セットアップAPI 一覧API ティアダウンAPI
AWS S3 全バージョン 全バージョン 全バージョン
ADLS Gen2 全バージョン 全バージョン 全バージョン
GCS Databricksランタイム9.1以降 Databricksランタイム9.1以降 Databricksランタイム9.1以降
Azure Blob Storage 全バージョン 全バージョン 全バージョン
ADLS Gen1 未サポート 未サポート 未サポート

イベントの保持

注意
本機能はDatabricksランタイム8.4以降で利用できます。

Auto Loaderは一度のみの取り込み保証を行うために、RocksDBを用いてチェックポイントの中で検知したファイルを追跡し続けます。大容量のデータセットに対しては、ストレージコストとAuto Loaderの起動時間を削減するためにcloudFiles.maxFileAgeを用いてチェックポイントからイベントを期限切れにすることができます。cloudFiles.maxFileAgeに指定できる最小値は"14 days"となります。RocksDBにおける削除はトゥームストーンのエントリとして表示され、定常状態になる前にイベントが期限切れになるため、一時的にはストレージの使用量は増加します。

警告!
cloudFiles.maxFileAgeは大規模データセットのコストコントロールの手段として提供しており、時間あたり数百万のファイルを取り込みます。不適切にcloudFiles.maxFileAgeをチューニングすると、データ品質問題を引き起こす場合があります。このため、絶対に必要でない場合を除きこれをチューニングすることはお勧めしません。

cloudFiles.maxFileAgeオプションをチューニングしようとすると、Auto Loaderによって未処理のファイルが無視されたり、処理済みのファイルが有効期限切れとなって再処理が行われ、重複データが生成される場合があります。cloudFiles.maxFileAgeを選択する際に検討すべき事項を以下に示します。

  • 長い間を置いてからストリームを再起動する際、キューから取得されるcloudFiles.maxFileAgeより古いファイル通知イベントは無視されます。同様に、ディレクトリ一覧を使用している際、ダウンタイムの期間に出現したファイルでcloudFiles.maxFileAgeより古いファイルは無視されます。
  • ディレクトリ一覧モードを使用しており、cloudFiles.maxFileAgeを例えば"1 month"に設定している場合、ストリームを停止してからcloudFiles.maxFileAge"2 months"に設定して再起動すると、1ヶ月より古く、2ヶ月より新しい全てのファイルが再処理されます。

cloudFiles.maxFileAgeをチューニングするのにベストなアプローチは、ゆるい有効期限、例えば"1 year"からスタートし、"9 months"のように短縮していくというものです。このオプションを設定し、最初にストリームを起動した際にcloudFiles.maxFileAgeより古いデータは取り込まれませんので、古いデータを取り込みたい場合には、ストリームを起動する際にこのオプションを設定すべきではありません。

プロダクションでのAuto Loaderの実行

プロダクション環境でAuto Loaderを実行するためには、ストリーミングのベストプラクティスに従うことをお勧めします。

Auto Loaderのモニタリング

ストリーミングアプリケーションをモニタリングするためには、Apache SparkのStreaming Query Listener interfaceを使用することをお勧めします。

Auto Loaderはバッチごとにストリーミングクエリーリスナーにメトリクスを報告します。バックログにどれだけのファイルが存在するのか、バックログがどのくらいのサイズなのかは、ストリーミングクエリー進捗ダッシュボードRaw Dataタブの下のnumFilesOutstandingnumBytesOutstandingで確認することができます。

JSON
{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

Databricksランタイム10.1以降でファイル通知モードを使用している際は、AWSとAzureのメトリクスにはapproximateQueueSizeとしてクラウドキューになるファイルイベント数の近似値が含まれます。

Delta Live TablesにおけるAuto Loaderの利用

Auto LoaderはDelta Live TablesでSQLやPythonから使用することができます。Delta Live TablesでAuto Loaderを使用する際、スキーマの場所やチェックポイントの場所を指定する必要はありません。これらは、ご自身のパイプラインのDelta Live Tablesによって管理されます。

以下の例では、CSVやJSONファイルからのデータセットを作成するためにAuto Loaderを使用しています。

Python
@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
SQL
CREATE INCREMENTAL LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE INCREMENTAL LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Auto Loaderとサポートされているフォーマットオプションを使用することができます。以下の例ではタブ区切りのCSVファイルからデータを読み込んでいます。

SQL
CREATE INCREMENTAL LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t"))

注意
Delta Live Tablesでファイルを読み込むためにAuto Loaderを使用すると、自動でスキーマとチェックポイントのディレイクトリを設定し管理します。しかし、これらのディレクトリを手動で設定した場合、フルリフレッシュの実行は設定されたディレクトリの内容に影響を与えません。処理の際の思いがけない副作用を避けるために、自動で設定されたディレクトリを使用することをお勧めします。

コストの検討

Auto Loaderを使用する際、主なコストの発生源は計算資源とファイル検知になることでしょう。

計算コストを削減するためには、低レーテンシーの要件がない限り連続的に実行するのではなく、Trigger.AvailableNow(Databricksランタイム10.1以降)あるいはTrigger.Onceを用いたバッチジョブとしてAuto Loaderをスケジュール処理するようにDatabricksジョブを使用することをお勧めします。

ファイル検知のコストは、ディレクトリ一覧モードにおけるストレージアカウントに対するLISTオペレーション、ファイル通知モードにおけるサブスクリプションサービスとキューサービスのAPIリクエストという形で発生します。ファイル検知のコストを削減するためには以下のことをお勧めします。

  • ディレクトリ一覧モードでAuto Loaderを継続的に実行する際にProcessingTimeトリガーを指定します。
  • 可能であればインクリメンタル一覧を活用できるように、名前順でストレージアカウントにファイルをアップロードするように設計します。
  • 特に深くネストされたディレクトリの場合には、ディレクトリ一覧モードでDatabricksランタイム9.0以降を使用します。
  • コストを追跡するためにAuto Loaderによって作成されたリソースにタグ付けするためにリソースタグを使います。

Auto Loaderの設定

cloudFilesソース固有の設定オプションにはcloudFilesのプレフィックスがついているので、他の構造化ストリーミングソースオプションと別の名前空間に存在することになります。

ファイルフォーマットのオプション

Auto Loaderを用いることで、JSONCSVPARQUETAVROTEXTBINARYFILEORCファイルを取り込むことができます。これらのファイルフォーマットに対するオプションについてはフォーマットオプションをご覧ください。

共通のAuto Loaderのオプション

ディレクトリ一覧、ファイル通知モードに対しては以下のオプションを設定することができます。

オプション
cloudFiles.allowOverwrites
タイプ:Boolean
入力ディレクトリのファイル変更が既存データを上書きするかどうかを指定します。Databricksランタイム7.6以降で利用できます。
デフォルト値:false
cloudFiles.backfillInterval
タイプ:Interval String
Auto Loaderが指定された周期、例えば1日に1度バックフィルする場合には1 day、1週間に1度バックフィルする場合には1 weekで非同期バックフィルを起動することができます。ファイルイベント通知システムはアップロードされた全てのファイルの100%のデリバリーを保証しないため、最終的に全てのファイルが処理されることを保証するために、Databricksランタイム8.4(未サポート)以降で利用できるバックフィルを使用することができます。インクリメンタル一覧を使用している場合には、Databricksランタイム 9.1 LTS以降で利用できる定期的なバックフィルを用いて結果的な完全性を保証することができます。
デフォルト値: None
cloudFiles.format
タイプ:String
ソースパスのデータファイルフォーマットを指定します。以下の値を指定できます。デフォルト値: None(必須のオプションです)
cloudFiles.includeExistingFiles
タイプ:Boolean
入力パスを処理するストリームに既存ファイルを含めるか、初期セットアップ後に到着する新規ファイルのみを処理するのかを指定します。このオプションは最初にストリームを起動した時のみ評価されます。ストリームを再起動した後にこのオプションを変更しても効果はありません。
デフォルト値:true
cloudFiles.inferColumnTypes
タイプ:Boolean
スキーマ推定を活用する際にカラムタイプを推定するかどうかを指定します。デフォルトでは、JSONデータセットを推定する際、カラムは文字列として推定されます。詳細はschema inferenceを参照ください。
デフォルト値:false
cloudFiles.maxBytesPerTrigger
タイプ:Byte String
トリガーごとに処理する新規バイトの最大数です。個々のバッチを10GBのデータに限定するには10gのようなバイト文字列を指定することができます。これはソフトな最大値です。それぞれ3GBであるファイルが存在する場合、Databricksはマイクロバッチで12GBを処理します。cloudFiles.maxFilesPerTriggerと一緒に指定した場合、DatabricksはcloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTriggerどちらかが先に限界に達するまで処理を行います。Trigger.Once()を使用した際にはこのオプションの効果はありません。
デフォルト値: None
cloudFiles.maxFileAge
タイプ:Interval String
重複排除のために、どれだけの期間ファイルイベントを追跡するのかを指定します。1時間あたり数百万ファイル規模のデータを取り込むのではない限り、このパラメーターをチューニングすることはお勧めしません。詳細はイベント保持のセクションをご覧ください。
デフォルト値: None
cloudFiles.maxFilesPerTrigger
タイプ:Integer
トリガーごとに処理する新規ファイルの最大数を指定します。cloudFiles.maxBytesPerTriggerと一緒に指定した場合、DatabricksはcloudFiles.maxFilesPerTriggercloudFiles.maxBytesPerTriggerどちらかが先に限界に達するまで処理を行います。Trigger.Once()を使用した際にはこのオプションは効果はありません。
デフォルト値: 1000
cloudFiles.partitionColumns
タイプ:String
ファイルのディレクトリ構造から推論したいHiveスタイルのパーティションカラムをカンマ区切りで指定します。Hiveスタイルのパーティションカラムは、base_path>/a=x/b=1/c=y/file.formatのようにイコールで結合されるキーバリューのペアとなります。この例では、パーティションカラムはabcとなります。スキーマ推定を使用しており、データを読み込む<base_path>を指定している場合、デフォルトではこれらのカラムは自動でスキーマに追加されます。スキーマを指定した場合、Auto Loaderはスキーマにこれらのカラムが含まれていることを期待します。これらのカラムをスキーマに含めたくない場合には、これらのカラムを無視するために""を指定することができます。さらに、以下の例のように複雑なディレクトリ構造のファイルパスからカラムを推定したい場合にはこのオプションを使用することができます。
<base_path>/year=2022/week=1/file1.csv <base_path>/year=2022/month=2/day=3/file2.csv <base_path>/year=2022/month=2/day=4/file3.csv
year,month,dayとしてcloudFiles.partitionColumnsを指定することで、file1.csvyear=2022を返却しますが、monthdaynullとなります。file2.csvfile3.csvについてはmonthdayは適切にパースされます。
デフォルト値: None
cloudFiles.schemaEvolutionMode
タイプ:String
データで新規カラムが検知された際のスキーマ進化のモードを指定します。デフォルトでは、JSONデータセットを推定する際、カラムは文字列型として推定されます。詳細はschema evolutionをご覧ください。
デフォルト値:スキーマが指定されない場合には"addNewColumns"、それ以外の場合はnone
cloudFiles.schemaHints
タイプ:String
スキーマ推定の際にAuto Loaderに指定するスキーマ情報です。詳細はschema hintsをご覧ください。
デフォルト値: None
cloudFiles.schemaLocation
タイプ:String
推定されたスキーマと以降の変更を保存する場所です。詳細はschema inferenceをご覧ください。
デフォルト値: None(スキーマを推定する際には必須です)
cloudFiles.validateOptions
タイプ:Boolean
Auto Loaderのオプションを検証し、未知あるいは一貫性のないオプションでエラーを返すかどうかを指定します。
デフォルト値:true

ディレクトリ一覧のオプション

以下のオプションはディレクトリ一覧モードで使用します。

オプション
cloudFiles.useIncrementalListing
タイプ:String
ディレクトリ一覧モードで完全な一覧ではなくインクリメンタルな一覧を使用するのかどうかを指定します。デフォルトでは、Auto Loaderはベストエフォートで指定されたディレクトリにインクリメンタルな一覧を適用できるかどうかを自動で検知しようとします。truefalseを指定することで、明示的にインクリメンタルな一覧か完全なディレクトリ一覧を使うのかを指定することができます。
Databricksランタイム9.1 LTS以降で利用できます。
デフォルト値: auto
指定できる値: auto, true, false

ファイル通知のオプション

以下のオプションはファイル通知モードで使用します。

オプション
cloudFiles.fetchParallelism
タイプ:Integer
キューサービスからメッセージをフェッチする際のスレッド数です。
デフォルト値: 1
cloudFiles.pathRewrites
タイプ:A JSON string
複数のS3バケットからファイル通知を受け取るqueueUrlを指定し、これらのコンテナのデータにアクセスするように設定されたマウントポイントを活用した場合にのみ必要となります。マウントポイントでbucket/keyのプレフィクスでリライトするためにこのオプションを使用します。プレフィクスのみがリライトされます。例えば、{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}という設定では、パスs3://<databricks-mounted-bucket>/path/2017/08/fileA.jsondbfs:/mnt/data-warehouse/2017/08/fileA.jsonにリライトされます。
デフォルト値: None
cloudFiles.resourceTags
タイプ:Map(String, String)
関連リソースの識別、紐付けに役立つキーバリューペアのシリーズです。例えば、
cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue") .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")
AWSにおける詳細については、Amazon SQS cost allocation tagsConfiguring tags for an Amazon SNS topicをご覧ください。(1)
Azureにおける詳細については、Naming Queues and MetadataEvent Subscriptionsproperties.labelsのカバレッジをご覧ください。Auto Loaderはラベルとして、これらJSON内のキーバリュータグのペアを格納します。(1)
GCPにおける詳細については、Reporting usage with labelsをご覧ください。(1)
デフォルト値: None
cloudFiles.useNotifications
タイプ:Boolean
新規ファイルがある際にファイル通知モードを使うかどうかを指定します。falseの場合、ディレクトリ一覧モードを使用します。Auto Loaderの動作原理をご覧ください。
デフォルト値: false

(1) Auto Loaderはデフォルトではベストエフォートで以下のキーバリュータグのペアを追加します。

  • vendor: Databricks
  • path: データがロードされるパス。GCPではラベルの制限のため利用不可。
  • checkpointLocation: ストリームのチェックポイントのパス。GCPではラベルの制限のため利用不可。
  • streamId: ストリームに対するグローバルでユニークなID。

これらのキーの名前は予約されており、これらの値を上書きしてはいけません。

AWS固有のオプション

オプション
cloudFiles.region
タイプ:String
ソースのS3バケットの存在するリージョンであり、ここにAWS SNSサービスとSQSサービスが作成されます。
デフォルト値: Databricksランタイム9.0以降ではEC2インスタンスのあるリージョンとなります。Databricksランタイム8.4以前ではリージョンを指定する必要があります。

loudFiles.useNotifications = trueを選択し、Auto Loaderですでにセットアップ済みのキューを使いたい場合にのみ以下のオプションを指定します。

オプション
cloudFiles.queueUrl
タイプ:String
SQSキューのURLです。指定した場合、Auto Loaderは自身のAWS SNS、SQSサービスをセットアップするのではなく、このキューからのイベントを直接消費します。
デフォルト値: None

IAMロールが利用できない、異なるクラウドからデータを取り込む際にAWS SNS、SQSにアクセスするために以下のオプションで認証情報を指定することができます。

オプション
cloudFiles.awsAccessKey
タイプ:String
ユーザーに対するAWSアクセスキーのIDです。cloudFiles.awsSecretKeyと共に指定する必要があります。
デフォルト値: None
cloudFiles.awsSecretKey
タイプ:String
ユーザーに対するAWSのシークレットキーです。cloudFiles.awsAccessKeyと共に指定する必要があります。
デフォルト値: None
cloudFiles.roleArn
タイプ:String
移譲を受けるIAMロールのARNです。このロールはお使いのクラスターのインスタンスプロファイル、あるいはcloudFiles.awsAccessKeycloudFiles.awsSecretKeyによる認証情報から移譲を受けることができます。
デフォルト値: None
cloudFiles.roleExternalId
タイプ:String
cloudFiles.roleArnを用いてロールの移譲を受ける際に指定するIDです。
デフォルト値: None
cloudFiles.roleSessionName
タイプ:String
cloudFiles.roleArn`を用いてロールの移譲を受ける際に指定するオプションのセッション名です。
デフォルト値: None
cloudFiles.stsEndpoint
タイプ:String
cloudFiles.roleArn`を用いてロールの移譲を受ける際にAWS STSにアクセスする際に指定するオプションのエンドポイントです。
デフォルト値: None

Azure固有のオプション

loudFiles.useNotifications = trueを選択し、Auto Loaderで使用する通知サービスをセットアップするようにするには、以下のオプションを全て指定する必要があります。

オプション
cloudFiles.clientId
タイプ: String
クライアントIDあるいはサービスプリンシパルのアプリケーションIDです。
デフォルト値: None
cloudFiles.clientSecret
タイプ: String
サービスプリンシパルのクライアントシークレットです。
デフォルト値: None
cloudFiles.connectionString
タイプ: String
アカウントアクセスキー、あるいは共有アクセスシグネチャ(SAS)に基づくストレージアカウントに対する接続文字列です。
デフォルト値: None
cloudFiles.resourceGroup
タイプ: String
ストレージアカウントが作成されるAzureリソースグループです。
デフォルト値: None
cloudFiles.subscriptionId
タイプ: String
リソースグループが作成されるAzureサブスクリプションIDです。
デフォルト値: None
cloudFiles.tenantId
タイプ: String
サービスプリンシパルが作成されるAzureテナントIDです。
デフォルト値: None

重要!
Databricksランタイム9.1以降で、Azure Chinaとガバメントリージョンでは自動通知セットアップを利用することができます。古いDBRのバージョンでは、これらのリージョンにおけるファイル通知でAuto Loaderを使うにはqueueNameを指定する必要があります。

loudFiles.useNotifications = trueを選択し、Auto Loaderですでにセットアップ済みのキューを使いたい場合にのみ以下のオプションを指定します。

オプション
cloudFiles.queueName
タイプ: String
Azureキューの名称です。指定した場合、自身のAzure Event GridとQueue Storageサービスをセットアップするのではなく、クラウドファイルソースはこのキューからイベントを直接消費します。この場合、お使いのcloudFiles.connectionStringではキューに対する読み取り権限のみが必要となります。
デフォルト値: None

Google固有のオプション

注意
Google Cloudにおけるファイル通知モードはパブリックプレビューです。Databricks Runtime 9.1 LTS以降でサポートされています。

Googleサービスアカウントを使用することで、Auto Loaderは自動で通知サービスをセットアップすることができます。Google service setupに従うことでサービスアカウントの移譲を受けるようにクラスターを設定することができます。サービスアカウントで必要な権限は、ファイル通知リソースをセットアップする際に必要となるアクセス権で指定されます。そうでない場合、Auto loaderに通知サービスをセットアップさせたい場合には、以下のオプションで認証情報を指定することができます。

オプション
cloudFiles.client
タイプ: String
GoogleサービスアカウントのクライアントIDです。
デフォルト値: None
cloudFiles.clientEmail
タイプ: String
Googleサービスアカウントのメールアドレスです。
デフォルト値: None
cloudFiles.privateKey
タイプ: String
Googleサービスアカウントに対して生成されるプライベートキーです。
デフォルト値: None
cloudFiles.privateKeyId
タイプ: String
Googleサービスアカウントに対して生成されるプライベートキーのIDです。
デフォルト値: None
cloudFiles.projectId
タイプ: String
GCSバケットが存在するプロジェクトのIDです。Google Cloud Pub/Subのサブスクリプションもこのプロジェクト内に作成されます。
デフォルト値: None

loudFiles.useNotifications = trueを選択し、Auto Loaderですでにセットアップ済みのキューを使いたい場合にのみ以下のオプションを指定します。

オプション
cloudFiles.subscription
タイプ: String
Google Cloud Pub/Subサブスクリプションの名前です。指定した場合、自身のGCS通知とGoogle Cloud Pub/Subサービスをセットアップするのではなく、クラウドファイルソースはこのキューからイベントを消費します。
デフォルト値: None

リファレンス

Auto Loaderの概要とデモンストレーションについては、こちらのYouTube動画(59分)をご覧ください。

Auto Loaderの使い方詳細については以下をご覧ください。

一般的なデータロードのパターン

globパターンを用いたディレクトリやファイルのフィルタリング

パスにglobパターンを指定することで、ディレクトリやファイルをフィルタリングすることができます。

パターン 説明
? 任意の一文字にマッチ
* 0文字以上の文字列にマッチ
[abc] 文字列セット{a,b,c}の1文字にマッチ
[a-z] 文字列レンジ{a...z}の1文字にマッチ
[^a] 文字列セット、レンジ{a}以外の1文字にマッチ。左のカッコの直後に^を指定する必要があることに注意してください。
{ab,cd} 文字列セット{ab,cd}の文字列にマッチ
{ab,c{de, fh}} 文字列セットの{ab, cde, cfh}の文字列にマッチ

以下のようにプレフィックスのパターンを指定するのにpathを使用します。

Python
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base_path>/*/files")
Scala
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base_path>/*/files")

重要!
明示的にサフィックスのパターンを指定するにはpathGlobFilterオプションを指定する必要があります。pathはプレフィックスのフィルターのみを提供します。

例えば、異なるサフィックスを持つファイルを含みディレクトリでpngファイルのみをパースしたい場合には、以下のように行います。

Python
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base_path>)
Scala
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base_path>)

FAQ

ファイルに追記、上書きされた際に再度Auto Loaderは処理を行いますか?

cloudFiles.allowOverwritesを有効化しない限り、ファイルは1度だけ処理されます。ファイルに追記、上書きが行われた場合、Databricksはどのバージョンのファイルが処理されたのかを保証しません。動作の一貫性を保つためには、不変のファイルの取り込みのみにAuto Loaderを使用することをお勧めします。これが要件に合わない場合には、Databricks担当にお問い合わせください。

データファイルが継続的に到着する場合、例えば一日に一度というように定期的な周期で到着する場合、このソースを使い続けるべきでしょうか?メリットはありますか?

両方YESです。この場合、Trigger.OnceTrigger.AvailableNow(Databricksランタイム10.2以降で利用できます)の構造化ストリーミングジョブをセットアップし、予期されるファイルの到着時刻の後に実行するようにスケジュールします。Auto Loaderは頻度が高い、頻度の低いアップデート両方に対してうまく動作します。最終的なアップデートが非常に大規模な場合、Auto Loaderは入力サイズに応じて適切にスケールします。Auto Loaderの効率的なファイル検知技術とスキーマ進化の能力は、Auto Loaderをインクリメンタルデータ取り込みに対して頼りのある手段にしています。

ストリームを再起動する際にチェックポイントの場所を変更したら何が起きますか?

チェックポイントの場所ではストリームの重要な識別情報を保持しています。チェックポイントの場所を意図を持って変更することは、以前のストリームを破棄し、新たなストリームを開始することを意味します。

事前にイベント通知サービスを作成する必要がありますか?

いいえ。ファイル通知モードを選択し、必要なアクセス権を与えれば、Auto Loaderはファイル通知サービスを作成します。ファイル通知の活用をご覧ください。

Auto Loaderによって作成されたイベント通知のリソースをどのようにクリーンアップしますか?

リソースの一覧、ティアダウンにクラウドリソースマネージャを使用することができます。クラウドプロバイダーのUIやAPIを用いて手動でこれらのリソースを削除することができます。

同じバケット・コンテナーで異なる入力ディレクトリから複数のストリーミングクエリーを実行することができますか?

親子関係のディレクトリでない限り可能です。例えば、prod-logs/prod-logs/usage/は親子関係があるため動作しません。

バケット・コンテナーに既存のファイル通知がある場合に、この機能を使うことができますか?

入力ディレクトリが既存の通知プレフィクスと競合しない限り(例えば、上述の親子関係のディレクトリ)可能です。

トラブルシュート

エラー

java.lang.RuntimeException: Failed to create event grid subscription.

最初にAuto Loaderを実行する際にこのエラーに遭遇した場合、お使いのAzureサブスクリプションでEvent Gridがリソースプロバイダーとして登録されていません。Azureポータルで登録を行うには以下の手順を踏みます。

  1. お使いのサブスクリプションに移動します。
  2. SettingセクションのResource Providersをクリックします。
  3. プロバイダーMicrosoft.EventGridを登録します。

エラー

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

最初にAuto Loaderを実行する際にこのエラーに遭遇した場合、Event Gridとストレージアカウントに対するサービスプリンシパルにContributorロールが与えられていることを確認してください。

付録

ファイル通知リソースをセットアップする際に必要となるアクセス権

ADLS Gen2とAzure Blob Storage

入力ディレクトリに対する読み取り権限が必要です。Azure Blob StorageAzure Data Lake Storage Gen2をご覧ください。

ファイル通知モードを使うためには、イベント通知サービスをセットアップしアクセスするために認証情報を提供する必要があります。Databricksランタイム8.1以降では、認証のためのサービスプリンシパルが必要となります。Databricksランタイム8.0以前では、サービスプリンシパルと接続文字列の両方を指定する必要があります。

  • Azureビルトインロールを用いたサービスプリンシパル
    クライアントIDとクライアントシークレットのフォーマットでAzure Active Directoryアプリケーションとサービスプリンシパルを作成します。
    このアプリケーションに対して、入力パスが含まれるストレージアカウントに対する以下のロールを割り当てます。

    • Contributor: このロールは、キューやイベントサブスクリプションのようなストレージアカウントに対するリソースのセットアップに使用されます。
    • Storage Queue Data Contributor: このロールは、キューからのメッセージの取得、削除のようなキューのオペレーションに使用されます。このロールは、Databricksランタイム8.1以降で、接続文字列なしにサービスプリンシパルを指定する際に必要となります。
      このアプリケーションに関連リソースグループに対する以下のロールを割り当てます。
    • EventGrid EventSubscription Contributor: このロールは、イベントサブスクリプションの作成、一覧のようなイベントグリッドサブスクリプションのオペレーションに使用されます。

    詳細は、Assign Azure roles using the Azure portalをご覧ください。

  • カスタムロールを用いたサービスプリンシパル

    上述のロールで必要となる過剰なアクセス権に懸念がある場合、以下のようにAzureロールのJSONフォーマットで最低限のカスタムロールを作成することができます。

JSON
"permissions": [
  {
    "actions": [
      "Microsoft.EventGrid/eventSubscriptions/write",
      "Microsoft.EventGrid/eventSubscriptions/read",
      "Microsoft.EventGrid/eventSubscriptions/delete",
      "Microsoft.EventGrid/locations/eventSubscriptions/read",
      "Microsoft.Storage/storageAccounts/read",
      "Microsoft.Storage/storageAccounts/write",
      "Microsoft.Storage/storageAccounts/queueServices/read",
      "Microsoft.Storage/storageAccounts/queueServices/write",
      "Microsoft.Storage/storageAccounts/queueServices/queues/write",
      "Microsoft.Storage/storageAccounts/queueServices/queues/read",
      "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
  ],
    "notActions": [],
    "dataActions": [
      "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
      "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
      "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
      "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
    ],
    "notDataActions": []
  }
]
そして、このカスタムロールをアプリケーションに割り当てることができます。
詳細は、[Assign Azure roles using the Azure portal](https://docs.microsoft.com/azure/role-based-access-control/role-assignments-portal)を参照ください。
  • 接続文字列

    Auto Loaderでは、キューからメッセージを取得、削除、キューの作成のようなAzure Queue Storageに対する認証を行うために接続文字列が必要となります。入力ディレクトリパスが存在するのと同じストレージアカウントにキューが作成されます。お使いのアカウントキー共有アクセスシグネチャ(SAS)で接続文字列を見つけることができます。SASトークンを設定する際、以下のアクセス権を指定します。

AWS S3

入力ディレクトリに対する読み取り権限が必要となります。詳細はS3接続の詳細をご覧ください。

ファイル通知モードを使うには、お使いのIAMユーザーやロールに以下のJSONポリシーをアタッチします。

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

ここでは、

  • <bucket-name>: 例えばauto-logsのように、ストリームがファイルを読み込むS3バケット名を指定します。databricks-*-logsのように、ワイルドカードとして*を使うことができます。お使いのDBFSパスに対するS3バケットを見つけ出すには、%fs mountsをノートブックで実行することで、全てのDBFSマウントポイントを一覧することができます。
  • <region>: us-west-2のようにS3バケットが存在するAWSリージョンを指定します。
  • <account-number>: S3バケットを保有するAWSアカウントの番号です。例えば、123456789012を指定します。アカウント番号を指定したくない場合には*を使用します。

SQSとSNS ARNにおける文字列databricks-auto-ingest-*の指定は、SQSとSNSサービスを作成する際にcloudFilesソースが使う名前のプレフィックスになります。Databricksはストリームの初回実行で通知サービスをセットアップするので、初回実行の後にポリシーの権限を縮小することができます(例えば、ストリームを停止して再起動)。

注意
上述のポリシーは、通知サービスであるS3バケット通知、SNS、SQSサービスのセットアップに必要となる権限のみに関するものであり、すでにS3バケットへの読み取りアクセス権は持っていることを前提としています。S3の読み取り専用権限を追加する必要がある場合には、JSONドキュメントのDatabricksAutoLoaderSetup文のActionリストに以下の内容を追加してください。

  • s3:ListBucket
  • s3:GetObject

初期セットアップ後のアクセス権の縮小

上述したリソースセットアップのアクセス権は、ストリームの初回実行にのみ必要となります。初回実行の後は権限を縮小した以下のIAMポリシーに切り替えることができます。

重要!
縮小された権限では、処理を失敗した際に新規ストリーミングクエリーを起動したり、リソースを再作成することはできません(例えば、誤ってSQSキューを削除した場合)。リソースを一覧したりティアダウンするために、クラウドリソースマネジメントAPIを使用することもできません。

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

異なるAWSアカウントでセキュアにデータを取り込む

Auto LoaderはIAMロールの委任を受けることで、異なるAWSアカウントからデータをロードすることができます。AssumeRoleで作成された一時的なセキュリティクレディンシャルを設定した後は、クロスアカウントでAuto Loaderでクラウドファイルをロードすることができます。AWSクロスアカウントに対してAuto Loaderをセットアップするには、DatabricksにおけるAssumeRoleポリシーを用いたS3バケットに対するセキュアなクロスアカウントアクセスをご覧ください。以下のことを確認してください。

  • AssumeRoleメタロールがクラスターに割り当てられていることを確認。
  • 以下のプロパティを含むようにクラスターのSpark設定をセット。
ini
fs.s3a.credentialsType AssumeRole
fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
fs.s3a.acl.default BucketOwnerFullControl

GCS

GCSバケット及びバケット上の全てのオブジェクトに対するlistgetの権限が必要です。詳細はGoogleのドキュメントのIAM permissionsをご覧ください。

ファイル通知モードを使うには、GCSサービスアカウントとGoogle Cloud Pub/Subのリソースにアクセスする際に使用されるアカウントに権限を追加する必要があります。

Google Cloud Pub/Subリソースに使用されるサービスアカウントに対しては、以下の権限を追加する必要があります。

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

これを行うためには、これらの権限を持つIAMカスタムロールを作成するか、これらの権限をカバーする既存GPCロールを割り当てます。

GCSサービスアカウントの特定

対応するプロジェクトのGoogle Cloud Consoleで、Cloud Storage > Settingsに移動します。このページで、GCSサービスアカウントのメールアドレスを含む「Cloud Storage Service Account」というタイトルのセクションを見つけることができます。

ファイル通知モード向けのカスタムGoogle Cloud IAMロールの作成

対応するプロジェクトのGoogle Cloud Consoleで、IAM & Admin > Rolesに移動します。次に、画面上部でロールを作成するか、既存ロールを更新します。ロールの作成、編集画面では、Add Permissionsをクリックします。ロールに対して権限を追加することができるメニューがポップアップします。

ファイルの名前順

名前順で並び替えられるファイルに対しては、アップロードされる新規ファイルは、既存ファイルよりも大きい名前順を持つプレフィックスが必要となります。以下に、名前順で並び替えられるディレクトリの例を示します。

バージョン管理されるファイル

Delta Lakeのテーブルは名前順でトランザクションログをコミットします。

<path_to_table>/_delta_log/00000000000000000000.json
<path_to_table>/_delta_log/00000000000000000001.json <- guaranteed to be written after version 0
<path_to_table>/_delta_log/00000000000000000002.json <- guaranteed to be written after version 1
...

AWS DMSはバージョン管理された形でAWS S3にCDCファイルをアップロードします。

database_schema_name/table_name/LOAD00000001.csv
database_schema_name/table_name/LOAD00000002.csv
...

日付でパーティションが作成されたファイル

日付でパーティショニングされた形でファイルをアップロードすることができ、インクリメンタル一覧を活用することができます。以下に例を示します。

// <base_path>/yyyy/MM/dd/HH:mm:ss-randomString
<base_path>/2021/12/01/10:11:23-b1662ecd-e05e-4bb7-a125-ad81f6e859b4.json
<base_path>/2021/12/01/10:11:23-b9794cf3-3f60-4b8d-ae11-8ea320fad9d1.json
...

// <base_path>/year=yyyy/month=MM/day=dd/hour=HH/minute=mm/randomString
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/442463e5-f6fe-458a-8f69-a06aa970fc69.csv
<base_path>/year=2021/month=12/day=04/hour=08/minute=22/8f00988b-46be-4112-808d-6a35aead0d44.csv <- this may be uploaded before the file above as long as processing happens less frequently than a minute

日付のパーティショニングでファイルがアップロードされた際、いくつか注意すべきことがあります。

  • 月、日、時、分は名前順を保証するためにゼロによる左側のパディングが必要です(hour=3ではなくhour=032021/5/3ではなく2021/05/03)。
  • 親ディレクトリにおける処理頻度よりも子供のディレクトリの処理頻度が少ない場合は、必ずしもファイルが名前順でアップロードされる必要はありません。

ファイルを日付のパーティショニングの名前順でアップロードできるサービスには以下のようなものがあります。

フォーマットオプション

一般的なオプション

以下のオプションは全てのフォーマットに適用されます。

オプション
modifiedAfter
タイプ: Timestamp String例えば、2021-01-01 00:00:00.000000 UTC+0
指定されたタイムスタンプ以降の修正時のタイムスタンプを持つファイルを取り込むオプションの値です。
デフォルト値: None
modifiedBefore
タイプ: Timestamp String例えば、2021-01-01 00:00:00.000000 UTC+0
指定されたタイムスタンプ以前の修正時のタイムスタンプを持つファイルを取り込むオプションの値です。
デフォルト値: None
pathGlobFilter
タイプ: String
ファイルを選択するためのglobパターンを指定します。COPY INTOにおけるPATTERNと同じものです。
デフォルト値: None
recursiveFileLookup
タイプ: Boolean
ベースディレクトリ内のデータを再起的にロードし、パーティション推論をスキップするかどうかを指定します。
デフォルト値: false

JSONオプション

オプション
allowBackslashEscapingAnyCharacter
タイプ: Boolean
直後の文字をエスケープするためにバックスラッシュを許可するかどうかを指定します。無効化されている場合、JSON仕様で明示的に一覧されている文字のみがエスケープされます。
デフォルト値: false
allowComments
タイプ: Boolean
パースされたコンテンツ内でJava, C, C++スタイルのコメント('/', '*', '//')を許可するかどうかを指定します。
デフォルト値: false
allowNonNumericNumbers
タイプ: Boolean
not-a-number(NaN)を適切な浮動小数点の値として許可するかどうかを指定します。
デフォルト値: true
allowNumericLeadingZeros
タイプ: Boolean
ゼロで始まる整数値(00001など)を許可するかどうかを指定します。
デフォルト値: false
allowSingleQuotes
タイプ: Boolean
文字列を引用(名前や文字列の値)するためにシングルクオート(アポストロフィ、'\')を許可するかどうかを指定します。
デフォルト値: true
allowUnquotedControlChars
タイプ: Boolean
JSON文字列の中にエスケープされていない制御文字(32より小さい値のASCII文字、タブ、ラインフィード文字を含む)を含めて良いかどうかを指定します。
デフォルト値: false
allowUnquotedFieldNames
タイプ: Boolean
クオートされていないフィールド名(JavaScriptでは許可されますが、JSON仕様では許可されません)の利用を許可するかどうかを指定します。
デフォルト値: false
badRecordsPath
タイプ: String
不正なJSONレコードに関する情報を記録するファイルのパスです。
デフォルト値: None
columnNameOfCorruptRecord
タイプ: String
不正でパースできないレコードを格納するカラム名です。パーシングのmodeDROPMALFORMEDの場合、このカラムは空になります。
デフォルト値: _corrupt_record
dateFormat
タイプ: String
日付文字列をパースするフォーマットです。
デフォルト値: yyyy-MM-dd
dropFieldIfAllNull
タイプ: Boolean
スキーマ推定の際に、全てがnull値のカラム、空の配列、空のstructsのカラムを無視するかどうかを指定します。
デフォルト値: false
encodingあるいはcharset
タイプ: String
JSONファイルのエンコーディング名を指定します。オプションについてはjava.nio.charset.Charsetをご覧ください。multilinetrueの場合、UTF-16UTF-32を使うことはできません。
デフォルト値: UTF-8
inferTimestamp
タイプ: Boolean
タイムスタンプ文字列をTimestampTypeとして推定するかどうかを指定します。trueに設定すると、スキーマ推定の処理時間が長くなります。
デフォルト値: false
lineSep
タイプ: String
二つの連続するJSONレコードの区切り文字です。
デフォルト値: None、\r, \r\n, \nをカバーします。
locale
タイプ: String
java.util.LocaleのIDです。JSONのパーシングする際に、デフォルトの日付、タイムスタンプ、数値に影響を与えます。
デフォルト値: US
mode
タイプ: String
不正レコードのハンドリングにおけるパーサーのモードです。'PERMISSIVE', 'DROPMALFORMED', 'FAILFAST'のいずれかとなります。
デフォルト値: PERMISSIVE
multiLine
タイプ: Boolean
JSONレコードが複数行にわたるかどうかを指定します。
デフォルト値: false
prefersDecimal
タイプ: Boolean
スキーマ推定の際、floatやdoubleをDecimalTypeとして推定するかどうかを指定します。
デフォルト値: false
primitivesAsString
タイプ: Boolean
数字やbooleanのようなプリミティブ型をStringTypeとして推定するかどうかを指定します。
デフォルト値: false
rescuedDataColumn
タイプ: String
データ型の不一致やスキーマミスマッチ(カラム名の大文字小文字を含む)でパーシングできなかった全てのデータを別のカラムで収集するかどうかを指定します。Auto Loaderを使用する際には、このカラムは自動で含まれます。詳細はRescued data columnを参照ください。
デフォルト値: None
timestampFormat
タイプ: String
パースするタイムスタンプ文字列のフォーマットです。
デフォルト値: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
timeZone
タイプ: String
タイムスタンプと日付をパーシングする際に使用するjava.time.ZoneIdとなります。
デフォルト値: None

CSVオプション

オプション
badRecordsPath
タイプ: String
不正なCSVレコードに関する情報を記録するファイルのパスです。
デフォルト値: None
charToEscapeQuoteEscaping
タイプ: Char
クオートをエスケープするために使用する文字です。例えばレコード[ " a\\", b ]に対して
  • '\'をエスケープする文字が定義されていない場合、レコードはパースされません。パーサーは文字列[a],[\],["],[,],[ ],[b]を読み込み、クローズするクオートがないためエラーを発生させます。
  • '\'をエスケープする文字が\として定義されている場合、レコードが読み込まれ[a\][b]の2つの値が読み込まれます。

デフォルト値: '\0'
columnNameOfCorruptRecord
タイプ: String
不正でパースできないレコードを格納するカラム名です。パーシングのmodeDROPMALFORMEDの場合、このカラムは空になります。
デフォルト値: _corrupt_record
comment
タイプ: Char
テキストの行の最初にある場合に行コメントと表現する文字を定義します。コメントのスキップを無効化するには'\0'を使用します。
デフォルト値: '#'
dateFormat
タイプ: String
日付文字列をパースするフォーマットです。
デフォルト値: yyyy-MM-dd
emptyValue
タイプ: String
空の値を表現する文字列です。
デフォルト値: ""
encodingあるいはcharset
タイプ: String
CSVファイルのエンコーディング名を指定します。オプションについてはjava.nio.charset.Charsetをご覧ください。multilinetrueの場合、UTF-16UTF-32を使うことはできません。
デフォルト値: UTF-8
enforceSchema
タイプ: Boolean
CSVファイルに対して指定された、推定されたスキーマの適用を強制するかどうかを指定します。オプションが有効化された場合、CSVファイルのヘッダーは無視されます。Auto Loaderを使用する際、データを救助しスキーマ進化を実現するために、デフォルトでこのオプションは無視されます。
デフォルト値: true
escape
タイプ: Char
データをパーシングする際に使用するエスケープ文字です。
デフォルト値: '\'
header
タイプ: Boolean
CSVファイルにヘッダーがあるかどうかを指定します。スキーマを推定する際、Auto Loaderはファイルにヘッダーがあることを前提にします。
デフォルト値: false
ignoreLeadingWhiteSpace
タイプ: Boolean
パースされた値それぞれの頭の空白を無視するかどうかを指定します。
デフォルト値: false
ignoreTrailingWhiteSpace
タイプ: Boolean
パースされた値それぞれの後ろの空白を無視するかどうかを指定します。
デフォルト値: false
inferSchema
タイプ: Boolean
パースされたCSVレコードのデータ型を推定するか、全てのカラムがStringTypeであると仮定するかどうかを指定します。trueに設定すると、データに対する追加の処理が必要となります。
デフォルト値: false
lineSep
タイプ: String
二つの連続するCSVレコードの区切り文字です。
デフォルト値: None、\r, \r\n, \nをカバーします。
locale
タイプ: String
java.util.LocaleのIDです。JSONのパーシングする際に、デフォルトの日付、タイムスタンプ、数値に影響を与えます。
デフォルト値: US
maxCharsPerColumn
タイプ: Int
パースする値の最大文字数です。メモリーエラーの回避に使うことができます。デフォルトは-1であり、無制限となります。
デフォルト値: -1
maxColumns
タイプ: Int
レコードあたりの最大列数のハードリミットです。
デフォルト値: 20480
mergeSchema
タイプ: Boolean
複数ファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。Auto Loaderでスキーマ推定を行う際にはデフォルトで有効化されます。
デフォルト値: false
mode
タイプ: String
不正レコードのハンドリングにおけるパーサーのモードです。'PERMISSIVE', 'DROPMALFORMED', 'FAILFAST'のいずれかとなります。
デフォルト値: PERMISSIVE
multiLine
タイプ: Boolean
CSVレコードが複数行にわたるかどうかを指定します。
デフォルト値: false
nanValue
タイプ: String
FloatTypeDoubleTypeをパーシングする際のnon-a-numberの値の文字列表現を指定します。
デフォルト値: "NaN"
negativeInf
タイプ: String
FloatTypeDoubleTypeをパーシングする際の負の無限大の文字列表現を指定します。
デフォルト値: "-Inf"
nullValue
タイプ: String
ヌル値を示す文字列です。
デフォルト値: ""
parserCaseSensitive(非推奨)
タイプ: Boolean
ファイルを読み込んでいる間にヘッダーで宣言されているカラムをスキーマに割り当てる際に、大文字小文字を区別するかどうかを指定します。有効化されている場合、大文字小文字が異なるカラムはrescuedDataColumnにレスキューされます。このオプションはreaderCaseSensitiveによって非推奨となりました。
デフォルト値: false
positiveInf
タイプ: String
FloatTypeDoubleTypeをパーシングする際の正の無限大の文字列表現を指定します。
デフォルト値: "Inf"
quote
タイプ: Char
値にフィールドの区切り文字が含まれている際に使用されるエスケープ文字です。
デフォルト値: '\'
rescuedDataColumn
タイプ: String
データ型の不一致やスキーマミスマッチ(カラム名の大文字小文字を含む)でパーシングできなかった全てのデータを別のカラムで収集するかどうかを指定します。Auto Loaderを使用する際には、このカラムは自動で含まれます。詳細はRescued data columnを参照ください。
デフォルト値: None
sepあるいはdelimiter
タイプ: String
カラムの区切り文字です。
デフォルト値: ","
timestampFormat
タイプ: String
パースするタイムスタンプ文字列のフォーマットです。
デフォルト値: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
timeZone
タイプ: String
タイムスタンプと日付をパーシングする際に使用するjava.time.ZoneIdとなります。
デフォルト値: None
unescapedQuoteHandling
タイプ: String
エスケープされていないクオート(引用符)の取り扱い戦略を指定します。オプションは以下の通りとなります。
  • STOP_AT_CLOSING_QUOTE: 入力にエスケープされていないクオートが見つかった場合、クオートの文字列を蓄積し、クローズするクオートが見つかるまで値を引用された値としてパーシングを継続します。
  • BACK_TO_DELIMITER: 入力にエスケープされていないクオートが見つかった場合、値は引用されていないものとみなされます。これによって、sep で定義されている区切り文字が見つかるまで現状のパースされた値の全ての文字列が蓄積されます。値に区切り文字が含まれていない場合、パーサーは区切り文字か行の終わりを見つけるまで文字の蓄積を継続します。
  • STOP_AT_DELIMITER: 入力にエスケープされていないクオートが見つかった場合、値は引用されていないものとみなされます。これによって、sep で定義されている区切り文字か行の終わりが見つかるまで全ての文字列が蓄積されます。
  • SKIP_VALUE: 入力にエスケープされていないクオートが見つかった場合、当該の値でパスされたコンテンツは(次の区切り文字が見つかるまで)スキップされ、nullValue で設定された値になります。
  • RAISE_ERROR: 入力にエスケープされていないクオートが見つかった場合、TextParsingExceptionエラーを引き起こします。

デフォルト値: STOP_AT_DELIMITER

PARQUETオプション

オプション
datetimeRebaseMode
タイプ: String
JulianとProleptic Gregorianカレンダーの間のDATEとTIMESTAMPのリベースを制御します。EXCEPTION, LEGACY, CORRECTEDを指定できます。
デフォルト値: LEGACY
int96RebaseMode
タイプ: String
JulianとProleptic Gregorianカレンダーの間のINT96のリベースを制御します。EXCEPTION, LEGACY, CORRECTEDを指定できます。
デフォルト値: LEGACY
mergeSchema
タイプ: Boolean
複数のファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。
デフォルト値: false

AVROオプション

オプション
avroSchema
タイプ: String
ユーザーがAvroフォーマットで指定できるオプションのスキーマです。Avroを読み込んでいる際、このオプションに実際のAvroのスキーマと違うが互換性のある進化スキーマを設定することができます。
デフォルト値: None
datetimeRebaseMode
タイプ: String
JulianとProleptic Gregorianカレンダーの間のDATEとTIMESTAMPのリベースを制御します。EXCEPTION, LEGACY, CORRECTEDを指定できます。
デフォルト値: LEGACY
mergeSchema
タイプ: Boolean
複数のファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。
デフォルト値: false

BINARYFILEオプション

バイナリーファイルには追加の設定オプションはありません。

TEXTオプション

オプション
encoding
タイプ: String
CSVファイルのエンコーディング名を指定します。オプションについてはjava.nio.charset.Charsetをご覧ください。
デフォルト値: UTF-8
lineSep
タイプ: String
二つの連続するテキストレコードの区切り文字です。
デフォルト値: None、\r, \r\n, \nをカバーします。
wholeText
タイプ: Boolean
ファイルを単一のレコードとして読み込むかどうかを指定します。
デフォルト値: false

ORCオプション

オプション
mergeSchema
タイプ: Boolean
複数のファイルのスキーマを推定し、ファイルごとのスキーマをマージするかどうかを指定します。
デフォルト値: false

Databricks 無料トライアル

Databricks 無料トライアル

11
9
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
9