
Glue usage notes ⑯ (consolidating the number of output files)


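Reducing the many Parquet files a job produces to a number of your choice

The Glue resources used here are the ones from the Glue tutorial.

Screenshot 0030-06-05 21.50.09.png

First, work through the tutorial as written. It runs a job that converts the Flight Data to Parquet format.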

  • Crawler : flights data crawler
  • Job : flights conversion
  • input data : s3://crawler-public-ap-northeast-1/flight/2016/csv/
  • output data : s3://test-glue00/se2/out_flightdata

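Overall flow

  • Check the output files from the tutorial
  • Modify the tutorial code
  • Check the output files from the modified code

Check the output files from the tutorial

The job has created multiple Parquet files in S3.

Screenshot 0030-06-05 21.56.10.png

Screenshot 0030-06-05 22.04.50.png

There are 23 files in total.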
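
The file count can also be checked from a script instead of the console. The following is a minimal sketch, assuming the data files end in ".parquet" and that boto3 and AWS credentials are available; the helper names count_parquet_keys and list_keys are hypothetical and not part of the tutorial.

```python
def count_parquet_keys(keys):
    """Count object keys that look like Parquet data files.

    Marker objects such as _SUCCESS are skipped because they do not
    end in ".parquet" (an assumption about how the files are named).
    """
    return sum(1 for key in keys if key.endswith(".parquet"))


def list_keys(bucket, prefix):
    """Yield every object key under s3://bucket/prefix.

    Requires boto3 and valid AWS credentials; boto3 is imported lazily
    so that count_parquet_keys can be used without it.
    """
    import boto3

    paginator = boto3.client("s3").get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent when a page holds no objects
        for obj in page.get("Contents", []):
            yield obj["Key"]


# Example call against the tutorial output location (needs AWS access):
# print(count_parquet_keys(list_keys("test-glue00", "se2/out_flightdata")))
```

Keeping the counting logic separate from the S3 listing means it can be tried on a plain list of keys without touching AWS.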

Check with Athena

Create a crawler named "flight data out"

Check the table

Screenshot 0030-06-05 22.00.30.png

Check in Athena

Screenshot 0030-06-05 22.01.30.png

Row count

Screenshot 0030-06-05 22.02.30.png

Modify the tutorial code

Code changes

Tick the checkbox for the job "flights conversion", then click "Edit script" under Action.

Screenshot 0030-06-05 22.09.47.png

This is the code that is displayed:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "flights_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "flights_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", 
"long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

Comment out the following line:

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")

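Add the following two lines directly below the "dropnullfields3" line (the first line is the existing one, shown for context):

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
# Convert the DynamicFrame to a Spark DataFrame and shuffle it into 4 partitions; Spark writes one file per partition
dropnullfields3 = dropnullfields3.toDF().repartition(4)
# Write Parquet with the DataFrame writer; 'overwrite' replaces any existing data under this path
dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')

The complete script after these changes:
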
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "flights_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "flights_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", 
"long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
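# Convert to a Spark DataFrame and repartition into 4 partitions (one output file each)
dropnullfields3 = dropnullfields3.toDF().repartition(4)
# Write Parquet to a new prefix, overwriting any previous output there
dropnullfields3.write.mode('overwrite').parquet('s3://test-glue00/se2/out_flightdata1/')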
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
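
The modified script above switches to the Spark DataFrame writer. If you would rather keep the Glue DataSink, one possible sketch is to repartition the DynamicFrame itself and hand it to write_dynamic_frame.from_options. This assumes the DynamicFrame class in your Glue version provides repartition(numPartitions); the helper name write_consolidated and its parameters are hypothetical and not part of the tutorial.

```python
def write_consolidated(glue_context, frame, path, num_files):
    """Repartition a DynamicFrame and write it to S3 as Parquet.

    glue_context: the job's GlueContext
    frame:        a DynamicFrame (assumed to offer repartition(n))
    path:         destination such as "s3://bucket/prefix/"
    num_files:    desired number of partitions, i.e. output files
    """
    if num_files < 1:
        raise ValueError("num_files must be at least 1")

    # One partition is written as one file, so this sets the file count
    repartitioned = frame.repartition(num_files)

    # Same sink call as the generated script, with the repartitioned frame
    return glue_context.write_dynamic_frame.from_options(
        frame=repartitioned,
        connection_type="s3",
        connection_options={"path": path},
        format="parquet",
    )
```

In the job this would be called with the DynamicFrame returned by DropNullFields.apply, for example write_consolidated(glueContext, frame, "s3://test-glue00/se2/out_flightdata1/", 4), in place of the commented-out datasink4 line.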

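Check the output files from the modified code

The output is written as 4 files, the number passed to repartition.

Screenshot 0030-06-05 23.08.56.png

Create a crawler, run it to create the table, then check with Athena.

The row count is unchanged.

Screenshot 0030-06-05 23.11.55.png

The data returned by the query is also unchanged.

Screenshot 0030-06-05 23.11.27.png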

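If individual files are too small, you may not get the performance you expect. A file size of roughly 128-512 MB is recommended for Spark and Hadoop, so reasonably large units are preferable. If the output files are too small, you can merge them by calling repartition within the Spark processing.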
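
As a rough illustration of choosing the number to pass to repartition, the sketch below derives a partition count from an estimated output size and a target file size. The function name target_partitions and the 256 MB default are assumptions picked from the middle of the range above; the real output size depends on Parquet compression, so treat the result as a starting point.

```python
MB = 1024 * 1024


def target_partitions(total_bytes, target_file_bytes=256 * MB):
    """Return how many output files keep each file near target_file_bytes.

    total_bytes is an estimate of the total output size. The result is
    rounded up and is never less than 1.
    """
    if total_bytes < 0:
        raise ValueError("total_bytes must not be negative")
    if target_file_bytes <= 0:
        raise ValueError("target_file_bytes must be positive")
    # Integer ceiling division avoids floating-point rounding
    count = (total_bytes + target_file_bytes - 1) // target_file_bytes
    return max(1, count)


# Roughly 1 GB of estimated output at 256 MB per file
print(target_partitions(1024 * MB))  # 4
```

The returned value can then be used as the argument to repartition in place of a hard-coded number.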

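repartition is explained in detail on the Pink Pig (ピンクのブタさん) blog:
https://yohei-a.hatenablog.jp/entry/20181206/1544036150

With traditional Hadoop (2.x), you can use FileUtil.copyMerge:
https://qiita.com/RyujiKawazoe/items/0d2051637e9ce3b33e20

On AWS, you can also merge files with s3distcp.

If you want to do everything within Glue alone, the approach shown here is a reasonable choice.

To Be Continued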

TODO

See also

Glue usage notes: summary
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

pioho07
The content here has nothing to do with the company I work for.