LoginSignup
7
3

More than 5 years have passed since last update.

Glueの使い方的な㉒(csvデータをパーティション分割したparquetに変換_2)

Posted at

パーティション分割するジョブを作る

ジョブの内容

元データに"2018/08/26 12:00:01"みたいなタイムスタンプが入ってるデータを、パーティションによるディレクトリ構成にしてデータ配置とフォーマットなど変換する

※"Glueの使い方的な①(GUIでジョブ実行)"(以後①とだけ書きます)と同じ内容のデータで、ディレクトリyyyy/mmに分割してファイルを配置しています。

ジョブ名

se2_job12

クローラー名

se2_in10
se2_out12

全体の流れ

  • 前準備
  • 入力データのクローラー作成と実行
  • ジョブの作成と修正
  • ジョブの実行と確認

前準備

ソースデータ(19件)

csvlog-20171111.csv-csvlog-20171229.csv
$ ls
cvlog-20171111.csv  cvlog-20171201.csv  cvlog-20171216.csv
cvlog-20171114.csv  cvlog-20171202.csv  cvlog-20171217.csv
cvlog-20171121.csv  cvlog-20171209.csv  cvlog-20171219.csv
cvlog-20171129.csv  cvlog-20171214.csv  cvlog-20171229.csv
cvlog-20171130.csv  cvlog-20171215.csv

ヘッダ情報こんな感じ

ヘッダ情報
$ cat * | head
deviceid,uuid,appid,country,timestamp
iphone,11121,001,JP,2017/11/11 12:00:01
deviceid,uuid,appid,country,timestamp
iphone,11123,009,FR,2017/11/14 14:00:01
deviceid,uuid,appid,country,timestamp
iphone,11119,007,AUS,2017/11/21 14:00:01
deviceid,uuid,appid,country,timestamp
other,11110,005,JP,2017/11/29 15:55:01
iphone,11125,005,JP,2017/11/29 15:00:01
deviceid,uuid,appid,country,timestamp

データの内容

csvlog-20171111.csv-csvlog-20171229.csv
$ cat * | grep -v deviceid 
iphone,11121,001,JP,2017/11/11 12:00:01
iphone,11123,009,FR,2017/11/14 14:00:01
iphone,11119,007,AUS,2017/11/21 14:00:01
other,11110,005,JP,2017/11/29 15:55:01
iphone,11125,005,JP,2017/11/29 15:00:01
android,11122,001,FR,2017/11/30 20:00:01
iphone,11129,007,AUS,2017/11/30 14:33:01
pc,11118,001,FR,2017/12/01 01:00:01
pc,11117,009,FR,2017/12/02 18:00:01
iphone,11128,009,FR,2017/12/09 04:11:01
iphone,11111,001,JP,2017/12/14 12:00:01
android,11112,001,FR,2017/12/14 14:00:01
iphone,11116,001,JP,2017/12/15 11:00:01
iphone,11113,009,FR,2017/12/16 21:00:01
iphone,11114,007,AUS,2017/12/17 18:00:01
iphone,11124,007,AUS,2017/12/17 14:00:01
iphone,11126,001,JP,2017/12/19 08:00:01
android,11127,001,FR,2017/12/19 14:10:01
other,11115,005,JP,2017/12/29 15:00:01

件数19件

件数
$ cat * | grep -v deviceid |wc -l
      19

S3にデータをアップロード

$ aws s3 ls s3://test-glue00/se2/in10/ --recursive
2018-08-26 09:48:30          0 se2/in10/
2018-08-26 09:49:17          0 se2/in10/2017/
2018-08-26 09:49:24          0 se2/in10/2017/11/
2018-08-26 09:49:41         78 se2/in10/2017/11/cvlog-20171111.csv
2018-08-26 09:49:41         78 se2/in10/2017/11/cvlog-20171114.csv
2018-08-26 09:49:41         79 se2/in10/2017/11/cvlog-20171121.csv
2018-08-26 09:49:42        117 se2/in10/2017/11/cvlog-20171129.csv
2018-08-26 09:49:42        120 se2/in10/2017/11/cvlog-20171130.csv
2018-08-26 09:49:26          0 se2/in10/2017/12/
2018-08-26 09:49:57         74 se2/in10/2017/12/cvlog-20171201.csv
2018-08-26 09:49:58         74 se2/in10/2017/12/cvlog-20171202.csv
2018-08-26 09:49:58         78 se2/in10/2017/12/cvlog-20171209.csv
2018-08-26 09:49:58        119 se2/in10/2017/12/cvlog-20171214.csv
2018-08-26 09:49:58         78 se2/in10/2017/12/cvlog-20171215.csv
2018-08-26 09:49:58         78 se2/in10/2017/12/cvlog-20171216.csv
2018-08-26 09:49:59        120 se2/in10/2017/12/cvlog-20171217.csv
2018-08-26 09:49:59        119 se2/in10/2017/12/cvlog-20171219.csv
2018-08-26 09:49:59         77 se2/in10/2017/12/cvlog-20171229.csv

S3のディレクトリ構成

Glueジョブの入力データは"in10"ディレクトリ配下、出力は"out12"ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in10/
                           PRE out12/
                           PRE script/
                           PRE tmp/

入力データのクローラー作成と実行

入力データのクローラー作成

AWSマネージメントコンソールから、Glueをクリックし、画面左側メニューの"クローラー"をクリックし、"クローラーの追加"をクリック

スクリーンショット 0030-08-26 11.13.47.png

"クローラーの名前"を入力し[次へ]をクリック

スクリーンショット 0030-08-26 11.15.24.png

"Choose a data store"にS3を選択し、"インクルードパス"にS3にあるソースデータのパス入力し[次へ]をクリック。その次も[次へ]をクリックし、その次は適切なIAMロールを選択し[次へ]をクリックし、その次も[次へ]をクリックする

スクリーンショット 0030-08-26 11.16.43.png

"データベース"に"se2"を入力し、"プレフィックス"に"se2_"を入力し[次へ]をクリック、次の画面で[完了]をクリックする

スクリーンショット 0030-08-26 11.20.09.png

クローラー実行

作成されたクローラー"se2_in10"にチェックを入れ[クローラの実行]をクリックする

スクリーンショット 0030-08-26 11.20.39.png

作成されたテーブル情報を確認

スクリーンショット 0030-08-26 11.24.39.png

スクリーンショット 0030-08-26 11.24.53.png

Athenaでも確認

スクリーンショット 0030-08-26 11.46.11.png

このあと、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

ジョブの作成

①と同じ手順でGUIのみの操作でse2_job12ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は"S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する"です。

se2_job12
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブの修正

以下の部分を修正します。

冒頭のimportに以下を追加

from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import dayofmonth
from pyspark.sql.functions import month
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import year

35行目の"dropnullfields3"の後に以下を追加

df = dropnullfields3.toDF()
df = df.withColumn("timestamp2", to_timestamp(df.timestamp, 'yyyy/MM/dd HH:mm:ss'))
df = df.withColumn("year", year(df.timestamp2)).withColumn("month", month(df.timestamp2)).withColumn("day", dayofmonth(df.timestamp2))
partitionby=['year','month','day']
output='s3://test-glue00/se2/out12/'
codec='snappy'
df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

DataFrameのwrite()で出力するため、最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコード

se2_job12_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import dayofmonth
from pyspark.sql.functions import month
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import year

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
df = dropnullfields3.toDF()
df = df.withColumn("timestamp2", to_timestamp(df.timestamp, 'yyyy/MM/dd HH:mm:ss'))
df = df.withColumn("year", year(df.timestamp2)).withColumn("month", month(df.timestamp2)).withColumn("day", dayofmonth(df.timestamp2))
partitionby=['year','month','day']
output='s3://test-glue00/se2/out12/'
codec='snappy'
df.repartition(*partitionby).write.partitionBy(partitionby).mode("overwrite").parquet(output,compression=codec)

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-08-26 13.47.56.png

 出力データのクローラー作成、実行、Athenaで確認

se2_out12でクローラー作成

se2_in10と同じ手順でクローラー作成し、クローラー実行する
※インクルードパスは"s3://test-glue00/se2/out12"

作成されたテーブルse2_out12の情報

スクリーンショット 0030-08-26 13.52.07.png

スクリーンショット 0030-08-26 13.52.17.png

Athenaで確認

スクリーンショット 0030-08-26 13.56.13.png

Zeppelin Notebookで確認

show()で各所確認

スクリーンショット 0030-08-26 13.57.56.png

to_timestampでデータタイプをtimestampに変換する

スクリーンショット 0030-08-26 13.59.25.png

変換したtimestampからyearやmonthを取得してpartitionにする

スクリーンショット 0030-08-26 13.59.38.png

こちらも是非

Spark SQL Date function
https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3