LoginSignup
5
1

More than 1 year has passed since last update.

DatabricksにおけるAmazon S3の取り扱い

Last updated at Posted at 2022-01-05

Amazon S3 | Databricks on AWS [2021/9/3時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Amazon S3は、大量のテキストやバイナリーデータのような非構造化データを格納するためのサービスです。

本書では、AWS S3バケットに対するDBFS(Databricksファイルシステム)を用いたバケットのマウント、あるいはAPIを用いて直接アクセスする方法を説明します。

重要!
Databricksランタイム7.3 LTS以降では、アップグレードされたバージョンのS3コネクターを使用します。以下の変更が既存コードに影響を及ぼす場合があります。

  • S3AファイルシステムはFileSystem.close()でリソースを解放します。ファイルシステムのキャッシュはデフォルトで有効化されているため、キャッシュされたファイルシステムへの参照を保持する他のスレッドが、クローズされた後に不適切にそのファイルを使用する場合があります。このため、FileSystem.close() APIを使用するべきではありません。
  • S3Aファイルシステムは出力ストリームをクローズする際にディレクトリのマーカーを削除しません。HADOOP-13230を含まないHadoopバージョンに依存するレガシーなアプリケーションは、ディレクトリの中にファイルが存在する場合でも空のディレクトリと誤解する場合があります。

DBFSを通じたS3バケットへのアクセス

このセクションでは、DBFSを通じてどのようにS3バケットにアクセスするのかを説明します。以下のことを実行できます。

S3バケットのマウント

Databricksファイルシステム(DBFS)を通じてS3バケットをマウントすることができます。このマウントはS3ロケーションへのポインターであり、決してデータはローカルでは同期されません。

クラスターを通じてマウントポイントが作成されると、当該クラスターのユーザーはすぐにマウントポイントにアクセスすることができます。別の稼働中のクラスターで、このマウントポイントを使用するには、稼働中のクラスターで新規に作成されたマウントポイントを利用可能にするためにdbutils.fs.refreshMounts()を実行する必要があります。

S3バケットをマウントするには2つの方法が存在します。

AWSインスタンスプロファイルを用いたバケットのマウント

AWSのインスタンスプロファイルを用いてS3バケットに対する認証、承認を管理することができます。インスタンスプロファイルに許可されたアクセス権によってバケットのオブジェクトに対するアクセスのタイプが決定されます。ロールに書き込みアクセスがあるのであれば、マウントポイントのユーザーはバケットにオブジェクトを書き込むことができます。ロールに読み込みアクセスがあるのであれば、マウントポイントのユーザーはバケットのオブジェクトを読み込むことができます。

  1. クラスターに対するインスタンスプロファイルを設定します。
  2. バケットをマウントします。

    Python
    aws_bucket_name = "<aws-bucket-name>"
    mount_name = "<mount-name>"
    dbutils.fs.mount("s3a://%s" % aws_bucket_name, "/mnt/%s" % mount_name)
    display(dbutils.fs.ls("/mnt/%s" % mount_name))
    
    Scala
    val AwsBucketName = "<aws-bucket-name>"
    val MountName = "<mount-name>"
    dbutils.fs.mount(s"s3a://$AwsBucketName", s"/mnt/$MountName")
    display(dbutils.fs.ls(s"/mnt/$MountName"))
    

AWSキーを用いたバケットのマウント

AWSキーを用いてバケットをマウントすることができます。

重要!
キーを用いてS3バケットをマウントする際、S3バケットのすべてのオブジェクトに対して、すべてのユーザーが読み書き権限を有することになります。

以下の例では、キーを格納するためにDatabricksのシークレットを使用しています。シークレットのキーはURLエスケープする必要があります。

Python
access_key = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
secret_key = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
encoded_secret_key = secret_key.replace("/", "%2F")
aws_bucket_name = "<aws-bucket-name>"
mount_name = "<mount-name>"

dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name), "/mnt/%s" % mount_name)
display(dbutils.fs.ls("/mnt/%s" % mount_name))
Scala
val AccessKey = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
// Encode the Secret Key as that can contain "/"
val SecretKey = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = "<aws-bucket-name>"
val MountName = "<mount-name>"

dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")
display(dbutils.fs.ls(s"/mnt/$MountName"))

ローカルファイルとしてS3オブジェクトにアクセス

S3バケットをDBFSにマウントすると、ローカルファイルパスを用いてS3オブジェクトにアクセスすることができます。

Python
df = spark.read.text("/mnt/%s/..." % mount_name)

あるいは

Python
df = spark.read.text("dbfs:/mnt/%s/..." % mount_name)
Scala
// scala
val df = spark.read.text(s"/mnt/$MountName/...")

あるいは

Scala
val df = spark.read.text(s"dbfs:/mnt/$MountName/...")

S3バケットのアンマウント

Python
dbutils.fs.unmount("/mnt/mount_name")
Scala
dbutils.fs.unmount(s"/mnt/$MountName")

S3バケットへの直接アクセス

この方法では、AWSキーを用いてSparkのワーカーがS3バケットのオブジェクトに直接アクセスすることができます。キーを格納するためにDatabricksのシークレットを使用しています。

Python
access_key = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
secret_key = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

# If you are using Auto Loader file notification mode to load files, provide the AWS Region ID.
aws_region = "aws-region-id"
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

myRDD = sc.textFile("s3a://%s/.../..." % aws_bucket_name)
myRDD.count()

s3a://パスに対するKMS暗号化の設定

ステップ1: インスタンスプロファイルの設定

Databricksでインスタンスプロファイルを作成します。

ステップ2: 設定で指定されたKMSキーにおけるキーユーザーとしてインスタンスプロファイルを追加

  1. AWSでIAMサービスに移動します。
  2. サイドバーの下部にあるEncryption Keysをクリックします。
  3. アクセス権を追加したいキーをクリックします。
  4. Key UsersセクションでAddをクリックします。
  5. IAMロールの隣にあるチェックボックスを選択します。
  6. Attachをクリックします。

ステップ3: 暗号化プロパティのセットアップ

AWS configurationsセッティングあるいはinitスクリプトを用いて、グローバルKMS暗号化プロパティをセットアップします。お使いのキーのARNをspark.hadoop.fs.s3a.server-side-encryption.keyキーに設定します。

Spark設定

ini
spark.hadoop.fs.s3a.server-side-encryption.key arn:aws:kms:<region>:<aws-account-id>:key/<bbbbbbbb-ddd-ffff-aaa-bdddddddddd>
spark.hadoop.fs.s3a.server-side-encryption-algorithm SSE-KMS

バケットごとにKMS暗号化を設定することもできます。

Initスクリプト

initスクリプトset-kms.shを作成するために以下のコードをノートブックで実行し、スクリプトを実行するようにクラスターを設定することで、グローバル暗号化設定をセットアップします。

Python
dbutils.fs.put("/databricks/scripts/set-kms.sh", """
#!/bin/bash

cat >/databricks/driver/conf/aes-encrypt-custom-spark-conf.conf <<EOL
[driver] {
  "spark.hadoop.fs.s3a.server-side-encryption.key" = "arn:aws:kms:<region>:<aws-account-id>:key/<bbbbbbbb-ddd-ffff-aaa-bdddddddddd>"
  "spark.hadoop.fs.s3a.server-side-encryption-algorithm" = "SSE-KMS"
}
EOL
""", True)

暗号化が動作していることを確認したら、グローバルinitスクリプトを用いて、すべてのクラスターの暗号化を設定します。

S3バケットデータの暗号化

Databricksではサーバーサイド暗号化を用いたデータの暗号化をサポートしています。このセクションでは、DBFSを通じてS3にファイルを書き込む際に、どのようにサーバーサイド暗号化を用いるのかを説明します。DatabricksではAmazon S3-managed encryption keys (SSE-S3)AWS KMS–managed encryption keys (SSE-KMS)をサポートしています。

SSE-S3を用いたファイルの書き込み

  1. SSE-S3を用いてS3バケットをマウントするには以下を実行します。

    Scala
    dbutils.fs.mount(s"s3a://$AccessKey:$SecretKey@$AwsBucketName", s"/mnt/$MountName", "sse-s3")
    
  2. SSE-S3を用いて対応するS3バケットにファイルを書き込むには以下を実行します。

    Scala
    dbutils.fs.put(s"/mnt/$MountName", "<file content>")
    

SSE-KMSを用いたファイルの書き込み

  1. 暗号化タイプとしてsse-kmsあるいはsse-kms:$KmsKeyを引き渡すことでソースディレクトリをマウントします。

    • デフォルトKMSマスターキーを用いて、SSE-KMSを用いたS3バケットのマウントを行うには以下を実行します。
    Scala
    dbutils.fs.mount(s"s3a://$AccessKey:$SecretKey@$AwsBucketName", s"/mnt/$MountName", "sse-kms")
    
    • 特定のKMSキーを用いて、SSE-KMSを用いたS3バケットのマウントを行うには以下を実行します。
    Scala
    dbutils.fs.mount(s"s3a://$AccessKey:$SecretKey@$AwsBucketName", s"/mnt/$MountName", "sse-kms:$KmsKey")
    
  2. SSE-KMSによる暗号化がされたS3バケットにファイルを書き込むには以下を実行します。

    Scala
    dbutils.fs.put(s"/mnt/$MountName", "<file content>")
    

設定

Databricks Runtime 7.3 LTS以降では、open-source Hadoop optionsを用いたS3Aファイルシステムの設定をサポートしています。グローバルプロパティ、あるいはバケットごとのプロパティを設定することができます。

グローバル設定

ini
# Global S3 configuration
spark.hadoop.fs.s3a.aws.credentials.provider <aws-credentials-provider-class>
spark.hadoop.fs.s3a.endpoint <aws-endpoint>
spark.hadoop.fs.s3a.server-side-encryption-algorithm SSE-KMS

バケットごとの設定

spark.hadoop.fs.s3a.bucket.<bucket-name>.<configuration-key>シンタックスを用いて、バケットごとのプロパティを設定することができます。これによって、異なるクレディンシャル、エンドポイントなどをバケットに設定することができます。

たとえば、グローバルS3設定に加えて、以下のキーを用いることで、それぞれのバケットに設定を行うことができます。

ini
# Set up authentication and endpoint for a specific bucket
spark.hadoop.fs.s3a.bucket.<bucket-name>.aws.credentials.provider <aws-credentials-provider-class>
spark.hadoop.fs.s3a.bucket.<bucket-name>.endpoint <aws-endpoint>

# Configure a different KMS encryption key for a specific bucket
spark.hadoop.fs.s3a.bucket.<bucket-name>.server-side-encryption.key <aws-kms-encryption-key>

アクセスリクエスト元によるバケット支払い

Requester Paysバケットへのアクセスを有効化するには、クラスターのAWS設定に以下の行を追加してください。

ini
spark.hadoop.fs.s3a.requester-pays.enabled true

注意
DatabricksではRequester Paysバケットに対するDelta Lakeの書き込みはサポートしていません。

Databricks 無料トライアル

Databricks 無料トライアル

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1