Amazon Redshift | Databricks on AWS [2021/3/17時点]の翻訳です。
Apache Spark SQLデータフレームにデータを読み込むデータソースとしてAmazon Redshiftを利用できますし、Redshiftのテーブルに書き込むことも可能です。Redshiftデータソースは、効率的にデータを転送するためにAmazon S3を使用し、自動的にRedshift上で適切にCOPY
、UNLOAD
を実行するためにJDBCを使用します。
Redshiftデータソースは、クエリの都度S3に対して大量のデータを出力するので、インタラクティブなクエリよりもバッチワークロードに適しています。Redshift上の同じデータに対して繰り返しクエリーを実行するのであれば、抽出したデータをApache Parquetなどの最適化されたフォーマットで保存することをお勧めします。
注意
DatabricksのVPCにおけるセキュリティモデルによる権限の問題が生じる場合があるため、RedshiftのクラスターはDatabricks管理のVPCの中に作るべきではありません。ご自身のVPCの中にRedshiftクラスターを作成し、DatabricksからRedshiftインスタンスに接続するためにVPC peeringを実施してください。
インストール
DatabricksランタイムにはAmazon Redshiftデータソースが含まれています。追加のインストールは不要です。Databricksランタイムリリースに含まれるRedshiftデータソースのバージョンを確認するには、Databricksランタイムのリリースノートを確認ください。
Redshiftデータソースには、Redshiftと互換性のあるJDBCドライバーが必要となります。RedshiftはPostgreSQLデータベースシステムに基づいているので、Databricksランタイムに含まれているPostgreSQLのJDBCドライバーか、Amazonが推奨するRedshiftのJDBCドライバーを使用することができます。PostgreSQLのJDBCドライバーを使う場合にはインストールは不要です。リリースに含まれるPostgreSQL JDBCドライバーのバージョンはリリースノートを確認できます。
RedshiftのJDBCドライバーを使う際には、手動でのインストールが必要となります。Redshiftドライバーは、postgresql
とredshift
のJDBC接続の両方のハンドラーとして自身を登録しています。クラスター上でRedshiftとPostgreSQL両方と連携すると競合が発生する可能性があります。このため、DatabricksはRedshiftのJDBCドライバーをバンドルしていません。詳細に関しては、Stack Overflowのポストを参考にしてください。RedshiftのJDBCドライバーをバンドルすると、JDBC 4.0、4.1、4.2バージョンを選択することができなくなります。
RedshiftのJDBCドライバーを手動でインストールするには:
注意
Redshift JDBCドライバーv1.2.16には、SQLクエリーにwhere
句を指定した際に空のデータが返却される問題があります。最新のドライバーを使用することをお勧めします。
使用法
ドライバーに応じて、JDBC URLを設定します。
- バンドルされているPostgreSQL JDBCドライバー:
jdbc:postgresql://...
- Redshift JDBCドライバー:
jdbc:redshift://...
以下のサンプルはRedshiftドライバーを使用した例です。PostgreSQL JDBCドライバーを使用する際にはurl
を置き換えてください。
AWSの認証情報を設定することで、Python、SQL、R、Scalaで提供されているSparkデータソースAPI経由で、データソースにアクセスできるようになります。
Python
# Read data from a table
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
.option("dbtable", "<your-table-name>") \
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
.load()
# Read data from a query
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
.option("query", "select x, count(*) <your-table-name> group by x") \
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
.load()
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
df.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
.option("dbtable", "<your-table-name>") \
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
.mode("error") \
.save()
# Write back to a table using IAM Role based authentication
df.write \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
.option("dbtable", "<your-table-name>") \
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \
.mode("error") \
.save()
SQL
SQLによるデータ読み込み
CREATE TABLE example_table
USING com.databricks.spark.redshift
OPTIONS (
dbtable '<your-table-name>',
tempdir 's3a://<your-bucket>/<your-directory-path>',
url 'jdbc:redshift://<the-rest-of-the-connection-string>'
);
SQLによるデータ書き込み
-- Create a new table, throwing an error if a table with the same name already exists
CREATE TABLE example_table
USING com.databricks.spark.redshift
OPTIONS (
dbtable '<your-table-name>',
tempdir 's3a://<your-bucket>/<your-directory-path>'
url 'jdbc:redshift://<the-rest-of-the-connection-string>'
)
AS SELECT * FROM table_to_save;
R
Rによるデータ読み込み
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Scala
// Get some data from a Redshift table
val df: DataFrame = spark.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
.option("dbtable", "<your-table-name>")
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
.load()
// Also load data from a Redshift query
val df: DataFrame = spark.read
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
.option("dbtable", "<your-table-name>")
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("com.databricks.spark.redshift")
.option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
.option("dbtable", "<your-table-name>")
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
.mode("error")
.save()
設定
S3とRedshiftの認証
以下の図に示すように、データソースにはいくつかのネットワーク接続が関係します。
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
Redshiftとデータをやり取りする際には、データソースはS3にデータを書き込みます。そのため、S3バケット(tempdir
設定パラメータで指定します)に対する読み書きのための認証情報が必要となります。
注意
データソースは、S3に生成される一時ファイルをクリーンアップしません。このため、一定期間後にオブジェクトを自動で削除するように、オブジェクトライフサイクル設定を伴う専用のS3バケットを使用することをお勧めします。どのようにファイルを暗号化するかに関しては、本ドキュメントの暗号化を参照ください。
以下のセクションではそれぞれの接続における認証設定を説明します。
SparkのドライバーノードからRedshiftへの接続
Sparkドライバーノードは、usernameとpasswordを用いて、JDBC経由でRedshiftに接続します。Redshiftは接続を認証するためのIAMロールの使用をサポートしていません。デフォルトでは、接続はSSL暗号化されます。詳細は暗号化を参照ください。
SparkからS3への接続
Redshiftに対する読み書きを行う際、S3は大量データの仲介役として動作します。Sparkは、HadoopファイルシステムインタフェースとAmazon Java SDKのS3クライアントの両方を使用してS3にアクセスします。この接続はAWSのキーあるいはインスタンスプロファイル(DBFSマウントポイントはサポートされていないので、AWSキーを使いたくない場合には、クラスターのインスタンスプロファイルを使用する必要があります)。認証情報を渡すためには4つの方法があります。
- デフォルトのクレディンシャルプロバイダーチェーン(多くのケースでベストな選択肢となります): AWSの認証情報はDefaultAWSCredentialsProviderChainを通じて自動的に収集されます。S3への認証を行う際にインスタンスプロファイルを使用している際には、多くの場合にこの方法を使用することになります。
-
Hadoop設定にキーをセットする: Hadoop configuration propertiesを使用してAWSキーを設定することができます。
tempdir
の設定でs3a://
ファイルシステムをポイントしているのであれば、HadoopのXML設定ファイルにfs.s3a.access.key
とfs.s3a.secret.key
プロパティを設定するか、sc.hadoopConfiguration.set()
を呼び出してSparkのグローバルHadoop設定にキーをセットすることができます。s3n://
ファイルシステムを使用している場合は、以下の例に示すようにレガシーな設定キーを指定することができます。
Scala
s3a
ファイルシステムを使用している場合は、以下を追加します。
sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
レガシーなs3n
ファイルシステムを使用している場合には、以下を追加します。
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
以下のコマンドはSparkの内部処理に依存している部分がありますが、PySparkでも動作するはずで、将来的に振る舞いが変更される可能性は低いです。
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
- IAMロールを前提とする: インスタンスプロファイルが使用するIAMロールを使用することもできます。ロールのARNを指定するためには、インスタンスプロファイルをクラスターにアタッチし、以下の設定キーを提供する必要があります。
Scala
sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole")
sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
Python
sc._jsc.hadoopConfiguration().set("fs.s3a.credentialsType", "AssumeRole")
sc._jsc.hadoopConfiguration().set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)
RedshiftからS3への接続
また、RedshiftもCOPY
、UNLOAD
クエリーの際にはS3にアクセスします。この接続の際には3つの方法で認証を行うことができます:
-
IAMロールを前提とする(最もセキュアです):
COPY
、UNLOAD
の際、RedshiftにIAMロールを許可することができ、Redshiftがそのロールを使用するようにデータソースを設定できます。- バケットにアクセスできるS3権限をIAMロールに許可する
- Authorizing Amazon Redshift to Access Other AWS Services On Your Behalfのガイドに従って、Redshiftがこのロールを使用できるようにロールの信頼ポリシーを設定する
- Authorizing COPY and UNLOAD Operations Using IAM Rolesのガイドに従い、IAMロールとRedshiftクラスターを関連づける
-
SparkのS3認証情報をRedshiftに転送する:
forward_spark_s3_credentials
オプションがtrue
に設定されているのであれば、データソースは自動的にSparkが使用している認証情報を検知し、JDBC経由でRedshiftに転送します。Sparkがインスタンスプロファイルを使用してS3にアクセスしているのであれば、テンポラリーSTS認証情報のセットがRedshiftに転送されます。そうでなければ、AWSキーが転送されます。JDBCのクエリーにこれらの認証情報が埋め込まれるので、この認証方式を使用する際には、必ずJDBC接続におけるSSL暗号化を有効化してください。 -
セキュリティトークンサービス(STS)を使用する: AWSのSecurity Token Serviceで生成される一時的なキーを使用するために
temporary_aws_access_key_id
、temporary_aws_secret_access_key
、temporary_aws_session_token
を設定することも可能です。JDBCのクエリーにこれらの認証情報が埋め込まれるので、この認証方式を使用する際には、必ずJDBC接続におけるSSL暗号化を有効化してください。この方式を選択した際には、読み書きの操作が成功する前に、認証情報が期限切れになる可能性があることに注意してください。
これら3つのオプションを組み合わせることはできないので、必ず明示的にどれか一つを選択する必要があります。
暗号化
-
JDBCをセキュアにする: JDBC URLにSSLに関連する設定が存在しない場合、デフォルトでデータソースはSSL暗号化を有効にし、Redshiftサーバーが信頼できるか(
sslmode=verify-full
かどうか)を検証します。このため、初回はAmazonサーバーから自動でサーバー証明書がダウンロードされます。これに失敗した場合は、フォールバックとして事前にバンドルされている証明書が使用されます。これは、Redshift、PostgreSQLドライバー両方に適用されます。SSLの自動設定は、2.1.1-db4 cluster image (未サポート)で導入されました。これより前のリリースでは、SSLは自動で設定されず、デフォルトのJDBC設定(SSL無効)が使用されます。- この機能において不具合が生じた場合、あるいはSSLを無効化したい場合には、自身のコードにおける
DataFrameReader
あるいはDataFrameWriter
で.option("autoenablessl", "false")
オプションを設定することができます。 - カスタムのSSL設定を適用したい場合には、Redshiftのドキュンメント、Using SSL and Server Certificates in Java、JDBC Driver Configuration Optionsの手順に従ってください。JDBC urlに記載されたSSL関連の設定は優先されます。つまり、自動設定は停止します。
- この機能において不具合が生じた場合、あるいはSSLを無効化したい場合には、自身のコードにおける
-
S3に格納されたUNLOADデータを暗号化する(Redshiftから読み込みを行なった際に格納されるデータ): Redshiftのドキュメント、Unloading Data to S3によれば、「UNLOADはAmazon S3のサーバーサイド暗号化(SSE-S3)によってデータを暗号化します」とあります。
- また、Redshiftはクライアントサイドのカスタムキーによる暗号化(Unloading Encrypted Data Filesを参照ください)もサポートしていますが、データソースには要求されるシンメトリックキーを指定する機能を有していません。
-
S3に格納されたCOPYデータを暗号化する(Redshiftに書き込みを行う際に格納されるデータ): Redshiftドキュメント、Loading Encrypted Data Files from Amazon S3には以下の記載があります。
- AWSが管理する暗号化キーによるサーバーサイド暗号化(SSE-S3、SSE-KMS)、あるいはクライアントサイド暗号化、これらの両方を用いて、Amazon S3にアップロードされたデータファイルをロードするのに
COPY
コマンドを使用することができます。COPYは、顧客管理のキーによるAmazon S3サーバーサイド暗号化(SSE-C)をサポートしていません。 - この機能を使う際には、お使いのHadoop S3ファイルシステムがAmazon S3 encryptionを使うように設定してください。この際、書き込まれるファイルの一覧を含む
MANIFEST
ファイルは暗号化されません。
- AWSが管理する暗号化キーによるサーバーサイド暗号化(SSE-S3、SSE-KMS)、あるいはクライアントサイド暗号化、これらの両方を用いて、Amazon S3にアップロードされたデータファイルをロードするのに
パラメーター
Spark SQLで提供されるパラメーターマップ、OPTIONSは以下の通りです。
原文を参照ください。
追加の設定オプション
文字列カラムの最大サイズの設定
Redshiftのテーブルを作成する際、デフォルトの振る舞いにおいては、文字列のカラムはTEXT
カラムとなります。RedshiftはTEXT
カラムをVARCHAR(256)
として格納しますので、この列は最大256文字となります(ソース)。
よりサイズの大きいカラムにするためには、それぞれの文字列のカラムの最大長を指定するためのカラムメタデータフィールドmaxlength
を使用できます。これは、デフォルトより小さいサイズのカラムを宣言することで、容量を節約し、パフォーマンスの改善を行う際にも有効です。
注意 Sparkの制限のため、SQLとRのAPIでは、カラムメタデータ変更はサポートされていません。
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
以下のサンプルは、Spark Scala APIを用いて複数のカラムのメタデータを更新するものです。
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
カスタムカラムタイプの設定
手動でカラムタイプを設定する際には、redshift_type
カラムメタデータを使用します。例えば、Spark SQL Schema -> Redshift SQL
のタイプマッピングを上書きして、ユーザー定義のカラムタイプを指定したい場合には、以下のようにします。
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
カラムのエンコーディングの設定
テーブルを作成する際に、カラムごとにカラムメタデータフィールドencoding
を指定します。(利用可能なエンコーディングに関してはAmazonのドキュメントを参照ください)
カラムの説明の設定
Redshiftでは、カラムに対する説明を追加することができ、一般的なクエリーツールで(COMMENT
コマンドを利用して)参照することができます。カラムメタデータフィールドdescription
でカラムごとの説明を指定できます。
Redshiftへのクエリープッシュダウン
Sparkオプティマイザーは、以下のオペレーターをRedshiftにプッシュダウンします。
Filter
Project
Sort
Limit
Aggregation
Join
Project
とFilter
においては、以下の表現をサポートしています。
- ほとんどのブーリアンオペレーター
- 比較
- 基本的な算術処理
- 数字、文字列のキャスト
- ほとんどの文字列関数
- 全体をRedshiftにプッシュダウンできる場合は、スカラーのサブクエリ
注意
プッシュダウンは、日付とタイムスタンプを操作する表現はサポートしていません。
Aggregation
においては、以下の集約関数をサポートしています。
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
利用できる場合には、DISTINCT
句と組み合わせることができます。
Join
においては、以下のジョインをサポートしています。
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- オプティマイザーによってJoinに書き込まれるサブクエリ。例:
WHERE EXISTS
、WHERE NOT EXISTS
注意
Joinのプッシュダウンは、FULL OUTER JOIN
をサポートしていません。
LIMIT
を使うクエリーにおいて、プッシュダウンが最も有効かもしれません。SELECT * FROM large_redshift_table LIMIT 10
のようなクエリーは、中間データとしてテーブル全体をS3にUNLOADするため、非常に時間がかかる場合があります。プッシュダウンを活用することで、LIMIT
はRedshiftで実行されます。集計を伴うクエリーにおいては、集計処理をRedshiftにプッシュダウンすることで、転送されるデータ量を削減することが可能になります。
Redshiftへのクエリープッシュダウンはデフォルトで有効になっています。spark.databricks.redshift.pushdown
をfalse
に設定することで無効化することができます。無効化されている場合においても、Sparkはフィルターとカラムの削除はRedshiftにプッシュダウンします。
トランザクションの保証
本章では、Sparkに対するRedshiftデータソースのトランザクション保証を説明します。
RedshiftとS3の特性のバックグラウンド
Redshiftのトランザクション保証に関する一般的な情報に関しては、RedshiftドキュメントのManaging Concurrent Write Operationsを参照してください。要約すると、RedshiftのBEGINコマンドのドキュメントによれば、Redshiftはserializable isolationを提供しています。
4つのトランザクションレベルのいずれも使用できますが、Amazon Redshiftはすべての分離レベルをシリアライズ可能として処理します。
Redshiftドキュメントによれば、
Amazon Redshiftは、それぞれ実行されたSQLコマンドを個別にコミットする自動コミットをデフォルトでサポートしています。
すなわち、COPY
やUNLOAD
のような個々のコマンドは原子的、かつトランザクションが保証されます。一方、複数のコマンドやクエリーの原子性を保証するときには、明示的なBEGIN
とEND
が必要になります。
Redshiftに読み書きを行う際、データソースはS3に読み書きを行います。SparkとRedshiftは部分的な出力を生成し、複数のファイルとしてS3に格納します。Amazon S3 Data Consistency Modelによると、S3バケットの一覧操作は結果的整合性を保証しています。このため、データソースの結果的整合性によるデータの欠損や欠如を避けるためにこれらのファイルは、特定のサイズに到達する必要があります。
Sparkに対するRedshiftデータソースの保証
既存テーブルへの追加
Redshiftに行を追加する際、データソースはCOPYコマンドを使用し、結果的整合性のS3操作に備えるためにmanifestsを指定します。結果として、spark-redshift
は、通常のRedshiftのCOPY
コマンドと同じ原子性、トランザクション特性を持つ既存テーブルに追加を行います。
新規テーブルの作成(SaveMode.CreateIfNotExists
)
テーブルの作成は、CREATE TABLE
コマンド、および初期のレコードを追加するCOPYコマンドから構成される二段階のプロセスとなります。両方の操作は同一のトランザクションとして実行されます。
既存テーブルの上書き
デフォルトでは、対象テーブルの削除、空テーブルの新規作成、行追加から構成されるトランザクションで上書きを行います。
usestagingtable
がfalse
に設定されている場合、上書き操作の原子性を犠牲にしてRedshiftでの上書きに必要なステージング領域を削減できるように、データソースは新規テーブルに行を追加する前に、DELETE TABLE
コマンドをコミットします。
Redshiftテーブルへのクエリー
クエリーにおいては、RedshiftのUNLOADコマンドを使用し、結果をS3に保存し、S3の結果的整合性の操作に備えるためにmanifestsを使用します。結果として、SparkのRedshiftデータソースからのクエリーは、通常のRedshiftクエリーと同様の一貫性を持ちます。
一般的な問題及び解決策
S3バケットとRedshitクラスターが異なるAWSリージョンに存在する
デフォルトでは、S3バケットとRedshiftクラスターが異なるAWSリージョンに存在している場合にはS3 <-> Redshiftのコピーは動作しません。
異なるリージョンにS3バケットがあるケースで、Redshiftのテーブルから読み取りを行おうとすると、以下のようなエラーが発生します。
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
同様のケースで書き込みを行おうとすると、以下のようなエラーが発生します。
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
-
書き込み: RedshiftのCOPYコマンドでは、明示的にS3バケットのリージョンを指定することができますので、このケースにおいては
extracopyoptions
にregion 'the-region-name'
を追加することで、Redshiftが適切に動作するようになります。例えば、バケットがUS East (Virginia)にある場合には、Scala APIでは以下のように指定します。
Scala
.option("extracopyoptions", "region 'us-east-1'")
Databricksランタイム6.2以降では、代わりにawsregion
を使用することも可能です。
Scala
.option("awsregion", "us-east-1")
-
読み取り: RedshiftのUNLOADコマンドでもS3バケットのリージョンを指定できます。Databricksランタイム6.2以降では
awsregion
を設定することで、正常に読み取りを行えるようになります。
Scala
.option("awsregion", "us-east-1")
Databricksランタイム6.1以前では、データソースは異なるリージョンのバケットへの書き込みをサポートしていません。唯一のワークアラウンドは、Redshiftと同じリージョンに新たなバケットを作成することです。
S3の認証を行う際にインスタンスプロファイルを使用した場合に、予期しないS3ServiceException credentials errorが発生する
S3の認証を行う際にインスタンスプロファイルを使用していて、予期しないS3ServiceException
エラーに直面した際には、S3 URIのtempdir
、Hadoopの設定、あるいはDefaultAWSCredentialsProviderChainで検証される全てのソースにおいてAWSのアクセスキーが指定されていることを確認してください。これらのソースの設定は、インスタンスプロファイルの認証情報を上書きします。
こちらが、インスタンスプロファイルの設定を間違って上書きしてしまった際に表示される可能性のあるエラーメッセージです。
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;
対応するRedshiftの操作が完了しているにもかかわらず、長時間のSparkクエリーがハングアップする
Redshiftに対して大量のデータを読み書きする際に、Redshiftのモニタリングのページでは、Redshiftで対応するLOAD
、UNLOAD
の処理は完了していると表示され、クラスターはアイドル状態になっていると表示されているにもかかわらず、Sparkのクエリーがハングアップする場合があります。これは、RedshiftとSpark間の接続がタイムアウトした場合に発生します。これを避けるには、JDBCフラグのtcpKeepAlive
が有効化されており、TCPKeepAliveMinutes
が小さい値(1など)になっていることを確認してください。
詳細はAmazon Redshift JDBC Driver Configurationを参照ください。
タイムスタンプとタイムゾーンのセマンティクス
データを読み込む際、RedshiftのTIMESTAMP
とTIMESTAMPTZ
データタイプの両方は、SparkのTimestampType
にマッピングされ、値は協定世界時(UTC)に変換され、UTCのタイムスタンプとして格納されます。RedshiftのTIMESTAMP
においては、値はいかなるタイムゾーン情報を持たないものとして、ローカルのタイムゾーンが用いられます。Redshiftテーブルに書き込む際には、SparkのTimestampType
はRedshiftのTIMESTAMP
データタイプにマッピングされます。
移行ガイド
現時点では、データソースがSparkのS3認証情報をRedshiftに転送する前に、明示的にforward_spark_s3_credentials
を指定する必要があります。aws_iam_role
やtemporary_aws_*
認証機構を使用している場合には、この変更の影響はありません。しかし、過去のデフォルト動作に依存している場合、これまでのRedshiftからS3への認証機構を使い続けるのであれば、明示的にforward_spark_s3_credentials
をtrue
に設定する必要があります。3つの認証機構とセキュリティ上のトレードオフに関する議論については、本記事のS3とRedshiftの認証を参照ください。