パーティション分割するジョブを作る
ジョブの内容
元データに"2018/08/26 12:00:01"みたいなタイムスタンプが入ってるデータを、パーティションによるディレクトリ構成にしてデータ配置とフォーマットなど変換する
※"Glueの使い方的な①(GUIでジョブ実行)"(以後①とだけ書きます)と同じ内容のデータで、ディレクトリyyyy/mmに分割してファイルを配置しています。
ジョブ名
se2_job12
クローラー名
se2_in10
se2_out12
全体の流れ
- 前準備
- 入力データのクローラー作成と実行
- ジョブの作成と修正
- ジョブの実行と確認
前準備
ソースデータ(19件)
$ ls
cvlog-20171111.csv cvlog-20171201.csv cvlog-20171216.csv
cvlog-20171114.csv cvlog-20171202.csv cvlog-20171217.csv
cvlog-20171121.csv cvlog-20171209.csv cvlog-20171219.csv
cvlog-20171129.csv cvlog-20171214.csv cvlog-20171229.csv
cvlog-20171130.csv cvlog-20171215.csv
ヘッダ情報こんな感じ
$ cat * | head
deviceid,uuid,appid,country,timestamp
iphone,11121,001,JP,2017/11/11 12:00:01
deviceid,uuid,appid,country,timestamp
iphone,11123,009,FR,2017/11/14 14:00:01
deviceid,uuid,appid,country,timestamp
iphone,11119,007,AUS,2017/11/21 14:00:01
deviceid,uuid,appid,country,timestamp
other,11110,005,JP,2017/11/29 15:55:01
iphone,11125,005,JP,2017/11/29 15:00:01
deviceid,uuid,appid,country,timestamp
データの内容
$ cat * | grep -v deviceid
iphone,11121,001,JP,2017/11/11 12:00:01
iphone,11123,009,FR,2017/11/14 14:00:01
iphone,11119,007,AUS,2017/11/21 14:00:01
other,11110,005,JP,2017/11/29 15:55:01
iphone,11125,005,JP,2017/11/29 15:00:01
android,11122,001,FR,2017/11/30 20:00:01
iphone,11129,007,AUS,2017/11/30 14:33:01
pc,11118,001,FR,2017/12/01 01:00:01
pc,11117,009,FR,2017/12/02 18:00:01
iphone,11128,009,FR,2017/12/09 04:11:01
iphone,11111,001,JP,2017/12/14 12:00:01
android,11112,001,FR,2017/12/14 14:00:01
iphone,11116,001,JP,2017/12/15 11:00:01
iphone,11113,009,FR,2017/12/16 21:00:01
iphone,11114,007,AUS,2017/12/17 18:00:01
iphone,11124,007,AUS,2017/12/17 14:00:01
iphone,11126,001,JP,2017/12/19 08:00:01
android,11127,001,FR,2017/12/19 14:10:01
other,11115,005,JP,2017/12/29 15:00:01
件数19件
$ cat * | grep -v deviceid |wc -l
19
S3にデータをアップロード
$ aws s3 ls s3://test-glue00/se2/in10/ --recursive
2018-08-26 09:48:30 0 se2/in10/
2018-08-26 09:49:17 0 se2/in10/2017/
2018-08-26 09:49:24 0 se2/in10/2017/11/
2018-08-26 09:49:41 78 se2/in10/2017/11/cvlog-20171111.csv
2018-08-26 09:49:41 78 se2/in10/2017/11/cvlog-20171114.csv
2018-08-26 09:49:41 79 se2/in10/2017/11/cvlog-20171121.csv
2018-08-26 09:49:42 117 se2/in10/2017/11/cvlog-20171129.csv
2018-08-26 09:49:42 120 se2/in10/2017/11/cvlog-20171130.csv
2018-08-26 09:49:26 0 se2/in10/2017/12/
2018-08-26 09:49:57 74 se2/in10/2017/12/cvlog-20171201.csv
2018-08-26 09:49:58 74 se2/in10/2017/12/cvlog-20171202.csv
2018-08-26 09:49:58 78 se2/in10/2017/12/cvlog-20171209.csv
2018-08-26 09:49:58 119 se2/in10/2017/12/cvlog-20171214.csv
2018-08-26 09:49:58 78 se2/in10/2017/12/cvlog-20171215.csv
2018-08-26 09:49:58 78 se2/in10/2017/12/cvlog-20171216.csv
2018-08-26 09:49:59 120 se2/in10/2017/12/cvlog-20171217.csv
2018-08-26 09:49:59 119 se2/in10/2017/12/cvlog-20171219.csv
2018-08-26 09:49:59 77 se2/in10/2017/12/cvlog-20171229.csv
S3のディレクトリ構成
Glueジョブの入力データは"in10"ディレクトリ配下、出力は"out12"ディレクトリ配下
$ aws s3 ls s3://test-glue00/se2/
PRE in10/
PRE out12/
PRE script/
PRE tmp/
入力データのクローラー作成と実行
入力データのクローラー作成
AWSマネージメントコンソールから、Glueをクリックし、画面左側メニューの"クローラー"をクリックし、"クローラーの追加"をクリック
"クローラーの名前"を入力し[次へ]をクリック
"Choose a data store"にS3を選択し、"インクルードパス"にS3にあるソースデータのパス入力し[次へ]をクリック。その次も[次へ]をクリックし、その次は適切なIAMロールを選択し[次へ]をクリックし、その次も[次へ]をクリックする
"データベース"に"se2"を入力し、"プレフィックス"に"se2_"を入力し[次へ]をクリック、次の画面で[完了]をクリックする
クローラー実行
作成されたクローラー"se2_in10"にチェックを入れ[クローラの実行]をクリックする
作成されたテーブル情報を確認
Athenaでも確認
このあと、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます
ジョブ作成と修正
ジョブの作成
①と同じ手順でGUIのみの操作でse2_job12ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。
処理内容は"S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する"です。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
ジョブの修正
以下の部分を修正します。
冒頭のimportに以下を追加
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import dayofmonth
from pyspark.sql.functions import month
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import year
35行目の"dropnullfields3"の後に以下を追加
df = dropnullfields3.toDF()
df = df.withColumn("timestamp2", to_timestamp(df.timestamp, 'yyyy/MM/dd HH:mm:ss'))
df = df.withColumn("year", year(df.timestamp2)).withColumn("month", month(df.timestamp2)).withColumn("day", dayofmonth(df.timestamp2))
partitionby=['year','month','day']
output='s3://test-glue00/se2/out12/'
codec='snappy'
df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先
DataFrameのwrite()で出力するため、最後の方のsink処理をコメントアウト
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
修正したコード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import dayofmonth
from pyspark.sql.functions import month
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import year
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in10", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("timestamp", "string", "timestamp", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
df = dropnullfields3.toDF()
df = df.withColumn("timestamp2", to_timestamp(df.timestamp, 'yyyy/MM/dd HH:mm:ss'))
df = df.withColumn("year", year(df.timestamp2)).withColumn("month", month(df.timestamp2)).withColumn("day", dayofmonth(df.timestamp2))
partitionby=['year','month','day']
output='s3://test-glue00/se2/out12/'
codec='snappy'
df.repartition(*partitionby).write.partitionBy(partitionby).mode("overwrite").parquet(output,compression=codec)
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out12"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
ジョブ実行と確認
ジョブ実行
対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。
# 出力データのクローラー作成、実行、Athenaで確認
se2_out12でクローラー作成
se2_in10と同じ手順でクローラー作成し、クローラー実行する
※インクルードパスは"s3://test-glue00/se2/out12"
作成されたテーブルse2_out12の情報
Athenaで確認
Zeppelin Notebookで確認
show()で各所確認
to_timestampでデータタイプをtimestampに変換する
変換したtimestampからyearやmonthを取得してpartitionにする
こちらも是非
Spark SQL Date function
https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f