parquetファイルがたくさんできるのを任意の数にする
今回使うGlueのリソースは、Glueのチュートリアルのもの
まずはチュートリアルにそって操作を行う。Flight Dataのparquetフォーマット変換処理が行われる。
- Crawler : flights data crawler
- Job : flights conversion
- input data : s3://crawler-public-ap-northeast-1/flight/2016/csv/
- output data : s3://test-glue00/se2/out_flightdata
全体の流れ
- チュートリアルの結果ファイル確認
- チュートリアルのコード修正
- 修正したコードの出力ファイル確認
チュートリアルの結果ファイル確認
S3にはparquetファイルが複数できている
全部で23個のファイルができている
Athenaで確認
Crawler を "flight data out"で作成
Table 確認
Athena確認
件数
チュートリアルのコード修正
コード修正
Jobの"flights conversion"にチェックを入れ、Actionの"Edit script"をクリック
表示されるコードがこちら
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "flights_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "flights_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
以下をコメントアウト
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
以下を"dropnullfields3"の下に追記
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
dropnullfields3 = dropnullfields3.toDF().repartition(4)
dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "flights_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "flights_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
dropnullfields3 = dropnullfields3.toDF().repartition(4)
dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
修正したコードの出力ファイル確認
repartitionで指定した4つにファイルが出力されている
Crawlerを作成しクローリングしTableを作成後、Athenaで確認
件数変わらず
クエリ結果によるデータ内容も変わらず
1つのファイルが細かすぎる場合、想定する性能がでないことがあります。SparkやHadoopは1ファイルのサイズは128-512MBあたりを推奨しているのである程度大きめな単位が望ましいです。出力ファイルが細かすぎる場合は、sparkの処理の中でrepartitionをすることでマージすることができます。
repartitionについてはピンクのブタさんブログに詳しく書かれてます
https://yohei-a.hatenablog.jp/entry/20181206/1544036150
従来のHadoopであればFileUtils.copyMerge
https://qiita.com/RyujiKawazoe/items/0d2051637e9ce3b33e20
またはAWSであれば s3distcp でマージも可能です
Glueだけで完結させたい場合は今回のようなやり方がよいかと思います。
To Be Continue
TODO
こちらも是非
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f