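Overview
Glue ETL jobs now support CloudWatch metrics.
Here I try the feature by running a job that uses the flightdata from the Glue tutorial.
Working through the Glue tutorial first creates the Glue resources listed below and converts the Flight Data to Parquet format. You can try the feature with these resources as they are, or create resources under different names.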
- Crawler : flights data crawler
- Job : flights conversion
- input data : s3://crawler-public-ap-northeast-1/flight/2016/csv/
- output data : s3://test-glue00/se2/out_flightdata
This time I create the resources under the following names. Their contents are the same as in the tutorial.
Job name
se2_monitortest
Crawler names (not used this time)
flights_data_csv
flights_data_out1
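Overall flow
- Enable monitoring
- Run the job
- Check CloudWatch
- Pricing
Enable monitoring
Jobs are not monitored by default.
For an existing job, select the job's checkbox and click "Edit job".
Under Advanced properties, set Job profiling to "Enable". From then on, this job is monitored.
You can also enable monitoring for a single job run only.
To do so, select the job's checkbox, click "Run job", and set Job profiling to "Enable" under Advanced properties.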
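The same per-run switch can probably be set from code as well. The following is a minimal sketch, assuming boto3 with configured credentials, and assuming that the console's Job profiling setting corresponds to the `--enable-metrics` special job parameter. The helper names `with_metrics` and `start_profiled_run` are hypothetical, and the value "true" is a placeholder, since the key itself is what turns profiling on.

```python
def with_metrics(arguments=None):
    """Return a copy of the job arguments with job profiling switched on."""
    merged = dict(arguments or {})
    # Assumption: "--enable-metrics" is the special job parameter behind
    # "Job profiling"; the value is a placeholder, the key is what matters.
    merged["--enable-metrics"] = "true"
    return merged


def start_profiled_run(job_name, arguments=None):
    """Start one run of job_name with metrics enabled and return its run ID."""
    import boto3  # imported lazily so with_metrics() works without boto3

    glue = boto3.client("glue")
    response = glue.start_job_run(
        JobName=job_name,
        Arguments=with_metrics(arguments),
    )
    return response["JobRunId"]
```

For example, `start_profiled_run("se2_monitortest")` would start the job used in this article with profiling enabled for that run only.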
Run the job
The job code, pasted below, is almost exactly the same as in the tutorial.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "flights_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "flights_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("year", "long", "year", "long"), ("quarter", "long", "quarter", "long"), ("month", "long", "month", "long"), ("day_of_month", "long", "day_of_month", "long"), ("day_of_week", "long", "day_of_week", "long"), ("fl_date", "string", "fl_date", "string"), ("unique_carrier", "string", "unique_carrier", "string"), ("airline_id", "long", "airline_id", "long"), ("carrier", "string", "carrier", "string"), ("tail_num", "string", "tail_num", "string"), ("fl_num", "long", "fl_num", "long"), ("origin_airport_id", "long", "origin_airport_id", "long"), ("origin_airport_seq_id", "long", "origin_airport_seq_id", "long"), ("origin_city_market_id", "long", "origin_city_market_id", "long"), ("origin", "string", "origin", "string"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("origin_state_fips", "long", "origin_state_fips", "long"), ("origin_state_nm", "string", "origin_state_nm", "string"), ("origin_wac", "long", "origin_wac", "long"), ("dest_airport_id", "long", "dest_airport_id", "long"), ("dest_airport_seq_id", "long", "dest_airport_seq_id", "long"), ("dest_city_market_id", "long", "dest_city_market_id", "long"), ("dest", "string", "dest", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dest_state_fips", "long", "dest_state_fips", "long"), ("dest_state_nm", "string", "dest_state_nm", "string"), ("dest_wac", "long", "dest_wac", "long"), ("crs_dep_time", "long", "crs_dep_time", "long"), ("dep_time", "long", "dep_time", "long"), ("dep_delay", "long", "dep_delay", "long"), ("dep_delay_new", "long", "dep_delay_new", "long"), ("dep_del15", "long", "dep_del15", "long"), ("dep_delay_group", "long", "dep_delay_group", "long"), ("dep_time_blk", "string", "dep_time_blk", "string"), ("taxi_out", "long", "taxi_out", "long"), ("wheels_off", "long", "wheels_off", 
"long"), ("wheels_on", "long", "wheels_on", "long"), ("taxi_in", "long", "taxi_in", "long"), ("crs_arr_time", "long", "crs_arr_time", "long"), ("arr_time", "long", "arr_time", "long"), ("arr_delay", "long", "arr_delay", "long"), ("arr_delay_new", "long", "arr_delay_new", "long"), ("arr_del15", "long", "arr_del15", "long"), ("arr_delay_group", "long", "arr_delay_group", "long"), ("arr_time_blk", "string", "arr_time_blk", "string"), ("cancelled", "long", "cancelled", "long"), ("cancellation_code", "string", "cancellation_code", "string"), ("diverted", "long", "diverted", "long"), ("crs_elapsed_time", "long", "crs_elapsed_time", "long"), ("actual_elapsed_time", "long", "actual_elapsed_time", "long"), ("air_time", "long", "air_time", "long"), ("flights", "long", "flights", "long"), ("distance", "long", "distance", "long"), ("distance_group", "long", "distance_group", "long"), ("carrier_delay", "long", "carrier_delay", "long"), ("weather_delay", "long", "weather_delay", "long"), ("nas_delay", "long", "nas_delay", "long"), ("security_delay", "long", "security_delay", "long"), ("late_aircraft_delay", "long", "late_aircraft_delay", "long"), ("first_dep_time", "long", "first_dep_time", "long"), ("total_add_gtime", "long", "total_add_gtime", "long"), ("longest_add_gtime", "long", "longest_add_gtime", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out_flightdata"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
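(Because this is the tutorial job, every run finishes in about 2 minutes regardless of the DPU setting.)
With monitoring enabled, I run this job four times in total: twice with DPU=2 and twice with DPU=5.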
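The four runs can also be scripted instead of clicked through. The following is a rough sketch, assuming boto3 with configured credentials, and assuming the DPU count is passed as `MaxCapacity` and profiling as the `--enable-metrics` parameter. The function names, the polling interval, and the "true" value are hypothetical choices of mine, not something from the tutorial.

```python
import time

# Job run states after which a run will not change any more.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def build_run_plan(job_name, dpu_values=(2, 5), repeats=2):
    """Build one start_job_run request per run: each DPU value, `repeats` times."""
    return [
        {
            "JobName": job_name,
            "MaxCapacity": float(dpu),  # DPUs allocated to this run
            "Arguments": {"--enable-metrics": "true"},  # assumed profiling switch
        }
        for dpu in dpu_values
        for _ in range(repeats)
    ]


def run_plan(plan, poll_seconds=30):
    """Run the requests one after another and return (run_id, dpu, final_state)."""
    import boto3  # imported lazily so build_run_plan() works without boto3

    glue = boto3.client("glue")
    results = []
    for request in plan:
        run_id = glue.start_job_run(**request)["JobRunId"]
        while True:
            run = glue.get_job_run(JobName=request["JobName"], RunId=run_id)
            state = run["JobRun"]["JobRunState"]
            if state in TERMINAL_STATES:
                break
            time.sleep(poll_seconds)
        results.append((run_id, request["MaxCapacity"], state))
    return results
```

Running the requests sequentially keeps the runs apart on the metrics graphs and avoids exceeding the job's concurrent run limit.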
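After the runs, the "Metrics" tab of the Glue job shows "ETL Data Movement" and "Memory profile".
Click "View additional metrics" at the top left of the screen to display more graphs.
With DPU=2, two executors started and CPU utilization was high,
whereas with DPU=5 about five executors started and CPU utilization was lower.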
Other notes
Times are displayed in UTC.
The display lags by roughly 5 minutes.
Metrics are collected every minute by default. (For now this cannot be changed.)
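Since the graphs are in UTC, I find it handy to convert a timestamp to local time when comparing it with when a job was started. A small sketch using only the standard library, assuming the reader is in JST (UTC+9). The label format and the function name are my own assumptions, not what the console emits.

```python
from datetime import datetime, timedelta, timezone

# Japan Standard Time is a fixed UTC+9 offset with no daylight saving time.
JST = timezone(timedelta(hours=9), "JST")


def utc_label_to_jst(label, fmt="%Y-%m-%d %H:%M"):
    """Interpret `label` as a UTC timestamp and return it formatted in JST."""
    utc_time = datetime.strptime(label, fmt).replace(tzinfo=timezone.utc)
    return utc_time.astimezone(JST).strftime(fmt)


print(utc_label_to_jst("2018-07-14 03:00"))  # 2018-07-14 12:00
```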
Since this is batch processing, I think that is good enough.
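Check CloudWatch
Continuing from the previous screen, click the "..." button at the top right and then "View in metrics" ("メトリクスを表示" in the Japanese console) to move to CloudWatch.
Of course, you can also set alarms on these metrics.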
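As an illustration of an alarm, here is a minimal sketch that builds the parameters for boto3's `put_metric_alarm`. It assumes the Glue metrics live in the "Glue" namespace with the dimensions JobName, JobRunId and Type, and that `glue.driver.jvm.heap.usage` is reported as a fraction between 0 and 1. The alarm name, the 0.8 threshold, and the function names are hypothetical.

```python
def heap_alarm_params(job_name, threshold=0.8):
    """Build put_metric_alarm parameters for the driver's JVM heap usage."""
    return {
        "AlarmName": f"{job_name}-driver-heap-usage",  # hypothetical name
        "Namespace": "Glue",
        "MetricName": "glue.driver.jvm.heap.usage",
        "Dimensions": [
            {"Name": "JobName", "Value": job_name},
            {"Name": "JobRunId", "Value": "ALL"},  # aggregate over all runs
            {"Name": "Type", "Value": "gauge"},
        ],
        "Statistic": "Maximum",
        "Period": 60,  # seconds, matching the one-minute interval
        "EvaluationPeriods": 1,
        "Threshold": threshold,  # assumed: heap usage is a 0..1 fraction
        "ComparisonOperator": "GreaterThanThreshold",
    }


def create_heap_alarm(job_name, threshold=0.8):
    """Create or update the alarm in CloudWatch."""
    import boto3  # imported lazily so heap_alarm_params() works without boto3

    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_alarm(**heap_alarm_params(job_name, threshold))
```

No alarm action is attached in this sketch; in practice you would add something like an SNS topic ARN through the `AlarmActions` parameter.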
(Incidentally, starting from the CloudWatch console I could not find my way to the Glue metrics.)
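Even if the console path is hard to find, the metrics should be discoverable through the API. A sketch, assuming boto3 with configured credentials and assuming the metrics are published in the "Glue" namespace with a JobName dimension. The function names are hypothetical. Note that `list_metrics` only returns metrics that received data within roughly the last two weeks.

```python
def glue_metric_names(pages):
    """Collect the unique metric names from list_metrics response pages."""
    names = set()
    for page in pages:
        for metric in page.get("Metrics", []):
            names.add(metric["MetricName"])
    return sorted(names)


def list_glue_metrics(job_name):
    """Return the names of the CloudWatch metrics recorded for one Glue job."""
    import boto3  # imported lazily so glue_metric_names() works without boto3

    cloudwatch = boto3.client("cloudwatch")
    paginator = cloudwatch.get_paginator("list_metrics")
    pages = paginator.paginate(
        Namespace="Glue",
        Dimensions=[{"Name": "JobName", "Value": job_name}],
    )
    return glue_metric_names(pages)
```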
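Pricing
The pricing page linked below only mentions CloudWatch charges for using CloudWatch Logs and CloudWatch Events, so as of 2018-07-14 the metrics themselves do not appear to cost anything. It seems fine to enable monitoring by default for every job.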
https://aws.amazon.com/glue/pricing/
Other
The following documents describe debugging patterns, so please use them as a reference.
- Debugging OOM Exceptions and Job Abnormalities
- Debugging Demanding Stages and Straggler Tasks
- Monitoring the Progress of Multiple Jobs
- Monitoring for DPU Capacity Planning
See also
Glue manual
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
Summary of how to use Glue
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f
About CloudWatch Metrics for Glue ETL jobs
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-profile-glue-job-cloudwatch-metrics.html