Configure Databricks S3 commit service-related settings | Databricks on AWS [2022/1/20時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Databricksでは、複数クラスターからAmazon S3への書き込みを調整するコミットサービスを実行しています。このサービスはDatabricksのコントロールプレーンで実行されています。追加のセキュリティ対策として、ダイレクトアップロード最適化の無効化で説明されいるように、このサービスのダイレクトアップロード最適化を無効化することができます。お使いのS3バケットへのアクセスをさらに制限するには、追加のバケットセキュリティ制限を参照ください。
S3コミットサービスに関連するAWS GuardDutyアラートを受け取った場合には、S3コミットサービスに関連するAWS GuardDutyのアラートを参照ください。
コミットサービスについて
S3コミットサービスは、特定のケースにおいて単一のテーブルに対する複数クラスターの書き込みの一貫性保証をサポートするものです。例えばコミットサービスはDelta LakeのACIDトランザクションの実装をサポートしています。
デフォルトの設定では、DatabricksはコミットサービスのAPI呼び出しの中で、データプレーンからコントロールプレーンに一時的なAWSクレディンシャルを送信します。インスタンスプロファイルのクレディンシャルは6時間有効です。
データプレーンはデータを直接S3に書き込み、コントロールプレーンのS3コミットサービスがコミットログのアップロード(以下で説明するマルチパートアップロードの完了)を最終化することで一貫性コントロールを提供します。コミットサービスはS3からいかなるデータも読み込みません。ファイルが存在しない場合には新規ファイルをS3に作成します。
DatabricksのコミットサービスによってS3に書き込まれる最も一般的なデータは、お使いのデータから収集されるカラムの最小値、最大値などの統計情報を含むDeltaログです。ほとんどのDeltaログデータは、Amazon S3のマルチパートアップロードを用いてデータプレーンから送信されます。
クラスターがDeltaログをS3に書き込むために、マルチパートのデータのステージングを行うと、DatabricksコントロールプレーンのS3のコミットサービスが、S3に処理が完了したことを通知することでS3マルチパートアップロードを完了します。小規模な更新処理のパフォーマンスを最適化するために、デフォルトではコミットサービスは、時々小規模な更新処理をコントロールプレーンからS3に直接プッシュします。このダイレクトアップデート最適化を無効化することができます。ダイレクトアップロード最適化の無効化を参照ください。
Delta Lakeに加えて、以下のDatabricksの機能が同じS3コミットサービスを使用しています。
Amazonはオブジェクトが存在していないのみオブジェクトを配置するオペレーションを提供していないため、コミットサービスは必要となります。Amazon S3は分散システムです。S3が同じオブジェクトに同時に書き込みを行う複数のリクエストを受け取った場合、最後に書き込まれたオブジェクト以外は全て上書きされます。コミットを中央で検証する機能なしに、異なるクラスターからの同時コミットはテーブルを破損させる可能性があります。
S3コミットサービスに関連するAWS GuardDutyのアラート
AWS GuardDutyを使用しており、AWS IAMインスタンスプロファイルを用いたデータアクセスを行なっている場合、GuardDutyはDelta Lake、構造化ストリーミング、オートローダー、COPY INTO
に関するDatabricksの挙動に対してアラートを生成する場合があります。これらのアラートは、インスタンスのクレディンシャルの流出に関連するもので、デフォルトでは有効化されています。これらのアラートにはUnauthorizedAccess:IAMUser/InstanceCredentialExfiltration.InsideAWS
といったものが含まれます。
元々のS3データアクセスのIAMロールを担うAWSインスタンスプロファイルを作成することで、S3に関連するGuardDutyアラートに対応するように設定を行うことが可能です。
インスタンスプロファイルのクレディシャルを使うのではなく、この新規インスタンスプロファイルは短期間のトークンを用いてクラスターがロールを担うように設定を行います。この機能はすでに最近のDatabricksランタイムには存在しており、クラスターポリシーを通じて全体に設定を強制することができます。
-
未設定の場合には、S3データにアクセスするために通常のインスタンスプロファイルを作成します。このインスタンスプロファイルは、S3のデータに直接アクセスするためにインスタンスプロファイルのクレディシャルを使用します。
このセクションではインスタンスプロファイルのロールARNを
<data-role-arn>
とします。 -
トークンを使用する新規インスタンスプロファイルを作成し、データに直接アクセスするインスタンスプロファイルを参照します。皆様のクラスターはこの新規のトークンベースのインスタンスプロファイルを参照することになります。Databricksにおけるインスタンスプロファイルを用いたS3バケットへのセキュアなアクセスを参照ください。
このインスタンスプロファイルは、いかなる直接S3アクセスを必要としません。代わりとして、データアクセスに必要になるIAMロールを担うことができるアクセス権のみを必要とします。このセクションではこのインスタンスプロファイルのロールARNを
<cluster-role-arn>
とします。-
新規クラスターインスタンスプロファイルのIAMロール(
<cluster-role-arn>
)にアタッチされるIAMポリシーを追加します。新規クラスターインスタンスプロファイルのIAMロールに以下のポリシー文を追加し、<data-role-arn>
をもともとバケットにアクセスするために使用していたインスタンスプロファイルのARNで置き換えます。JSON{ "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": "<data-role-arn>" }
-
既存のデータアクセス用のIAMロールに信頼ポリシー文を追加し、
<cluster-role-arn>
を、もともとバケットにアクセスするために使用していたインスタンスプロファイルのARNで置き換えます。JSON{ "Effect": "Allow", "Principal": { "AWS": "<cluster-role-arn>" }, "Action": "sts:AssumeRole" }
-
-
DBFSを使用せずにS3に直接接続を行うノートブックのコードを使用するには、お使いのクラスターがこの新規トークンベースのインスタンスプロファイルを用いて、データアクセスのロールを担うように設定を行う必要があります。
-
全てのバケットに対するS3アクセスをクラスターに対して設定します。以下のクラスターSpark設定を追加します。
inifs.s3a.credentialsType AssumeRole fs.s3a.stsAssumeRole.arn <data-role-arn>
-
Databricksランタイム8.3以降では、特定のバケットに対して設定を行うことができます。
inifs.s3a.bucket.<bucket-name>.aws.credentials.provider org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider fs.s3a.bucket.<bucket-name>.assumed.role.arn <data-role-arn>
-
S3バケットのDBFSマウントを使用している場合には、お使いのコードでバケットをマウントし、
AssumeRole
設定を追加する必要があります。このステップはDBFSマウントでのみ必要であり、お使いのワークスペースのルートS3バケットのルートDBFSストレージへのアクセスに関しては不要です。以下の例ではPythonを使用しています。
# If other code has already mounted the bucket without using the new role, unmount it first
dbutils.fs.unmount("/mnt/<mount-name>")
# mount the bucket and assume the new role
dbutils.fs.mount("s3a://<bucket-name>/", "/mnt/<mount-name>", extra_configs = {
"fs.s3a.credentialsType": "AssumeRole",
"fs.s3a.stsAssumeRole.arn": "<role-arn>"
})
ダイレクトアップロード最適化の無効化
小規模な更新のパフォーマンスの最適化のために、デフォルトではコミットサービスは時に小規模更新処理を直接コントロールプレーンからS3にプッシュします。この最適化を無効化するには、Sparkのパラメーターspark.hadoop.fs.s3a.databricks.s3commit.directPutFileSizeThreshold
を0
に設定します。この設定をクラスターのSpark設定に追加するか、グローバルinitスクリプトに設定することができます。
この機能を無効化することで、定期的に小規模更新を行うニアリアルタイムの構造化ストリーミングのクエリーの性能に僅かなインパクトがあるかもしれません。プロダクション環境でこの機能を無効化する前には、パフォーマンスへのインパクトをテストすることを検討してください。
追加のバケットセキュリティ制限
以下のバケットポリシー設定を行うことで、お使いのS3バケットに対するアクセスをさらに制限することができます。
これらのいずれの変更もGuardDutyのアラートには影響を与えません。
- 特定のIPとS3オペレーションにバケットアクセスを制限する。 お使いのバケットに対する更なるコントロールに興味があるのであれば、特定のIPアドレスからS3バケットへのアクセスのみに限定することができます。例えば、ご自身の環境とS3コミットサービスを含むDatabricksコントロールプレーンのみにアクセスを許可することができます。S3へのアクセス制限を参照ください。この設定によって、他の場所からクレディンシャルが用いられるリスクを制限します。
-
特定ディレクトリ外でのS3オペレーションのタイプを制限する。S3コミットサービスに必要とされるディレクトリ以外のS3バケットに対するDatabricksコントロールプレーンからのアクセスを拒否することができます。また、DatabricksのIPアドレスからこれらのディレクトリに対するオペレーションを、S3オペレーションの
put
とlist
に制限することができます。(S3コミットサービスを含む)Databricksのコントロールプレーンはバケットに対するget
アクセスを必要としません。JSON{ "Sid": "LimitCommitServiceActions", "Effect": "Deny", "Principal": "*", "NotAction": [ "s3:ListBucket", "s3:GetBucketLocation", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::<bucket-name>/*", "arn:aws:s3:::<bucket-name>" ], "Condition": { "IpAddress": { "aws:SourceIp": "<control-plane-ip>" } } }, { "Sid": "LimitCommitServicePut", "Effect": "Deny", "Principal": "*", "Action": "s3:PutObject", "NotResource": [ "arn:aws:s3:::<bucket-name>/*_delta_log/*", "arn:aws:s3:::<bucket-name>/*_spark_metadata/*", "arn:aws:s3:::<bucket-name>/*offsets/*", "arn:aws:s3:::<bucket-name>/*sources/*", "arn:aws:s3:::<bucket-name>/*sinks/*", "arn:aws:s3:::<bucket-name>/*_schemas/*" ], "Condition": { "IpAddress": { "aws:SourceIp": "<control-plane-ip>" } } }, { "Sid": "LimitCommitServiceList", "Effect": "Deny", "Principal": "*", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::<bucket-name>", "Condition": { "StringNotLike": { "s3:Prefix": [ "*_delta_log/*", "*_spark_metadata/*", "*offsets/*", "*sources/*", "*sinks/*", "*_schemas/*" ] }, "IpAddress": { "aws:SourceIp": "<control-plane-ip>" } } }
<control-plane-ip>
をお使いのリージョンのDatabricksコントロールプレーンのIPアドレス、<bucket-name>
をお使いのS3バケット名で置き換えてください。