Amazon S3 | Databricks on AWS [2021/9/3時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Amazon S3は、大量のテキストやバイナリーデータのような非構造化データを格納するためのサービスです。
本書では、AWS S3バケットに対するDBFS(Databricksファイルシステム)を用いたバケットのマウント、あるいはAPIを用いて直接アクセスする方法を説明します。
重要!
Databricksランタイム7.3 LTS以降では、アップグレードされたバージョンのS3コネクターを使用します。以下の変更が既存コードに影響を及ぼす場合があります。
- S3Aファイルシステムは
FileSystem.close()
でリソースを解放します。ファイルシステムのキャッシュはデフォルトで有効化されているため、キャッシュされたファイルシステムへの参照を保持する他のスレッドが、クローズされた後に不適切にそのファイルを使用する場合があります。このため、FileSystem.close()
APIを使用するべきではありません。
- S3Aファイルシステムは出力ストリームをクローズする際にディレクトリのマーカーを削除しません。HADOOP-13230を含まないHadoopバージョンに依存するレガシーなアプリケーションは、ディレクトリの中にファイルが存在する場合でも空のディレクトリと誤解する場合があります。
DBFSを通じたS3バケットへのアクセス
このセクションでは、DBFSを通じてどのようにS3バケットにアクセスするのかを説明します。以下のことを実行できます。
S3バケットのマウント
Databricksファイルシステム(DBFS)を通じてS3バケットをマウントすることができます。このマウントはS3ロケーションへのポインターであり、決してデータはローカルでは同期されません。
クラスターを通じてマウントポイントが作成されると、当該クラスターのユーザーはすぐにマウントポイントにアクセスすることができます。別の稼働中のクラスターで、このマウントポイントを使用するには、稼働中のクラスターで新規に作成されたマウントポイントを利用可能にするためにdbutils.fs.refreshMounts()
を実行する必要があります。
S3バケットをマウントするには2つの方法が存在します。
AWSインスタンスプロファイルを用いたバケットのマウント
AWSのインスタンスプロファイルを用いてS3バケットに対する認証、承認を管理することができます。インスタンスプロファイルに許可されたアクセス権によってバケットのオブジェクトに対するアクセスのタイプが決定されます。ロールに書き込みアクセスがあるのであれば、マウントポイントのユーザーはバケットにオブジェクトを書き込むことができます。ロールに読み込みアクセスがあるのであれば、マウントポイントのユーザーはバケットのオブジェクトを読み込むことができます。
-
クラスターに対するインスタンスプロファイルを設定します。
-
バケットをマウントします。
Python
aws_bucket_name = ""
mount_name = ""
dbutils.fs.mount("s3a://%s" % aws_bucket_name, "/mnt/%s" % mount_name)
display(dbutils.fs.ls("/mnt/%s" % mount_name))
```
```scala:Scala
val AwsBucketName = ""
val MountName = ""
dbutils.fs.mount(s"s3a://$AwsBucketName", s"/mnt/$MountName")
display(dbutils.fs.ls(s"/mnt/$MountName"))
```
AWSキーを用いたバケットのマウント
AWSキーを用いてバケットをマウントすることができます。
重要!
キーを用いてS3バケットをマウントする際、S3バケットのすべてのオブジェクトに対して、すべてのユーザーが読み書き権限を有することになります。
以下の例では、キーを格納するためにDatabricksのシークレットを使用しています。シークレットのキーはURLエスケープする必要があります。
access_key = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
secret_key = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
encoded_secret_key = secret_key.replace("/", "%2F")
aws_bucket_name = "<aws-bucket-name>"
mount_name = "<mount-name>"
dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name), "/mnt/%s" % mount_name)
display(dbutils.fs.ls("/mnt/%s" % mount_name))
val AccessKey = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
// Encode the Secret Key as that can contain "/"
val SecretKey = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
val EncodedSecretKey = SecretKey.replace("/", "%2F")
val AwsBucketName = "<aws-bucket-name>"
val MountName = "<mount-name>"
dbutils.fs.mount(s"s3a://$AccessKey:$EncodedSecretKey@$AwsBucketName", s"/mnt/$MountName")
display(dbutils.fs.ls(s"/mnt/$MountName"))
ローカルファイルとしてS3オブジェクトにアクセス
S3バケットをDBFSにマウントすると、ローカルファイルパスを用いてS3オブジェクトにアクセスすることができます。
df = spark.read.text("/mnt/%s/..." % mount_name)
あるいは
df = spark.read.text("dbfs:/mnt/%s/..." % mount_name)
// scala
val df = spark.read.text(s"/mnt/$MountName/...")
あるいは
val df = spark.read.text(s"dbfs:/mnt/$MountName/...")
S3バケットのアンマウント
dbutils.fs.unmount("/mnt/mount_name")
dbutils.fs.unmount(s"/mnt/$MountName")
S3バケットへの直接アクセス
この方法では、AWSキーを用いてSparkのワーカーがS3バケットのオブジェクトに直接アクセスすることができます。キーを格納するためにDatabricksのシークレットを使用しています。
access_key = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
secret_key = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
# If you are using Auto Loader file notification mode to load files, provide the AWS Region ID.
aws_region = "aws-region-id"
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")
myRDD = sc.textFile("s3a://%s/.../..." % aws_bucket_name)
myRDD.count()
s3a://
パスに対するKMS暗号化の設定
ステップ1: インスタンスプロファイルの設定
Databricksでインスタンスプロファイルを作成します。
ステップ2: 設定で指定されたKMSキーにおけるキーユーザーとしてインスタンスプロファイルを追加
- AWSでIAMサービスに移動します。
- サイドバーの下部にあるEncryption Keysをクリックします。
- アクセス権を追加したいキーをクリックします。
- Key UsersセクションでAddをクリックします。
- IAMロールの隣にあるチェックボックスを選択します。
- Attachをクリックします。
ステップ3: 暗号化プロパティのセットアップ
AWS configurationsセッティングあるいはinitスクリプトを用いて、グローバルKMS暗号化プロパティをセットアップします。お使いのキーのARNをspark.hadoop.fs.s3a.server-side-encryption.key
キーに設定します。
Spark設定
spark.hadoop.fs.s3a.server-side-encryption.key arn:aws:kms:<region>:<aws-account-id>:key/<bbbbbbbb-ddd-ffff-aaa-bdddddddddd>
spark.hadoop.fs.s3a.server-side-encryption-algorithm SSE-KMS
バケットごとにKMS暗号化を設定することもできます。
Initスクリプト
initスクリプトset-kms.sh
を作成するために以下のコードをノートブックで実行し、スクリプトを実行するようにクラスターを設定することで、グローバル暗号化設定をセットアップします。
dbutils.fs.put("/databricks/scripts/set-kms.sh", """
#!/bin/bash
cat >/databricks/driver/conf/aes-encrypt-custom-spark-conf.conf <<EOL
[driver] {
"spark.hadoop.fs.s3a.server-side-encryption.key" = "arn:aws:kms:<region>:<aws-account-id>:key/<bbbbbbbb-ddd-ffff-aaa-bdddddddddd>"
"spark.hadoop.fs.s3a.server-side-encryption-algorithm" = "SSE-KMS"
}
EOL
""", True)
暗号化が動作していることを確認したら、グローバルinitスクリプトを用いて、すべてのクラスターの暗号化を設定します。
S3バケットデータの暗号化
Databricksではサーバーサイド暗号化を用いたデータの暗号化をサポートしています。このセクションでは、DBFSを通じてS3にファイルを書き込む際に、どのようにサーバーサイド暗号化を用いるのかを説明します。DatabricksではAmazon S3-managed encryption keys (SSE-S3)とAWS KMS–managed encryption keys (SSE-KMS)をサポートしています。
SSE-S3を用いたファイルの書き込み
-
SSE-S3を用いてS3バケットをマウントするには以下を実行します。
Scala
dbutils.fs.mount(s"s3a://$AccessKey:$SecretKey@$AwsBucketName", s"/mnt/$MountName", "sse-s3")
1. SSE-S3を用いて対応するS3バケットにファイルを書き込むには以下を実行します。
```scala:Scala
dbutils.fs.put(s"/mnt/$MountName", "<file content>")
SSE-KMSを用いたファイルの書き込み
-
暗号化タイプとして
sse-kms
あるいはsse-kms:$KmsKey
を引き渡すことでソースディレクトリをマウントします。- デフォルトKMSマスターキーを用いて、SSE-KMSを用いたS3バケットのマウントを行うには以下を実行します。
Scaladbutils.fs.mount(s"s3a://$AccessKey:$SecretKey@$AwsBucketName", s"/mnt/$MountName", "sse-kms")
- 特定のKMSキーを用いて、SSE-KMSを用いたS3バケットのマウントを行うには以下を実行します。
Scaladbutils.fs.mount(s"s3a://$AccessKey:$SecretKey@$AwsBucketName", s"/mnt/$MountName", "sse-kms:$KmsKey")
-
SSE-KMSによる暗号化がされたS3バケットにファイルを書き込むには以下を実行します。
Scala
dbutils.fs.put(s"/mnt/$MountName", "")
# 設定
Databricks Runtime 7.3 LTS以降では、[open\-source Hadoop options](https://hadoop.apache.org/docs/r3.1.3/hadoop-aws/tools/hadoop-aws)を用いたS3Aファイルシステムの設定をサポートしています。グローバルプロパティ、あるいはバケットごとのプロパティを設定することができます。
## グローバル設定
```ini:ini
# Global S3 configuration
spark.hadoop.fs.s3a.aws.credentials.provider <aws-credentials-provider-class>
spark.hadoop.fs.s3a.endpoint <aws-endpoint>
spark.hadoop.fs.s3a.server-side-encryption-algorithm SSE-KMS
バケットごとの設定
spark.hadoop.fs.s3a.bucket.<bucket-name>.<configuration-key>
シンタックスを用いて、バケットごとのプロパティを設定することができます。これによって、異なるクレディンシャル、エンドポイントなどをバケットに設定することができます。
たとえば、グローバルS3設定に加えて、以下のキーを用いることで、それぞれのバケットに設定を行うことができます。
# Set up authentication and endpoint for a specific bucket
spark.hadoop.fs.s3a.bucket.<bucket-name>.aws.credentials.provider <aws-credentials-provider-class>
spark.hadoop.fs.s3a.bucket.<bucket-name>.endpoint <aws-endpoint>
# Configure a different KMS encryption key for a specific bucket
spark.hadoop.fs.s3a.bucket.<bucket-name>.server-side-encryption.key <aws-kms-encryption-key>
アクセスリクエスト元によるバケット支払い
Requester Paysバケットへのアクセスを有効化するには、クラスターのAWS設定に以下の行を追加してください。
spark.hadoop.fs.s3a.requester-pays.enabled true
注意
DatabricksではRequester Paysバケットに対するDelta Lakeの書き込みはサポートしていません。