AWS
Spark
glue
Athena

Glueの使い方的な③(CLIでジョブ作成)

CLIによる操作でGlueジョブを作る

"Glueの使い方①(GUIでジョブ実行)"(以後①と書きます)で書いたように、現在GlueではGUIからジョブのコピーができないので、テスト時やデプロイ時などにもCLIでのジョブ操作が便利な場面があります

今回は①で実行したジョブをCLIで作成します

IAM role

ジョブ作成のコマンド発行するノードに付与するIAM roleもこの時に使ったtest-glueを使います。今回手元ではMacのPCでしたが本来だとジョブの作成や変更操作を行うスケジューラーなどに付与するIAM roleになると思います。
付与されるポリシーはこの2つ
・AmazonS3FullAccess
・AWSGlueServiceRole

全体の流れ

  • 前準備
  • CLIでジョブ作成
  • トリガー作成

前準備

①で使ったジョブで実行されているPySparkスクリプトを持ってきます

Glueのジョブで実行されるスクリプトはここにあります

AWSマネージメントコンソールからGlueをクリック、左側メニューのETLの"Jobs"をクリック
対象ジョブにチェックを入れ、"Action"をクリックし"Edit job"をクリック

スクリーンショット 0030-01-01 18.39.07.png

ジョブの内容が表示されます

Script pathに入力されているS3のパスが、このジョブで実行されるPySparkスクリプトの保存先です。
デフォルトだと以下の場所にスクリプトは保存されます。今回はデフォルトのままです。

s3://aws-glue-scripts-[AWSアカウントID]-[リージョン名]/[ユーザー名]/[ジョブ名]

スクリーンショット 0030-01-02 15.54.51.png

ローカルにもってくる

このファイルをローカルにダウンロードておきます。
ダウンロードしたPySparkスクリプトは前回GUIのみで操作して作られたスクリプトです。
処理内容は、"指定したS3にあるcsvファイルを指定したS3にparquetとして出力する"というものです

se2_job0.txt
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

CLIでジョブ作成

今回はAWS Cliを使います(他の各言語のSDKでも同じ操が作可能です)

まずawscliが古いとglueの操作ができないのでupgradeしておきましょう

pip install awscli --upgrade

Cliによるジョブ作成は、先程ダウンロードしたPySparkスクリプトファイルをリネームします。se2_job0.txtをse2_job2.pyにします。それをS3の任意の場所(今回はs3://test-glue00/se2/script/)にアップロードし、JSON形式でそこを指定してジョブ作成します

今回のジョブ作成に使ったJSON

test.json
{
    "Name": "se2_job2", 
    "Description": "test", 
    "Role": "test-glue", 
    "ExecutionProperty": {
        "MaxConcurrentRuns": 1
    }, 
    "Command": {
        "Name": "glueetl", 
        "ScriptLocation": "s3://test-glue00/se2/script/se2_job2.py"
    }, 
    "MaxRetries": 0, 
    "AllocatedCapacity": 5
}
  • Name
    ジョブ名

  • AllocatedCapacity
    "The number of AWS Glue data processing units (DPUs) to allocate to this Job.From 2 to 100 DPUs can be allocated; the default is 10."
    とあるようにこのジョブに割り当てるDPUを指定します。2-100で指定。デフォは10
    https://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html

  • ExecutionPropertyのMaxConcurrentRuns
    ジョブの最大同時実行数。デフォは1

  • ScriptLocation
    PySparkスクリプトファイルの保存場所

  • MaxRetries
    ジョブの最大リトライ数。デフォは0
    ※別途スケジューラーを使ってジョブを実行してるならリトライ制御はスケジューラー側に任せたほうがよいかも

  • CommandのName
    "glueetl"でなければなりません。固定のようです

スケルトン出力

他のCLIと同じく補助機能でJSONのスケルトンを作るコマンドもあります。

cli
$ aws glue create-job --generate-cli-skeleton 
{
    "Name": "", 
    "Description": "", 
    "LogUri": "", 
    "Role": "", 
    "ExecutionProperty": {
        "MaxConcurrentRuns": 0
    }, 
    "Command": {
        "Name": "", 
        "ScriptLocation": ""
    }, 
    "DefaultArguments": {
        "KeyName": ""
    }, 
    "Connections": {
        "Connections": [
            ""
        ]
    }, 
    "MaxRetries": 0, 
    "AllocatedCapacity": 0
}

実行結果

こんな感じで実行します
以下実行結果です

cli
$ aws glue create-job --cli-input-json file://test.json
{
    "Name": "se2_job2"
}

コマンドラインに他の引数があればJSONを上書きします。ファイルよりコマンドの引数の方が強いです。
ベースのJSONを作っておいて、DPUなどを状況に応じて大きくするとかいいと思います

引数では以下を指定できます

$ aws glue create-job help

CREATE-JOB()                                                      CREATE-JOB()



NAME
       create-job -

DESCRIPTION
       Creates a new job.

       See also: AWS API Documentation

SYNOPSIS
            create-job
          --name <value>
          [--description <value>]
          [--log-uri <value>]
          --role <value>
          [--execution-property <value>]
          --command <value>
          [--default-arguments <value>]
          [--connections <value>]
          [--max-retries <value>]
          [--allocated-capacity <value>]
          [--cli-input-json <value>]
          [--generate-cli-skeleton <value>]

GUIからもジョブが作成されていることがわかります。

スクリーンショット 0030-01-02 16.05.41.png

ジョブを実行

対象ジョブのse2_job2をチェックし、Actionの"Run job"をクリックします。
画面のように正常に完了しています。
スクリーンショット 0030-01-02 16.10.18.png

同じ結果がS3へ出力されています。
このようにコマンドであれば同じジョブを作りやすいので、DPUだけ変えるとか、テストのためにテストデータの入力や出力だけ変えて同じ処理を実行するなど、パラメータの一部だけ変更したジョブを作りやすいです。

ただ、画面からわかるように今回の出力のparquetファイル1つとその他メタデータが最新の日付で出力と更新がされていますが、コピー元ジョブで実行したparquetファイルが1つ残っています。

スクリーンショット 0030-01-02 16.12.04.png

コピー元ジョブの処理内容は

"S3にある1つのcsvファイルをparquetにしてS3に出力する"という処理でした。

Athenaでクエリ実行すると件数が倍の38件で、出力が重複していることがわかります。(元データは19件でした)

スクリーンショット 0030-01-02 16.17.43.png

ジョブフローを設計する時やスケジューラーの機能などで、処理したデータはムーブしたり消したりする場合もありますし、処理対象のディレクトリをタイムスタンプなどで判別して古いデータは処理対象から除外する場合もあると思います。どういったジョブフローにしているかに依存する部分かもですが、Glueにはこういった事象を防ぐブックマークという機能があります。ブックマーク機能を有効にすると既に処理したデータを処理の対象外とすることができます。
これはまたの機会で書けたらと思います。

トリガー作成

Glueには簡易的なスケジュール機能にTriggerがあります。
以下3つのスケジューリングができます。今回はCRON形式で作成してみます
選べるTrigger Typeは以下3つです
・CRON形式
・前のジョブが完了したら実行
・手動実行

スクリーンショット 0030-01-02 16.36.19.png

このトリガーの対象とするジョブを選びます。ジョブ名の横にある"Add"をクリックすることで選択できます。
"Next"をクリックし最後にサマリがでるので問題なければ"Finish"をクリックします。

スクリーンショット 0030-01-02 16.37.22.png

Triggerのコマンドライン操作もいくつか書いておきます。

先程作ったse2_trigger2トリガーの内容表示

get-trigger
$ aws glue get-trigger --name se2_trigger2
{
    "Trigger": {
        "Predicate": {}, 
        "Name": "se2_trigger2", 
        "Schedule": "cron(0 0 * * ? *)", 
        "Actions": [
            {
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }, 
                "JobName": "se2_job2"
            }
        ], 
        "State": "CREATED", 
        "Type": "SCHEDULED"
    }
}

トリガー作成用のjsonスケルトン表示

skelton
$ aws glue create-trigger --generate-cli-skeleton 
{
    "Name": "", 
    "Type": "SCHEDULED", 
    "Schedule": "", 
    "Predicate": {
        "Logical": "AND", 
        "Conditions": [
            {
                "LogicalOperator": "EQUALS", 
                "JobName": "", 
                "State": "STARTING"
            }
        ]
    }, 
    "Actions": [
        {
            "JobName": "", 
            "Arguments": {
                "KeyName": ""
            }
        }
    ], 
    "Description": ""
}

スケルトンを元に先程のトリガse2_trigger2と同じ内容で、名前だけse2_trigger3に変更したJSON作成

testtrigger2.json
$ cat testtrigger2.json 
{
    "Name": "se2_trigger3", 
    "Type": "SCHEDULED", 
    "Schedule": "cron(0 0 * * ? *)", 
    "Actions": [
        {
            "JobName": "se2_job2", 
            "Arguments": {
                "--job-bookmark-option": "job-bookmark-enable"
            }
        }
    ], 
    "Description": ""
}

se2_trigger3を作成

$ aws glue create-trigger --cli-input-json file://testtrigger2.json
{
    "Name": "se2_trigger3"
}

トリガー更新
以下でcronの時間を0時から2時に変更してます。

$ cat testtrigger2_upd.json 
{
    "Name": "se2_trigger3", 
    "TriggerUpdate": {
        "Name": "se2_trigger3", 
        "Description": "", 
        "Schedule": "cron(0 2 * * ? *)", 
        "Actions": [
            {
                "JobName": "se2_job2", 
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }
            }
        ] 
    }
}
$ aws glue update-trigger --cli-input-json file://testtrigger2_upd.json
{
    "Trigger": {
        "Predicate": {}, 
        "Name": "se2_trigger3", 
        "Schedule": "cron(0 2 * * ? *)", 
        "Actions": [
            {
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }, 
                "JobName": "se2_job2"
            }
        ], 
        "State": "CREATED", 
        "Type": "SCHEDULED"
    }
}

その他

CloudFormationも対応しているのでそちらでもCLI操作や自動化が可能かと思います。

ジョブのデプロイ時、より慎重にやるなら、出力先を一時的な別のS3や一時的なDBに変更したジョブを作り、既存と並行稼働させるのもいいかと思います。

ジョブフローの形成は、どのようなデプロイツールを使っているか、どのようなジョブスケジューラーを使っているかにもよるものです。

To Be Continue

Triggerの設定時やStartJobRunコマンドの"--job-bookmark-option"でブックマークを有効無効にできます。
ブックマーク機能について今後書いていければと思います。

Glueのトリガーで設定できるスケジュールは見ての通り現在は簡易的なものです。
AWS Step Functionsで少し複雑なジョブフロー作成について今後書いていければと思います。

参考

AWS CLI
http://docs.aws.amazon.com/cli/latest/reference/glue/create-job.html
http://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html

PythonのAPI
http://boto3.readthedocs.io/en/latest/reference/services/glue.html#Glue.Client.create_job

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f