2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWS Glue&Scala:ETLジョブでAmazon AthenaのPartitionKeyを追加する

Last updated at Posted at 2019-05-05

Glue ジョブ スクリプトの編集にて

1.ソースファイルにimport文を追加

import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.DynamicRecord
import com.amazonaws.services.glue.types.StringNode //今回はStringのみ。

2.カラム追加処理(DynamicFrame.mapメソッド、DynamicRecord.addFieldメソッド/getFieldメソッド、StringNode)

val datasource0 = glueContext.getCatalogSource(
	 -- 中略 --
 ).getDynamicFrame()

//カラム追加処理
//今回は、誕生日(birthday)を西暦、月、日にパーティショニングしたかったため以下の処理。
//用途に応じて読み替えてください。
val addField1 = datasource0.map((rec: DynamicRecord) => {
	val mbody = rec.getField("birthday")// DynamicRecord.getFieldメソッドで指定カラムの値をOption型で取得
	val datePattern = """(\d{4})-(\d{2})-(\d{2})""".r //パターンマッチ用の正規表現

	mbody match {
		case Some(mval:String) => {
			mval match {
				case datePattern(y,m,d)=>{
					rec.addField("year",StringNode(y)) //西暦カラムを追加。birthdayの西暦をセット
					rec.addField("month",StringNode(m)) //月カラムを追加。birthdayの月をセット
					rec.addField("day",StringNode(d)) //日カラムを追加。birthdayの日をセット
					rec
				}
				case _ => rec
			}
		}
		case _ => rec
	}
})

 (以下略)

以下、本家より

  • DynamicFrame

    • def map
      • def map( f : DynamicRecord => DynamicRecord,
        errorMsg : String = "",
        transformationContext : String = "",
        callSite : CallSite = CallSite("Not provided", ""),
        stageThreshold : Long = 0,
        totalThreshold : Long = 0
        ) : DynamicFrame
        • 指定した関数 "f" をこの DynamicFrame の各レコードに適用することで生成された新しい DynamicFrame を返します。
        • このメソッドは、指定した関数を適用する前に各レコードをコピーするため、レコードを安全に変更できます。特定のレコードでマッピング関数から例外がスローされた場合、そのレコードはエラーとしてマークされ、スタックトレースがエラーレコードの列として保存されます。
  • DynamicRecord

    • def addField

      • def addField( path : String,dynamicNode : DynamicNode) : Unit
        • 指定したパスに DynamicNode を追加します。
          • path — 追加するフィールドのパス。
          • dynamicNode — 指定したパスに追加する DynamicNode。
    • def getField

      • def getField( path : String ) : Option[Any]
        • DynamicNode のオプションとして指定した path でフィールドの値を取得します。
        • scala.Some Some (値) を返します。

3.追加マッピング

// val applymapping1 = datasource0.applyMapping(
val applymapping1 = addField1.applyMapping(
	mappings = Seq(
	  -- 中略 --
            ("year","string","year","string"), //西暦カラムをStringで追加マッピング
            ("month","string","month","string"), //月カラムをStringで追加マッピング
            ("day","string","day","string"), //日カラムをStringで追加マッピング
	  -- 中略 --
	), caseSensitive = false, transformationContext = "applymapping1"
)
 (中略)

4.パーティショニングの設定

    val datasink4 = glueContext.getSinkWithFormat(
            connectionType = "s3", 
            options = JsonOptions(
                Map( 
           "path" -> "s3://【s3のパス】",
                    "partitionKeys" -> Seq( "year", "month","day") //西暦、月、日のパーティショニングを設定
                )
            ),
            transformationContext = "datasink4", 
            format = "parquet"
        ).writeDynamicFrame(dropnullfields3)

あとがき

  • ScalaでのETLプログラミングに関する情報が少なく苦労した。(Pythonはそれなりにある)
  • DynamicFrameを Apache Spark の DataFrame に変換してからカラム追加し、DynamicFrameに戻すやり方もあった。が、DynamicFrameで行えることはDynamicFrameで行うほうがよい。
  • 困ったら本家のマニュアル

参考

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?