CLIによる操作でGlueジョブを作る
"Glueの使い方①(GUIでジョブ実行)"(以後①と書きます)で書いたように、現在GlueではGUIからジョブのコピーができないので、テスト時やデプロイ時などにもCLIでのジョブ操作が便利な場面があります
今回は①で実行したジョブをCLIで作成します
IAM role
ジョブ作成のコマンド発行するノードに付与するIAM roleもこの時に使ったtest-glueを使います。今回手元ではMacのPCでしたが本来だとジョブの作成や変更操作を行うスケジューラーなどに付与するIAM roleになると思います。
付与されるポリシーはこの2つ
・AmazonS3FullAccess
・AWSGlueServiceRole
全体の流れ
- 前準備
- CLIでジョブ作成
- トリガー作成
前準備
①で使ったジョブで実行されているPySparkスクリプトを持ってきます
Glueのジョブで実行されるスクリプトはここにあります
AWSマネージメントコンソールからGlueをクリック、左側メニューのETLの"Jobs"をクリック
対象ジョブにチェックを入れ、"Action"をクリックし"Edit job"をクリック
ジョブの内容が表示されます
Script pathに入力されているS3のパスが、このジョブで実行されるPySparkスクリプトの保存先です。
デフォルトだと以下の場所にスクリプトは保存されます。今回はデフォルトのままです。
s3://aws-glue-scripts-[AWSアカウントID]-[リージョン名]/[ユーザー名]/[ジョブ名]
ローカルにもってくる
このファイルをローカルにダウンロードておきます。
ダウンロードしたPySparkスクリプトは前回GUIのみで操作して作られたスクリプトです。
処理内容は、"指定したS3にあるcsvファイルを指定したS3にparquetとして出力する"というものです
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
CLIでジョブ作成
今回はAWS Cliを使います(他の各言語のSDKでも同じ操が作可能です)
まずawscliが古いとglueの操作ができないのでupgradeしておきましょう
pip install awscli --upgrade
Cliによるジョブ作成は、先程ダウンロードしたPySparkスクリプトファイルをリネームします。se2_job0.txtをse2_job2.pyにします。それをS3の任意の場所(今回はs3://test-glue00/se2/script/)にアップロードし、JSON形式でそこを指定してジョブ作成します
今回のジョブ作成に使ったJSON
{
"Name": "se2_job2",
"Description": "test",
"Role": "test-glue",
"ExecutionProperty": {
"MaxConcurrentRuns": 1
},
"Command": {
"Name": "glueetl",
"ScriptLocation": "s3://test-glue00/se2/script/se2_job2.py"
},
"MaxRetries": 0,
"AllocatedCapacity": 5
}
-
Name
ジョブ名 -
AllocatedCapacity
"The number of AWS Glue data processing units (DPUs) to allocate to this Job.From 2 to 100 DPUs can be allocated; the default is 10."
とあるようにこのジョブに割り当てるDPUを指定します。2-100で指定。デフォは10
https://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html -
ExecutionPropertyのMaxConcurrentRuns
ジョブの最大同時実行数。デフォは1 -
ScriptLocation
PySparkスクリプトファイルの保存場所 -
MaxRetries
ジョブの最大リトライ数。デフォは0
※別途スケジューラーを使ってジョブを実行してるならリトライ制御はスケジューラー側に任せたほうがよいかも -
CommandのName
"glueetl"でなければなりません。固定のようです
スケルトン出力
他のCLIと同じく補助機能でJSONのスケルトンを作るコマンドもあります。
$ aws glue create-job --generate-cli-skeleton
{
"Name": "",
"Description": "",
"LogUri": "",
"Role": "",
"ExecutionProperty": {
"MaxConcurrentRuns": 0
},
"Command": {
"Name": "",
"ScriptLocation": ""
},
"DefaultArguments": {
"KeyName": ""
},
"Connections": {
"Connections": [
""
]
},
"MaxRetries": 0,
"AllocatedCapacity": 0
}
実行結果
こんな感じで実行します
以下実行結果です
$ aws glue create-job --cli-input-json file://test.json
{
"Name": "se2_job2"
}
コマンドラインに他の引数があればJSONを上書きします。ファイルよりコマンドの引数の方が強いです。
ベースのJSONを作っておいて、DPUなどを状況に応じて大きくするとかいいと思います
引数では以下を指定できます
$ aws glue create-job help
CREATE-JOB() CREATE-JOB()
NAME
create-job -
DESCRIPTION
Creates a new job.
See also: AWS API Documentation
SYNOPSIS
create-job
--name <value>
[--description <value>]
[--log-uri <value>]
--role <value>
[--execution-property <value>]
--command <value>
[--default-arguments <value>]
[--connections <value>]
[--max-retries <value>]
[--allocated-capacity <value>]
[--cli-input-json <value>]
[--generate-cli-skeleton <value>]
GUIからもジョブが作成されていることがわかります。
ジョブを実行
対象ジョブのse2_job2をチェックし、Actionの"Run job"をクリックします。
画面のように正常に完了しています。
同じ結果がS3へ出力されています。
このようにコマンドであれば同じジョブを作りやすいので、DPUだけ変えるとか、テストのためにテストデータの入力や出力だけ変えて同じ処理を実行するなど、パラメータの一部だけ変更したジョブを作りやすいです。
ただ、画面からわかるように今回の出力のparquetファイル1つとその他メタデータが最新の日付で出力と更新がされていますが、コピー元ジョブで実行したparquetファイルが1つ残っています。
コピー元ジョブの処理内容は
"S3にある1つのcsvファイルをparquetにしてS3に出力する"という処理でした。
Athenaでクエリ実行すると件数が倍の38件で、出力が重複していることがわかります。(元データは19件でした)
ジョブフローを設計する時やスケジューラーの機能などで、処理したデータはムーブしたり消したりする場合もありますし、処理対象のディレクトリをタイムスタンプなどで判別して古いデータは処理対象から除外する場合もあると思います。どういったジョブフローにしているかに依存する部分かもですが、Glueにはこういった事象を防ぐブックマークという機能があります。ブックマーク機能を有効にすると既に処理したデータを処理の対象外とすることができます。
これはまたの機会で書けたらと思います。
トリガー作成
Glueには簡易的なスケジュール機能にTriggerがあります。
以下3つのスケジューリングができます。今回はCRON形式で作成してみます
選べるTrigger Typeは以下3つです
・CRON形式
・前のジョブが完了したら実行
・手動実行
このトリガーの対象とするジョブを選びます。ジョブ名の横にある"Add"をクリックすることで選択できます。
"Next"をクリックし最後にサマリがでるので問題なければ"Finish"をクリックします。
Triggerのコマンドライン操作もいくつか書いておきます。
先程作ったse2_trigger2トリガーの内容表示
$ aws glue get-trigger --name se2_trigger2
{
"Trigger": {
"Predicate": {},
"Name": "se2_trigger2",
"Schedule": "cron(0 0 * * ? *)",
"Actions": [
{
"Arguments": {
"--job-bookmark-option": "job-bookmark-enable"
},
"JobName": "se2_job2"
}
],
"State": "CREATED",
"Type": "SCHEDULED"
}
}
トリガー作成用のjsonスケルトン表示
$ aws glue create-trigger --generate-cli-skeleton
{
"Name": "",
"Type": "SCHEDULED",
"Schedule": "",
"Predicate": {
"Logical": "AND",
"Conditions": [
{
"LogicalOperator": "EQUALS",
"JobName": "",
"State": "STARTING"
}
]
},
"Actions": [
{
"JobName": "",
"Arguments": {
"KeyName": ""
}
}
],
"Description": ""
}
スケルトンを元に先程のトリガse2_trigger2と同じ内容で、名前だけse2_trigger3に変更したJSON作成
$ cat testtrigger2.json
{
"Name": "se2_trigger3",
"Type": "SCHEDULED",
"Schedule": "cron(0 0 * * ? *)",
"Actions": [
{
"JobName": "se2_job2",
"Arguments": {
"--job-bookmark-option": "job-bookmark-enable"
}
}
],
"Description": ""
}
se2_trigger3を作成
$ aws glue create-trigger --cli-input-json file://testtrigger2.json
{
"Name": "se2_trigger3"
}
トリガー更新
以下でcronの時間を0時から2時に変更してます。
$ cat testtrigger2_upd.json
{
"Name": "se2_trigger3",
"TriggerUpdate": {
"Name": "se2_trigger3",
"Description": "",
"Schedule": "cron(0 2 * * ? *)",
"Actions": [
{
"JobName": "se2_job2",
"Arguments": {
"--job-bookmark-option": "job-bookmark-enable"
}
}
]
}
}
$ aws glue update-trigger --cli-input-json file://testtrigger2_upd.json
{
"Trigger": {
"Predicate": {},
"Name": "se2_trigger3",
"Schedule": "cron(0 2 * * ? *)",
"Actions": [
{
"Arguments": {
"--job-bookmark-option": "job-bookmark-enable"
},
"JobName": "se2_job2"
}
],
"State": "CREATED",
"Type": "SCHEDULED"
}
}
その他
CloudFormationも対応しているのでそちらでもCLI操作や自動化が可能かと思います。
ジョブのデプロイ時、より慎重にやるなら、出力先を一時的な別のS3や一時的なDBに変更したジョブを作り、既存と並行稼働させるのもいいかと思います。
ジョブフローの形成は、どのようなデプロイツールを使っているか、どのようなジョブスケジューラーを使っているかにもよるものです。
To Be Continue
Triggerの設定時やStartJobRunコマンドの"--job-bookmark-option"でブックマークを有効無効にできます。
ブックマーク機能について今後書いていければと思います。
Glueのトリガーで設定できるスケジュールは見ての通り現在は簡易的なものです。
AWS Step Functionsで少し複雑なジョブフロー作成について今後書いていければと思います。
参考
AWS CLI
http://docs.aws.amazon.com/cli/latest/reference/glue/create-job.html
http://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html
PythonのAPI
http://boto3.readthedocs.io/en/latest/reference/services/glue.html#Glue.Client.create_job
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f