7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Glueの使い方的な⑯(出力ファイル数をまとめる)

Last updated at Posted at 2018-06-05

parquetファイルがたくさんできるのを任意の数にする

今回使うGlueのリソースは、Glueのチュートリアルのもの

スクリーンショット 0030-06-05 21.50.09.png

まずはチュートリアルにそって操作を行う。Flight Dataのparquetフォーマット変換処理が行われる。

  • Crawler : flights data crawler
  • Job : flights conversion
  • input data : s3://crawler-public-ap-northeast-1/flight/2016/csv/
  • output data : s3://test-glue00/se2/out_flightdata

全体の流れ

  • チュートリアルの結果ファイル確認
  • チュートリアルのコード修正
  • 修正したコードの出力ファイル確認

チュートリアルの結果ファイル確認

S3にはparquetファイルが複数できている

スクリーンショット 0030-06-05 21.56.10.png スクリーンショット 0030-06-05 22.04.50.png

全部で23個のファイルができている

Athenaで確認

Crawler を "flight data out"で作成

Table 確認

スクリーンショット 0030-06-05 22.00.30.png

Athena確認

スクリーンショット 0030-06-05 22.01.30.png

件数

スクリーンショット 0030-06-05 22.02.30.png

チュートリアルのコード修正

コード修正

Jobの"flights conversion"にチェックを入れ、Actionの"Edit script"をクリック

スクリーンショット 0030-06-05 22.09.47.png

表示されるコードがこちら

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "flights_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "flights_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")

以下を"dropnullfields3"の下に追記

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
dropnullfields3 = dropnullfields3.toDF().repartition(4)
dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "flights_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "flights_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
dropnullfields3 = dropnullfields3.toDF().repartition(4)
dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

修正したコードの出力ファイル確認

repartitionで指定した4つにファイルが出力されている

スクリーンショット 0030-06-05 23.08.56.png

Crawlerを作成しクローリングしTableを作成後、Athenaで確認

件数変わらず

スクリーンショット 0030-06-05 23.11.55.png

クエリ結果によるデータ内容も変わらず

スクリーンショット 0030-06-05 23.11.27.png

1つのファイルが細かすぎる場合、想定する性能がでないことがあります。SparkやHadoopは1ファイルのサイズは128-512MBあたりを推奨しているのである程度大きめな単位が望ましいです。出力ファイルが細かすぎる場合は、sparkの処理の中でrepartitionをすることでマージすることができます。

repartitionについてはピンクのブタさんブログに詳しく書かれてます
https://yohei-a.hatenablog.jp/entry/20181206/1544036150

従来のHadoopであればFileUtils.copyMerge
https://qiita.com/RyujiKawazoe/items/0d2051637e9ce3b33e20

またはAWSであれば s3distcp でマージも可能です

Glueだけで完結させたい場合は今回のようなやり方がよいかと思います。

To Be Continue

TODO

こちらも是非

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?