3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS Glueからspark redshiftを利用してRedshiftを直接利用する

Last updated at Posted at 2018-05-31

AWS GlueのJobでクロールしたRedshiftのテーブルをDynamic Frame経由で利用しようとすると以下の2つの問題点があります。

  • 複数のテーブルをJoinして利用することができない
  • SELECT * で条件指定などせず、全レコードをtemporary領域にUnloadしてしまう

そこで、databricksが提供しているRedshift Data Source for Apache Sparkを利用してRedshiftを利用します。

前提

  • spark-redshift v2.0.1のjarを利用
  • AWS Glue Jobは2018/5時点に作成、言語としてはScalaを利用。
    python(pySpark)だと 必要なライブラリや、py4jのリフレクションで詰まって動作するところまで持っていけてない。
  • Database(Redshift)への設定は事前に確認しておく
  • GlueContextがSQLContextをextendしていることを理解しておく

準備

Connectionの作成

RedshiftへのConnectionの作成はクラスメソッドさんのAWS Glue 実践入門:Amazon Redshiftのテーブルをクロールするが参考になります。
ここで作成したConnectionをGlue JobのConnectionsとして利用します。

RedshiftへのConnectionを事前に用意しておかないと、Glue JobからRedshiftへアクセスためにSecurity Groupの開放を行う必要が出てきますが、自身のVPCなどで実行されるわけではないので必要なIPの範囲などをサポートに問い合わせる必要が出るかと思います。

使用するライブラリの用意

S3にjarファイルをアップローしておいて、「Script libraries and job parameters (optional)」の「Dependent jars path」にアップロードしたjarファイルのパスを設定すれば完了です。

コード

エラーが発生したらGlueのJobがエラー終了することは問題がないので、いろいろ雑なコードになっています。
ここでは加工などはしていないですが、SQLの実行結果はDataFrameなのでそれを処理することは可能です。
writeLegacyFormatはDecimal型をRedshift Spectrumで利用できるようにするために設定しています。


import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
import java.util.Calendar
import java.util.TimeZone

import com.amazonaws.regions.Regions
import com.amazonaws.services.kms.AWSKMS
import com.amazonaws.services.kms.AWSKMSClientBuilder
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import java.util.Base64


object GlueApp {
  def buildDecRequest(byteBuffer: ByteBuffer): DecryptRequest = new DecryptRequest().withCiphertextBlob(byteBuffer)
  def decryptMessage(src: String): String = {
    val kms: AWSKMS = AWSKMSClientBuilder.standard.withRegion(Regions.US_WEST_2).build()
    val encryptionMessage = ByteBuffer.wrap(Base64.getDecoder.decode(src.getBytes()))

    val decryptRequest = buildDecRequest(encryptionMessage)
    val decryptResult = kms.decrypt(decryptRequest)
    val decryptText = decryptResult.getPlaintext
    new String(decryptText.array, "UTF-8")
  }

  def main(sysArgs: Array[String]) {

    val encryptedRedshiftPassword = "KMS encrypted password"
    val rsPassword = decryptMessage(encryptedRedshiftPassword)

    val spark: SparkContext = new SparkContext()
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME","STAGE").toArray)
    // STAGEから、s3pathとrsDbNameを指定
    val (rsDbName, s3path) = args("STAGE") match {
      case "production" => ("production_dbname", "s3://prodcution_buckt/")
      case _ => ("staging_dbname", "s3://staging_buckt/")
    }
    // Output S3path
    val now = Calendar.getInstance(TimeZone.getTimeZone("JST"))
    now.add(Calendar.DATE, -1)
    val year = now.get(Calendar.YEAR)
    val month = now.get(Calendar.MONTH) + 1
    val date = now.get(Calendar.DATE)
    val outS3path = s"$s3path/year=$year/month=$month/day=$date"
    val rsTempS3path = "s3://aws-glue-temporary-path/"

    // 必要に応じて設定
    //spark.hadoopConfiguration.set("fs.s3.awsAccessKeyId",awsAccessKeyId)
    //spark.hadoopConfiguration.set("fs.s3.awsSecretAccessKey",awsSecretAccessKey)

    val glueContext: GlueContext = new GlueContext(spark)
    glueContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")


    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    val rsURL = "redshift-host:5439"
    val rsUser = "connection user"
    val jdbcURL = s"jdbc:redshift://$rsURL/$rsDbName?user=$rsUser&password=$rsPassword"
    val selectSQL = """
    SELECT
      id,
      product_group_code,
      product_code,
      price,
      high_price,
      ratio::decimal(6,10),
      high_price_ratio::decimal(17,10),
      created_at,
      updated_at
    FROM
      plan_list
    """

    val schema = StructType(
      Array(
        StructField("id", IntegerType, true),
        StructField("product_group_code", StringType, true),
        StructField("product_code", StringType, true),
        StructField("price", IntegerType, true),
        StructField("high_price", IntegerType, true),
        StructField("ratio", DecimalType(6,10), true),,
        StructField("high_price_ratio", DecimalType(17,10), true),
        StructField("created_at", TimestampType, true),
        StructField("updated_at", TimestampType, true)
      )
    )


    val df = glueContext.read
      .format("com.databricks.spark.redshift")
      .schema(schema)
      .option("url", jdbcURL)
      .option("query", selectSQL)
      .option("tempdir", rsTempS3path)
      .option("aws_iam_role", "arn:aws:iam::aaaaa")
      .load()

    df.printSchema()


    df.write.mode("overwrite").format("parquet").option("compression", "gzip").mode("overwrite").save(outS3path)
    Job.commit()
  }
}

結論

上記のようなコードをベースに、Where句に条件をつけて実行してみたころspark-redshiftで指定したtempdirに出力されていたファイルはWhere句の条件がちゃんと効いていることが確認できました。

spark-redshiftを利用することで、データソーソスに複雑な条件を指定でき、Redshiftからのデータ取得時の負荷をコントロールできるようになります。

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?