2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Glueでパーティション値を変更(UTC→JST)してみた

Last updated at Posted at 2022-04-16

背景・目的

S3に出力されたUTC日時のHive形式のパスから、GlueでJST日時のHive形式のパスへ変換します。

  • ここでいう、Hive形式とは、year=XXXX/month=XXXX/day=XXXX/hour=XXXXなどの形式を指します。
  • UTCとJSTの時差は、+9:00になります。

AWSのサービスでログをS3に出力する場合、UTC日時でS3パスが作られます。
これを各ローカルタイムのパスに変更をしたいのが趣旨になります。

例えば、UTCの2022年04月15日(金) 00:00は、JSTでは2022年04月15日(金) 09:00になりますが、
S3パスでは、UTCのyear=2022/month=04/day=15/hour=00/となり、これを、year=2022/month=04/day=15/hour=09/に変換します。

まとめ

  • パーティション値を変換しました。
  • 変換するには、DataFrameを駆使すればできました。

実践

前提

  • Source(UTC)側のS3パスは、以下のとおりです。
    • 2022/4/15 0:00と、2022/4/15 1:00のパーティションが2つあります。
$ aws s3 ls s3://{インプットバケット名} --recursive

2022-04-15 21:45:47          0 year=2022/
2022-04-15 21:46:22          0 year=2022/month=04/
2022-04-15 21:47:23          0 year=2022/month=04/day=15/
2022-04-15 21:47:51          0 year=2022/month=04/day=15/hour=00/
2022-04-16 00:45:46         45 year=2022/month=04/day=15/hour=00/20220415000000.json
2022-04-15 21:47:59          0 year=2022/month=04/day=15/hour=01/
2022-04-16 00:20:17         45 year=2022/month=04/day=15/hour=01/20220415010000.json
$
  • ファイルの内容は、以下のとおりです。
    • log_dateカラムとvalueカラムを持つJSONフォーマットです。
$ aws s3 cp s3://{インプットバケット}/year=2022/month=04/day=15/hour=00/20220415000000.json -
{"log_date":"20220415000000","value":100000}
$ aws s3 cp s3://{インプットバケット}/year=2022/month=04/day=15/hour=01/20220415010000.json -
{"log_date":"20220415010000","value":200000}
$

カラム名の変更

  • パーティションのカラム名を変更します。
  • Glueでデータを確認するとパーティションも含めた値が取得できます。これを一度別名にリネームします。
    • year→year_work
    • ・・・・

リネームをする理由
この後、DynamicFrameから、DataFrameに変換をするのですが、変換後パーティションキーの値は引き継がれずNullになってしまいました。
そのため、リネームをしています。

RenamedDynamicFrame = ApplyMapping_node2.rename_field('year','year_work').rename_field('month','month_work').rename_field('day','day_work').rename_field('hour','hour_work')
  • この時点のDataFrameの定義とデータは以下のとおりです。
root
|-- log_date: string
|-- value: int
|-- year_work: string
|-- month_work: string
|-- day_work: string
|-- hour_work: string

====

{
    "log_date": "20220415000000",
    "value": 100000,
    "year_work": "2022",
    "month_work": "04",
    "day_work": "15",
    "hour_work": "00"
}
{
    "log_date": "20220415010000",
    "value": 200000,
    "year_work": "2022",
    "month_work": "04",
    "day_work": "15",
    "hour_work": "01"
}

DataFrameへの変換

  • 日時操作を行うにあたり、DynamicFrameからDataFrameに変換します。
DF = RenamedDynamicFrame.toDF()

パーティション値の操作

  • パーティション値を、日時型に変換し、時差を加算します。

パーティション値の連結

  • パーティション値を文字列として連結します。
DfWithColumn = DF.withColumn('work_timestamp', format_string('%s-%s-%s %s:00:00',DF.year_work,DF.month_work,DF.day_work,DF.hour_work))
  • この時点のDataFrameの定義とデータは以下のとおりです。
root
 |-- log_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- year_work: string (nullable = true)
 |-- month_work: string (nullable = true)
 |-- day_work: string (nullable = true)
 |-- hour_work: string (nullable = true)
 |-- work_timestamp: string (nullable = false)

===

+--------------+------+---------+----------+--------+---------+-------------------+
|      log_date| value|year_work|month_work|day_work|hour_work|     work_timestamp|
+--------------+------+---------+----------+--------+---------+-------------------+
|20220415000000|100000|     2022|        04|      15|       00|2022-04-15 00:00:00|
|20220415010000|200000|     2022|        04|      15|       01|2022-04-15 01:00:00|
+--------------+------+---------+----------+--------+---------+-------------------+

文字列からタイムスタンプ型に変換

  • 文字列からタイムスタンプ型に変換します。
DfWithColumn = DfWithColumn.withColumn('work_timestamp2', to_timestamp(DfWithColumn.work_timestamp))

JST時刻の計算

  • exprにINTERVALを引数に加算します。
  • JSTはUTCから+9時間なので「INTERVAL 9 HOURS」とします。
time_difference = 9
INTERVAL_STRING = "INTERVAL {0} HOURS".format(time_difference)

DFPartition_time = DfWithColumn.withColumn('partition_time',DfWithColumn.work_timestamp2 + expr(INTERVAL_STRING))
  • この時点のDataFrameの定義とデータは以下のとおりです。
  • partition_timeが、+9Hされた時刻になっています。
root
 |-- log_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- year_work: string (nullable = true)
 |-- month_work: string (nullable = true)
 |-- day_work: string (nullable = true)
 |-- hour_work: string (nullable = true)
 |-- work_timestamp: string (nullable = false)
 |-- work_timestamp2: timestamp (nullable = true)
 |-- partition_time: timestamp (nullable = true)
===

+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+
|      log_date| value|year_work|month_work|day_work|hour_work|     work_timestamp|    work_timestamp2|     partition_time|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+
|20220415000000|100000|     2022|        04|      15|       00|2022-04-15 00:00:00|2022-04-15 00:00:00|2022-04-15 09:00:00|
|20220415010000|200000|     2022|        04|      15|       01|2022-04-15 01:00:00|2022-04-15 01:00:00|2022-04-15 10:00:00|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+

日時の切り出し

  • JSTに変換したタイムスタンプ型のデータを、年、月、日、時に分割します。
SplitedDF = DFPartition_time.withColumn('year',date_format(DFPartition_time.partition_time,"y")).withColumn('month',date_format(DFPartition_time.partition_time,'MM')).withColumn('day',date_format(DFPartition_time.partition_time,"dd")).withColumn('hour',date_format(DFPartition_time.partition_time,"HH"))
  • この時点のDataFrameの定義とデータは以下のとおりです。
  • parition_timeから、year,month,day,hourに分割されました。
root
 |-- log_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- year_work: string (nullable = true)
 |-- month_work: string (nullable = true)
 |-- day_work: string (nullable = true)
 |-- hour_work: string (nullable = true)
 |-- work_timestamp: string (nullable = false)
 |-- work_timestamp2: timestamp (nullable = true)
 |-- partition_time: timestamp (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- hour: string (nullable = true)

===

+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+----+-----+---+----+
|      log_date| value|year_work|month_work|day_work|hour_work|     work_timestamp|    work_timestamp2|     partition_time|year|month|day|hour|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+----+-----+---+----+
|20220415000000|100000|     2022|        04|      15|       00|2022-04-15 00:00:00|2022-04-15 00:00:00|2022-04-15 09:00:00|2022|   04| 15|  09|
|20220415010000|200000|     2022|        04|      15|       01|2022-04-15 01:00:00|2022-04-15 01:00:00|2022-04-15 10:00:00|2022|   04| 15|  10|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+----+-----+---+----+

カラム削除

  • 作業用のカラムを削除します。
DropedDF = SplitedDF.drop("year_work").drop("month_work").drop("day_work").drop("hour_work").drop("work_timestamp").drop("work_timestamp2").drop("partition_time")
  • この時点のDataFrameの定義とデータは以下のとおりです。
root
 |-- log_date: string (nullable = true)
 |-- value: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- hour: string (nullable = true)

===

+--------------+------+----+-----+---+----+
|      log_date| value|year|month|day|hour|
+--------------+------+----+-----+---+----+
|20220415000000|100000|2022|   04| 15|  09|
|20220415010000|200000|2022|   04| 15|  10|
+--------------+------+----+-----+---+----+

DynamicFrameに変換

  • DataFrameからDynamicFrameに変更します。
DynamicFrameJST = DynamicFrame.fromDF(DropedDF,glueContext,'DynamicFrameAddColumn')

アウトプット

  • Parquet&Snappyで書き込みます。
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrameJST,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://{アウトプットバケット}/",
        "partitionKeys": ["year", "month", "day", "hour"],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="S3bucket_node3",
)

結果の確認

  • 出力結果を確認します。
$ aws s3 ls s3://{アウトプットバケット} --recursive
2022-04-16 22:54:43        459 year=2022/month=04/day=15/hour=09/run-1650117281422-part-block-0-0-r-00000-snappy.parquet
2022-04-16 22:54:43        459 year=2022/month=04/day=15/hour=10/run-1650117281422-part-block-0-0-r-00001-snappy.parquet
$
  • ファイルも読めました。
  • Parquetファイルを読むために、parquet-toolsを使いました。parquet-toolsの使い方はこちらをご確認ください。
$ aws s3 cp s3://{アウトプットバケット}/year=2022/month=04/day=15/hour=09/run-1650117281422-part-block-0-0-r-00000-snappy.parquet - > ~/Downloads/run-1650117281422-part-block-0-0-r-00000-snappy.parquet
$ parquet-tools cat ~/Downloads/run-1650117281422-part-block-0-0-r-00000-snappy.parquet
log_date = 20220415000000
value = 100000

$ aws s3 cp s3://{アウトプットバケット}/year=2022/month=04/day=15/hour=10/run-1650117281422-part-block-0-0-r-00001-snappy.parquet - > ~/Downloads/run-1650117281422-part-block-0-0-r-00001-snappy.parquet
$ parquet-tools cat ~/Downloads/run-1650117281422-part-block-0-0-r-00001-snappy.parquet
log_date = 20220415010000
value = 200000
$

考察

  • パーティション値の変換は、結構手間がかかるなぁというのが率直な感想です。
  • 実は、もっと簡単にできる方法があるような気がしますので、ご存じの方は教えていただけると幸いです。

参考

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?