AWS GlueのJobでクロールしたRedshiftのテーブルをDynamic Frame経由で利用しようとすると以下の2つの問題点があります。
- 複数のテーブルをJoinして利用することができない
- SELECT * で条件指定などせず、全レコードをtemporary領域にUnloadしてしまう
そこで、databricksが提供しているRedshift Data Source for Apache Sparkを利用してRedshiftを利用します。
前提
- spark-redshift v2.0.1のjarを利用
- AWS Glue Jobは2018/5時点に作成、言語としてはScalaを利用。
python(pySpark)だと 必要なライブラリや、py4jのリフレクションで詰まって動作するところまで持っていけてない。 - Database(Redshift)への設定は事前に確認しておく
- GlueContextがSQLContextをextendしていることを理解しておく
準備
Connectionの作成
RedshiftへのConnectionの作成はクラスメソッドさんのAWS Glue 実践入門:Amazon Redshiftのテーブルをクロールするが参考になります。
ここで作成したConnectionをGlue JobのConnectionsとして利用します。
RedshiftへのConnectionを事前に用意しておかないと、Glue JobからRedshiftへアクセスためにSecurity Groupの開放を行う必要が出てきますが、自身のVPCなどで実行されるわけではないので必要なIPの範囲などをサポートに問い合わせる必要が出るかと思います。
使用するライブラリの用意
S3にjarファイルをアップローしておいて、「Script libraries and job parameters (optional)」の「Dependent jars path」にアップロードしたjarファイルのパスを設定すれば完了です。
コード
エラーが発生したらGlueのJobがエラー終了することは問題がないので、いろいろ雑なコードになっています。
ここでは加工などはしていないですが、SQLの実行結果はDataFrameなのでそれを処理することは可能です。
writeLegacyFormatはDecimal型をRedshift Spectrumで利用できるようにするために設定しています。
import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
import java.util.Calendar
import java.util.TimeZone
import com.amazonaws.regions.Regions
import com.amazonaws.services.kms.AWSKMS
import com.amazonaws.services.kms.AWSKMSClientBuilder
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import java.util.Base64
object GlueApp {
def buildDecRequest(byteBuffer: ByteBuffer): DecryptRequest = new DecryptRequest().withCiphertextBlob(byteBuffer)
def decryptMessage(src: String): String = {
val kms: AWSKMS = AWSKMSClientBuilder.standard.withRegion(Regions.US_WEST_2).build()
val encryptionMessage = ByteBuffer.wrap(Base64.getDecoder.decode(src.getBytes()))
val decryptRequest = buildDecRequest(encryptionMessage)
val decryptResult = kms.decrypt(decryptRequest)
val decryptText = decryptResult.getPlaintext
new String(decryptText.array, "UTF-8")
}
def main(sysArgs: Array[String]) {
val encryptedRedshiftPassword = "KMS encrypted password"
val rsPassword = decryptMessage(encryptedRedshiftPassword)
val spark: SparkContext = new SparkContext()
val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME","STAGE").toArray)
// STAGEから、s3pathとrsDbNameを指定
val (rsDbName, s3path) = args("STAGE") match {
case "production" => ("production_dbname", "s3://prodcution_buckt/")
case _ => ("staging_dbname", "s3://staging_buckt/")
}
// Output S3path
val now = Calendar.getInstance(TimeZone.getTimeZone("JST"))
now.add(Calendar.DATE, -1)
val year = now.get(Calendar.YEAR)
val month = now.get(Calendar.MONTH) + 1
val date = now.get(Calendar.DATE)
val outS3path = s"$s3path/year=$year/month=$month/day=$date"
val rsTempS3path = "s3://aws-glue-temporary-path/"
// 必要に応じて設定
//spark.hadoopConfiguration.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
//spark.hadoopConfiguration.set("fs.s3.awsSecretAccessKey",awsSecretAccessKey)
val glueContext: GlueContext = new GlueContext(spark)
glueContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val rsURL = "redshift-host:5439"
val rsUser = "connection user"
val jdbcURL = s"jdbc:redshift://$rsURL/$rsDbName?user=$rsUser&password=$rsPassword"
val selectSQL = """
SELECT
id,
product_group_code,
product_code,
price,
high_price,
ratio::decimal(6,10),
high_price_ratio::decimal(17,10),
created_at,
updated_at
FROM
plan_list
"""
val schema = StructType(
Array(
StructField("id", IntegerType, true),
StructField("product_group_code", StringType, true),
StructField("product_code", StringType, true),
StructField("price", IntegerType, true),
StructField("high_price", IntegerType, true),
StructField("ratio", DecimalType(6,10), true),,
StructField("high_price_ratio", DecimalType(17,10), true),
StructField("created_at", TimestampType, true),
StructField("updated_at", TimestampType, true)
)
)
val df = glueContext.read
.format("com.databricks.spark.redshift")
.schema(schema)
.option("url", jdbcURL)
.option("query", selectSQL)
.option("tempdir", rsTempS3path)
.option("aws_iam_role", "arn:aws:iam::aaaaa")
.load()
df.printSchema()
df.write.mode("overwrite").format("parquet").option("compression", "gzip").mode("overwrite").save(outS3path)
Job.commit()
}
}
結論
上記のようなコードをベースに、Where句に条件をつけて実行してみたころspark-redshiftで指定したtempdirに出力されていたファイルはWhere句の条件がちゃんと効いていることが確認できました。
spark-redshiftを利用することで、データソーソスに複雑な条件を指定でき、Redshiftからのデータ取得時の負荷をコントロールできるようになります。