背景・目的
S3に出力されたUTC日時のHive形式のパスから、GlueでJST日時のHive形式のパスへ変換します。
- ここでいう、Hive形式とは、year=XXXX/month=XXXX/day=XXXX/hour=XXXXなどの形式を指します。
- UTCとJSTの時差は、+9:00になります。
AWSのサービスでログをS3に出力する場合、UTC日時でS3パスが作られます。
これを各ローカルタイムのパスに変更をしたいのが趣旨になります。
例えば、UTCの2022年04月15日(金) 00:00
は、JSTでは2022年04月15日(金) 09:00
になりますが、
S3パスでは、UTCのyear=2022/month=04/day=15/hour=00/
となり、これを、year=2022/month=04/day=15/hour=09/
に変換します。
まとめ
- パーティション値を変換しました。
- 変換するには、DataFrameを駆使すればできました。
実践
前提
- Source(UTC)側のS3パスは、以下のとおりです。
- 2022/4/15 0:00と、2022/4/15 1:00のパーティションが2つあります。
$ aws s3 ls s3://{インプットバケット名} --recursive
2022-04-15 21:45:47 0 year=2022/
2022-04-15 21:46:22 0 year=2022/month=04/
2022-04-15 21:47:23 0 year=2022/month=04/day=15/
2022-04-15 21:47:51 0 year=2022/month=04/day=15/hour=00/
2022-04-16 00:45:46 45 year=2022/month=04/day=15/hour=00/20220415000000.json
2022-04-15 21:47:59 0 year=2022/month=04/day=15/hour=01/
2022-04-16 00:20:17 45 year=2022/month=04/day=15/hour=01/20220415010000.json
$
- ファイルの内容は、以下のとおりです。
- log_dateカラムとvalueカラムを持つJSONフォーマットです。
$ aws s3 cp s3://{インプットバケット}/year=2022/month=04/day=15/hour=00/20220415000000.json -
{"log_date":"20220415000000","value":100000}
$ aws s3 cp s3://{インプットバケット}/year=2022/month=04/day=15/hour=01/20220415010000.json -
{"log_date":"20220415010000","value":200000}
$
カラム名の変更
- パーティションのカラム名を変更します。
- Glueでデータを確認するとパーティションも含めた値が取得できます。これを一度別名にリネームします。
- year→year_work
- ・・・・
リネームをする理由
この後、DynamicFrameから、DataFrameに変換をするのですが、変換後パーティションキーの値は引き継がれずNullになってしまいました。
そのため、リネームをしています。
RenamedDynamicFrame = ApplyMapping_node2.rename_field('year','year_work').rename_field('month','month_work').rename_field('day','day_work').rename_field('hour','hour_work')
- この時点のDataFrameの定義とデータは以下のとおりです。
root
|-- log_date: string
|-- value: int
|-- year_work: string
|-- month_work: string
|-- day_work: string
|-- hour_work: string
====
{
"log_date": "20220415000000",
"value": 100000,
"year_work": "2022",
"month_work": "04",
"day_work": "15",
"hour_work": "00"
}
{
"log_date": "20220415010000",
"value": 200000,
"year_work": "2022",
"month_work": "04",
"day_work": "15",
"hour_work": "01"
}
DataFrameへの変換
- 日時操作を行うにあたり、DynamicFrameからDataFrameに変換します。
DF = RenamedDynamicFrame.toDF()
パーティション値の操作
- パーティション値を、日時型に変換し、時差を加算します。
パーティション値の連結
- パーティション値を文字列として連結します。
DfWithColumn = DF.withColumn('work_timestamp', format_string('%s-%s-%s %s:00:00',DF.year_work,DF.month_work,DF.day_work,DF.hour_work))
- この時点のDataFrameの定義とデータは以下のとおりです。
root
|-- log_date: string (nullable = true)
|-- value: integer (nullable = true)
|-- year_work: string (nullable = true)
|-- month_work: string (nullable = true)
|-- day_work: string (nullable = true)
|-- hour_work: string (nullable = true)
|-- work_timestamp: string (nullable = false)
===
+--------------+------+---------+----------+--------+---------+-------------------+
| log_date| value|year_work|month_work|day_work|hour_work| work_timestamp|
+--------------+------+---------+----------+--------+---------+-------------------+
|20220415000000|100000| 2022| 04| 15| 00|2022-04-15 00:00:00|
|20220415010000|200000| 2022| 04| 15| 01|2022-04-15 01:00:00|
+--------------+------+---------+----------+--------+---------+-------------------+
文字列からタイムスタンプ型に変換
- 文字列からタイムスタンプ型に変換します。
DfWithColumn = DfWithColumn.withColumn('work_timestamp2', to_timestamp(DfWithColumn.work_timestamp))
JST時刻の計算
- exprにINTERVALを引数に加算します。
- JSTはUTCから+9時間なので「INTERVAL 9 HOURS」とします。
time_difference = 9
INTERVAL_STRING = "INTERVAL {0} HOURS".format(time_difference)
DFPartition_time = DfWithColumn.withColumn('partition_time',DfWithColumn.work_timestamp2 + expr(INTERVAL_STRING))
- この時点のDataFrameの定義とデータは以下のとおりです。
- partition_timeが、+9Hされた時刻になっています。
root
|-- log_date: string (nullable = true)
|-- value: integer (nullable = true)
|-- year_work: string (nullable = true)
|-- month_work: string (nullable = true)
|-- day_work: string (nullable = true)
|-- hour_work: string (nullable = true)
|-- work_timestamp: string (nullable = false)
|-- work_timestamp2: timestamp (nullable = true)
|-- partition_time: timestamp (nullable = true)
===
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+
| log_date| value|year_work|month_work|day_work|hour_work| work_timestamp| work_timestamp2| partition_time|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+
|20220415000000|100000| 2022| 04| 15| 00|2022-04-15 00:00:00|2022-04-15 00:00:00|2022-04-15 09:00:00|
|20220415010000|200000| 2022| 04| 15| 01|2022-04-15 01:00:00|2022-04-15 01:00:00|2022-04-15 10:00:00|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+
日時の切り出し
- JSTに変換したタイムスタンプ型のデータを、年、月、日、時に分割します。
SplitedDF = DFPartition_time.withColumn('year',date_format(DFPartition_time.partition_time,"y")).withColumn('month',date_format(DFPartition_time.partition_time,'MM')).withColumn('day',date_format(DFPartition_time.partition_time,"dd")).withColumn('hour',date_format(DFPartition_time.partition_time,"HH"))
- この時点のDataFrameの定義とデータは以下のとおりです。
- parition_timeから、year,month,day,hourに分割されました。
root
|-- log_date: string (nullable = true)
|-- value: integer (nullable = true)
|-- year_work: string (nullable = true)
|-- month_work: string (nullable = true)
|-- day_work: string (nullable = true)
|-- hour_work: string (nullable = true)
|-- work_timestamp: string (nullable = false)
|-- work_timestamp2: timestamp (nullable = true)
|-- partition_time: timestamp (nullable = true)
|-- year: string (nullable = true)
|-- month: string (nullable = true)
|-- day: string (nullable = true)
|-- hour: string (nullable = true)
===
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+----+-----+---+----+
| log_date| value|year_work|month_work|day_work|hour_work| work_timestamp| work_timestamp2| partition_time|year|month|day|hour|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+----+-----+---+----+
|20220415000000|100000| 2022| 04| 15| 00|2022-04-15 00:00:00|2022-04-15 00:00:00|2022-04-15 09:00:00|2022| 04| 15| 09|
|20220415010000|200000| 2022| 04| 15| 01|2022-04-15 01:00:00|2022-04-15 01:00:00|2022-04-15 10:00:00|2022| 04| 15| 10|
+--------------+------+---------+----------+--------+---------+-------------------+-------------------+-------------------+----+-----+---+----+
カラム削除
- 作業用のカラムを削除します。
DropedDF = SplitedDF.drop("year_work").drop("month_work").drop("day_work").drop("hour_work").drop("work_timestamp").drop("work_timestamp2").drop("partition_time")
- この時点のDataFrameの定義とデータは以下のとおりです。
root
|-- log_date: string (nullable = true)
|-- value: integer (nullable = true)
|-- year: string (nullable = true)
|-- month: string (nullable = true)
|-- day: string (nullable = true)
|-- hour: string (nullable = true)
===
+--------------+------+----+-----+---+----+
| log_date| value|year|month|day|hour|
+--------------+------+----+-----+---+----+
|20220415000000|100000|2022| 04| 15| 09|
|20220415010000|200000|2022| 04| 15| 10|
+--------------+------+----+-----+---+----+
DynamicFrameに変換
- DataFrameからDynamicFrameに変更します。
DynamicFrameJST = DynamicFrame.fromDF(DropedDF,glueContext,'DynamicFrameAddColumn')
アウトプット
- Parquet&Snappyで書き込みます。
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=DynamicFrameJST,
connection_type="s3",
format="glueparquet",
connection_options={
"path": "s3://{アウトプットバケット}/",
"partitionKeys": ["year", "month", "day", "hour"],
},
format_options={"compression": "snappy"},
transformation_ctx="S3bucket_node3",
)
結果の確認
- 出力結果を確認します。
$ aws s3 ls s3://{アウトプットバケット} --recursive
2022-04-16 22:54:43 459 year=2022/month=04/day=15/hour=09/run-1650117281422-part-block-0-0-r-00000-snappy.parquet
2022-04-16 22:54:43 459 year=2022/month=04/day=15/hour=10/run-1650117281422-part-block-0-0-r-00001-snappy.parquet
$
- ファイルも読めました。
- Parquetファイルを読むために、parquet-toolsを使いました。parquet-toolsの使い方はこちらをご確認ください。
$ aws s3 cp s3://{アウトプットバケット}/year=2022/month=04/day=15/hour=09/run-1650117281422-part-block-0-0-r-00000-snappy.parquet - > ~/Downloads/run-1650117281422-part-block-0-0-r-00000-snappy.parquet
$ parquet-tools cat ~/Downloads/run-1650117281422-part-block-0-0-r-00000-snappy.parquet
log_date = 20220415000000
value = 100000
$ aws s3 cp s3://{アウトプットバケット}/year=2022/month=04/day=15/hour=10/run-1650117281422-part-block-0-0-r-00001-snappy.parquet - > ~/Downloads/run-1650117281422-part-block-0-0-r-00001-snappy.parquet
$ parquet-tools cat ~/Downloads/run-1650117281422-part-block-0-0-r-00001-snappy.parquet
log_date = 20220415010000
value = 200000
$
考察
- パーティション値の変換は、結構手間がかかるなぁというのが率直な感想です。
- 実は、もっと簡単にできる方法があるような気がしますので、ご存じの方は教えていただけると幸いです。
参考