LoginSignup
0
0

More than 3 years have passed since last update.

DatabricksにおけるRedshift連携

Last updated at Posted at 2021-04-29

Amazon Redshift | Databricks on AWS [2021/3/17時点]の翻訳です。

Apache Spark SQLデータフレームにデータを読み込むデータソースとしてAmazon Redshiftを利用できますし、Redshiftのテーブルに書き込むことも可能です。Redshiftデータソースは、効率的にデータを転送するためにAmazon S3を使用し、自動的にRedshift上で適切にCOPYUNLOADを実行するためにJDBCを使用します。

Redshiftデータソースは、クエリの都度S3に対して大量のデータを出力するので、インタラクティブなクエリよりもバッチワークロードに適しています。Redshift上の同じデータに対して繰り返しクエリーを実行するのであれば、抽出したデータをApache Parquetなどの最適化されたフォーマットで保存することをお勧めします。

注意
DatabricksのVPCにおけるセキュリティモデルによる権限の問題が生じる場合があるため、RedshiftのクラスターはDatabricks管理のVPCの中に作るべきではありません。ご自身のVPCの中にRedshiftクラスターを作成し、DatabricksからRedshiftインスタンスに接続するためにVPC peeringを実施してください。

インストール

DatabricksランタイムにはAmazon Redshiftデータソースが含まれています。追加のインストールは不要です。Databricksランタイムリリースに含まれるRedshiftデータソースのバージョンを確認するには、Databricksランタイムのリリースノートを確認ください。

Redshiftデータソースには、Redshiftと互換性のあるJDBCドライバーが必要となります。RedshiftはPostgreSQLデータベースシステムに基づいているので、Databricksランタイムに含まれているPostgreSQLのJDBCドライバーか、Amazonが推奨するRedshiftのJDBCドライバーを使用することができます。PostgreSQLのJDBCドライバーを使う場合にはインストールは不要です。リリースに含まれるPostgreSQL JDBCドライバーのバージョンはリリースノートを確認できます。

RedshiftのJDBCドライバーを使う際には、手動でのインストールが必要となります。Redshiftドライバーは、postgresqlredshiftのJDBC接続の両方のハンドラーとして自身を登録しています。クラスター上でRedshiftとPostgreSQL両方と連携すると競合が発生する可能性があります。このため、DatabricksはRedshiftのJDBCドライバーをバンドルしていません。詳細に関しては、Stack Overflowのポストを参考にしてください。RedshiftのJDBCドライバーをバンドルすると、JDBC 4.0、4.1、4.2バージョンを選択することができなくなります。

RedshiftのJDBCドライバーを手動でインストールするには:

  1. Amazonからドライバーをダウンロードします。
  2. ドライバーをDatabricksワークスペースにアップロードします。
  3. クラスターにライブラリをインストールします。

注意
Redshift JDBCドライバーv1.2.16には、SQLクエリーにwhere句を指定した際に空のデータが返却される問題があります。最新のドライバーを使用することをお勧めします。

使用法

ドライバーに応じて、JDBC URLを設定します。

  • バンドルされているPostgreSQL JDBCドライバー: jdbc:postgresql://...
  • Redshift JDBCドライバー: jdbc:redshift://...

以下のサンプルはRedshiftドライバーを使用した例です。PostgreSQL JDBCドライバーを使用する際にはurlを置き換えてください。

AWSの認証情報を設定することで、Python、SQL、R、Scalaで提供されているSparkデータソースAPI経由で、データソースにアクセスできるようになります。

Python

# Read data from a table
df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
  .option("dbtable", "<your-table-name>") \
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
  .load()

# Read data from a query
df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
  .option("query", "select x, count(*) <your-table-name> group by x") \
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
  .load()

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
  .option("dbtable", "<your-table-name>") \
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
  .mode("error") \
  .save()

# Write back to a table using IAM Role based authentication
df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>") \
  .option("dbtable", "<your-table-name>") \
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>") \
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \
  .mode("error") \
  .save()

SQL

SQLによるデータ読み込み

CREATE TABLE example_table
USING com.databricks.spark.redshift
OPTIONS (
  dbtable '<your-table-name>',
  tempdir 's3a://<your-bucket>/<your-directory-path>',
  url 'jdbc:redshift://<the-rest-of-the-connection-string>'
);

SQLによるデータ書き込み

-- Create a new table, throwing an error if a table with the same name already exists
CREATE TABLE example_table
USING com.databricks.spark.redshift
OPTIONS (
  dbtable '<your-table-name>',
  tempdir 's3a://<your-bucket>/<your-directory-path>'
  url 'jdbc:redshift://<the-rest-of-the-connection-string>'
)
AS SELECT * FROM table_to_save;

R

Rによるデータ読み込み

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Scala

// Get some data from a Redshift table
val df: DataFrame = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "<your-table-name>")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .load()

// Also load data from a Redshift query
val df: DataFrame = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "<your-table-name>")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "<your-table-name>")
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .mode("error")
  .save()

設定

S3とRedshiftの認証

以下の図に示すように、データソースにはいくつかのネットワーク接続が関係します。

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Redshiftとデータをやり取りする際には、データソースはS3にデータを書き込みます。そのため、S3バケット(tempdir設定パラメータで指定します)に対する読み書きのための認証情報が必要となります。

注意
データソースは、S3に生成される一時ファイルをクリーンアップしません。このため、一定期間後にオブジェクトを自動で削除するように、オブジェクトライフサイクル設定を伴う専用のS3バケットを使用することをお勧めします。どのようにファイルを暗号化するかに関しては、本ドキュメントの暗号化を参照ください。

以下のセクションではそれぞれの接続における認証設定を説明します。

SparkのドライバーノードからRedshiftへの接続

Sparkドライバーノードは、usernameとpasswordを用いて、JDBC経由でRedshiftに接続します。Redshiftは接続を認証するためのIAMロールの使用をサポートしていません。デフォルトでは、接続はSSL暗号化されます。詳細は暗号化を参照ください。

SparkからS3への接続

Redshiftに対する読み書きを行う際、S3は大量データの仲介役として動作します。Sparkは、HadoopファイルシステムインタフェースとAmazon Java SDKのS3クライアントの両方を使用してS3にアクセスします。この接続はAWSのキーあるいはインスタンスプロファイル(DBFSマウントポイントはサポートされていないので、AWSキーを使いたくない場合には、クラスターのインスタンスプロファイルを使用する必要があります)。認証情報を渡すためには4つの方法があります。

  • デフォルトのクレディンシャルプロバイダーチェーン(多くのケースでベストな選択肢となります): AWSの認証情報はDefaultAWSCredentialsProviderChainを通じて自動的に収集されます。S3への認証を行う際にインスタンスプロファイルを使用している際には、多くの場合にこの方法を使用することになります。
  • Hadoop設定にキーをセットする: Hadoop configuration propertiesを使用してAWSキーを設定することができます。tempdirの設定でs3a://ファイルシステムをポイントしているのであれば、HadoopのXML設定ファイルにfs.s3a.access.keyfs.s3a.secret.keyプロパティを設定するか、sc.hadoopConfiguration.set()を呼び出してSparkのグローバルHadoop設定にキーをセットすることができます。s3n://ファイルシステムを使用している場合は、以下の例に示すようにレガシーな設定キーを指定することができます。

Scala

s3aファイルシステムを使用している場合は、以下を追加します。

sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")

レガシーなs3nファイルシステムを使用している場合には、以下を追加します。

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")

Python

以下のコマンドはSparkの内部処理に依存している部分がありますが、PySparkでも動作するはずで、将来的に振る舞いが変更される可能性は低いです。

 sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
 sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")

Scala

sc.hadoopConfiguration.set("fs.s3a.credentialsType", "AssumeRole")
sc.hadoopConfiguration.set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)

Python

sc._jsc.hadoopConfiguration().set("fs.s3a.credentialsType", "AssumeRole")
sc._jsc.hadoopConfiguration().set("fs.s3a.stsAssumeRole.arn", <iam-role-arn-to-be-assumed>)

RedshiftからS3への接続

また、RedshiftもCOPYUNLOADクエリーの際にはS3にアクセスします。この接続の際には3つの方法で認証を行うことができます:

  • IAMロールを前提とする(最もセキュアです): COPYUNLOADの際、RedshiftにIAMロールを許可することができ、Redshiftがそのロールを使用するようにデータソースを設定できます。
    1. バケットにアクセスできるS3権限をIAMロールに許可する
    2. Authorizing Amazon Redshift to Access Other AWS Services On Your Behalfのガイドに従って、Redshiftがこのロールを使用できるようにロールの信頼ポリシーを設定する
    3. Authorizing COPY and UNLOAD Operations Using IAM Rolesのガイドに従い、IAMロールとRedshiftクラスターを関連づける
  • SparkのS3認証情報をRedshiftに転送する: forward_spark_s3_credentialsオプションがtrueに設定されているのであれば、データソースは自動的にSparkが使用している認証情報を検知し、JDBC経由でRedshiftに転送します。Sparkがインスタンスプロファイルを使用してS3にアクセスしているのであれば、テンポラリーSTS認証情報のセットがRedshiftに転送されます。そうでなければ、AWSキーが転送されます。JDBCのクエリーにこれらの認証情報が埋め込まれるので、この認証方式を使用する際には、必ずJDBC接続におけるSSL暗号化を有効化してください。
  • セキュリティトークンサービス(STS)を使用する: AWSのSecurity Token Serviceで生成される一時的なキーを使用するためにtemporary_aws_access_key_idtemporary_aws_secret_access_keytemporary_aws_session_tokenを設定することも可能です。JDBCのクエリーにこれらの認証情報が埋め込まれるので、この認証方式を使用する際には、必ずJDBC接続におけるSSL暗号化を有効化してください。この方式を選択した際には、読み書きの操作が成功する前に、認証情報が期限切れになる可能性があることに注意してください。

これら3つのオプションを組み合わせることはできないので、必ず明示的にどれか一つを選択する必要があります。

暗号化

  • JDBCをセキュアにする: JDBC URLにSSLに関連する設定が存在しない場合、デフォルトでデータソースはSSL暗号化を有効にし、Redshiftサーバーが信頼できるか(sslmode=verify-fullかどうか)を検証します。このため、初回はAmazonサーバーから自動でサーバー証明書がダウンロードされます。これに失敗した場合は、フォールバックとして事前にバンドルされている証明書が使用されます。これは、Redshift、PostgreSQLドライバー両方に適用されます。SSLの自動設定は、2.1.1-db4 cluster image (未サポート)で導入されました。これより前のリリースでは、SSLは自動で設定されず、デフォルトのJDBC設定(SSL無効)が使用されます。
    • この機能において不具合が生じた場合、あるいはSSLを無効化したい場合には、自身のコードにおけるDataFrameReaderあるいはDataFrameWriter.option("autoenablessl", "false")オプションを設定することができます。
    • カスタムのSSL設定を適用したい場合には、Redshiftのドキュンメント、Using SSL and Server Certificates in JavaJDBC Driver Configuration Optionsの手順に従ってください。JDBC urlに記載されたSSL関連の設定は優先されます。つまり、自動設定は停止します。
  • S3に格納されたUNLOADデータを暗号化する(Redshiftから読み込みを行なった際に格納されるデータ): Redshiftのドキュメント、Unloading Data to S3によれば、「UNLOADはAmazon S3のサーバーサイド暗号化(SSE-S3)によってデータを暗号化します」とあります。
    • また、Redshiftはクライアントサイドのカスタムキーによる暗号化(Unloading Encrypted Data Filesを参照ください)もサポートしていますが、データソースには要求されるシンメトリックキーを指定する機能を有していません。
  • S3に格納されたCOPYデータを暗号化する(Redshiftに書き込みを行う際に格納されるデータ): Redshiftドキュメント、Loading Encrypted Data Files from Amazon S3には以下の記載があります。
    • AWSが管理する暗号化キーによるサーバーサイド暗号化(SSE-S3、SSE-KMS)、あるいはクライアントサイド暗号化、これらの両方を用いて、Amazon S3にアップロードされたデータファイルをロードするのにCOPYコマンドを使用することができます。COPYは、顧客管理のキーによるAmazon S3サーバーサイド暗号化(SSE-C)をサポートしていません。
    • この機能を使う際には、お使いのHadoop S3ファイルシステムがAmazon S3 encryptionを使うように設定してください。この際、書き込まれるファイルの一覧を含むMANIFESTファイルは暗号化されません。

パラメーター

Spark SQLで提供されるパラメーターマップ、OPTIONSは以下の通りです。

原文を参照ください。

追加の設定オプション

文字列カラムの最大サイズの設定

Redshiftのテーブルを作成する際、デフォルトの振る舞いにおいては、文字列のカラムはTEXTカラムとなります。RedshiftはTEXTカラムをVARCHAR(256)として格納しますので、この列は最大256文字となります(ソース)。

よりサイズの大きいカラムにするためには、それぞれの文字列のカラムの最大長を指定するためのカラムメタデータフィールドmaxlengthを使用できます。これは、デフォルトより小さいサイズのカラムを宣言することで、容量を節約し、パフォーマンスの改善を行う際にも有効です。

注意 Sparkの制限のため、SQLとRのAPIでは、カラムメタデータ変更はサポートされていません。

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

以下のサンプルは、Spark Scala APIを用いて複数のカラムのメタデータを更新するものです。

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

カスタムカラムタイプの設定

手動でカラムタイプを設定する際には、redshift_typeカラムメタデータを使用します。例えば、Spark SQL Schema -> Redshift SQLのタイプマッピングを上書きして、ユーザー定義のカラムタイプを指定したい場合には、以下のようにします。

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

カラムのエンコーディングの設定

テーブルを作成する際に、カラムごとにカラムメタデータフィールドencodingを指定します。(利用可能なエンコーディングに関してはAmazonのドキュメントを参照ください)

カラムの説明の設定

Redshiftでは、カラムに対する説明を追加することができ、一般的なクエリーツールで(COMMENTコマンドを利用して)参照することができます。カラムメタデータフィールドdescriptionでカラムごとの説明を指定できます。

Redshiftへのクエリープッシュダウン

Sparkオプティマイザーは、以下のオペレーターをRedshiftにプッシュダウンします。

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

ProjectFilterにおいては、以下の表現をサポートしています。

  • ほとんどのブーリアンオペレーター
  • 比較
  • 基本的な算術処理
  • 数字、文字列のキャスト
  • ほとんどの文字列関数
  • 全体をRedshiftにプッシュダウンできる場合は、スカラーのサブクエリ

注意
プッシュダウンは、日付とタイムスタンプを操作する表現はサポートしていません。

Aggregationにおいては、以下の集約関数をサポートしています。

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

利用できる場合には、DISTINCT句と組み合わせることができます。

Joinにおいては、以下のジョインをサポートしています。

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • オプティマイザーによってJoinに書き込まれるサブクエリ。例: WHERE EXISTSWHERE NOT EXISTS

注意
Joinのプッシュダウンは、FULL OUTER JOINをサポートしていません。

LIMITを使うクエリーにおいて、プッシュダウンが最も有効かもしれません。SELECT * FROM large_redshift_table LIMIT 10のようなクエリーは、中間データとしてテーブル全体をS3にUNLOADするため、非常に時間がかかる場合があります。プッシュダウンを活用することで、LIMITはRedshiftで実行されます。集計を伴うクエリーにおいては、集計処理をRedshiftにプッシュダウンすることで、転送されるデータ量を削減することが可能になります。

Redshiftへのクエリープッシュダウンはデフォルトで有効になっています。spark.databricks.redshift.pushdownfalseに設定することで無効化することができます。無効化されている場合においても、Sparkはフィルターとカラムの削除はRedshiftにプッシュダウンします。

トランザクションの保証

本章では、Sparkに対するRedshiftデータソースのトランザクション保証を説明します。

RedshiftとS3の特性のバックグラウンド

Redshiftのトランザクション保証に関する一般的な情報に関しては、RedshiftドキュメントのManaging Concurrent Write Operationsを参照してください。要約すると、RedshiftのBEGINコマンドのドキュメントによれば、Redshiftはserializable isolationを提供しています。

4つのトランザクションレベルのいずれも使用できますが、Amazon Redshiftはすべての分離レベルをシリアライズ可能として処理します。

Redshiftドキュメントによれば、

Amazon Redshiftは、それぞれ実行されたSQLコマンドを個別にコミットする自動コミットをデフォルトでサポートしています。

すなわち、COPYUNLOADのような個々のコマンドは原子的、かつトランザクションが保証されます。一方、複数のコマンドやクエリーの原子性を保証するときには、明示的なBEGINENDが必要になります。

Redshiftに読み書きを行う際、データソースはS3に読み書きを行います。SparkとRedshiftは部分的な出力を生成し、複数のファイルとしてS3に格納します。Amazon S3 Data Consistency Modelによると、S3バケットの一覧操作は結果的整合性を保証しています。このため、データソースの結果的整合性によるデータの欠損や欠如を避けるためにこれらのファイルは、特定のサイズに到達する必要があります。

Sparkに対するRedshiftデータソースの保証

既存テーブルへの追加

Redshiftに行を追加する際、データソースはCOPYコマンドを使用し、結果的整合性のS3操作に備えるためにmanifestsを指定します。結果として、spark-redshiftは、通常のRedshiftのCOPYコマンドと同じ原子性、トランザクション特性を持つ既存テーブルに追加を行います。

新規テーブルの作成(SaveMode.CreateIfNotExists)

テーブルの作成は、CREATE TABLEコマンド、および初期のレコードを追加するCOPYコマンドから構成される二段階のプロセスとなります。両方の操作は同一のトランザクションとして実行されます。

既存テーブルの上書き

デフォルトでは、対象テーブルの削除、空テーブルの新規作成、行追加から構成されるトランザクションで上書きを行います。

usestagingtablefalseに設定されている場合、上書き操作の原子性を犠牲にしてRedshiftでの上書きに必要なステージング領域を削減できるように、データソースは新規テーブルに行を追加する前に、DELETE TABLEコマンドをコミットします。

Redshiftテーブルへのクエリー

クエリーにおいては、RedshiftのUNLOADコマンドを使用し、結果をS3に保存し、S3の結果的整合性の操作に備えるためにmanifestsを使用します。結果として、SparkのRedshiftデータソースからのクエリーは、通常のRedshiftクエリーと同様の一貫性を持ちます。

一般的な問題及び解決策

S3バケットとRedshitクラスターが異なるAWSリージョンに存在する

デフォルトでは、S3バケットとRedshiftクラスターが異なるAWSリージョンに存在している場合にはS3 <-> Redshiftのコピーは動作しません。

異なるリージョンにS3バケットがあるケースで、Redshiftのテーブルから読み取りを行おうとすると、以下のようなエラーが発生します。

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

同様のケースで書き込みを行おうとすると、以下のようなエラーが発生します。

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • 書き込み: RedshiftのCOPYコマンドでは、明示的にS3バケットのリージョンを指定することができますので、このケースにおいてはextracopyoptionsregion 'the-region-name'を追加することで、Redshiftが適切に動作するようになります。例えば、バケットがUS East (Virginia)にある場合には、Scala APIでは以下のように指定します。

Scala

.option("extracopyoptions", "region 'us-east-1'")

Databricksランタイム6.2以降では、代わりにawsregionを使用することも可能です。

Scala

.option("awsregion", "us-east-1")
  • 読み取り: RedshiftのUNLOADコマンドでもS3バケットのリージョンを指定できます。Databricksランタイム6.2以降ではawsregionを設定することで、正常に読み取りを行えるようになります。

Scala

.option("awsregion", "us-east-1")

Databricksランタイム6.1以前では、データソースは異なるリージョンのバケットへの書き込みをサポートしていません。唯一のワークアラウンドは、Redshiftと同じリージョンに新たなバケットを作成することです。

S3の認証を行う際にインスタンスプロファイルを使用した場合に、予期しないS3ServiceException credentials errorが発生する

S3の認証を行う際にインスタンスプロファイルを使用していて、予期しないS3ServiceExceptionエラーに直面した際には、S3 URIのtempdir、Hadoopの設定、あるいはDefaultAWSCredentialsProviderChainで検証される全てのソースにおいてAWSのアクセスキーが指定されていることを確認してください。これらのソースの設定は、インスタンスプロファイルの認証情報を上書きします。

こちらが、インスタンスプロファイルの設定を間違って上書きしてしまった際に表示される可能性のあるエラーメッセージです。

com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;

対応するRedshiftの操作が完了しているにもかかわらず、長時間のSparkクエリーがハングアップする

Redshiftに対して大量のデータを読み書きする際に、Redshiftのモニタリングのページでは、Redshiftで対応するLOADUNLOADの処理は完了していると表示され、クラスターはアイドル状態になっていると表示されているにもかかわらず、Sparkのクエリーがハングアップする場合があります。これは、RedshiftとSpark間の接続がタイムアウトした場合に発生します。これを避けるには、JDBCフラグのtcpKeepAliveが有効化されており、TCPKeepAliveMinutesが小さい値(1など)になっていることを確認してください。

詳細はAmazon Redshift JDBC Driver Configurationを参照ください。

タイムスタンプとタイムゾーンのセマンティクス

データを読み込む際、RedshiftのTIMESTAMPTIMESTAMPTZデータタイプの両方は、SparkのTimestampTypeにマッピングされ、値は協定世界時(UTC)に変換され、UTCのタイムスタンプとして格納されます。RedshiftのTIMESTAMPにおいては、値はいかなるタイムゾーン情報を持たないものとして、ローカルのタイムゾーンが用いられます。Redshiftテーブルに書き込む際には、SparkのTimestampTypeはRedshiftのTIMESTAMPデータタイプにマッピングされます。

移行ガイド

現時点では、データソースがSparkのS3認証情報をRedshiftに転送する前に、明示的にforward_spark_s3_credentialsを指定する必要があります。aws_iam_roletemporary_aws_*認証機構を使用している場合には、この変更の影響はありません。しかし、過去のデフォルト動作に依存している場合、これまでのRedshiftからS3への認証機構を使い続けるのであれば、明示的にforward_spark_s3_credentialstrueに設定する必要があります。3つの認証機構とセキュリティ上のトレードオフに関する議論については、本記事のS3とRedshiftの認証を参照ください。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0