5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Glueの使い方的な③(CLIでジョブ作成)

Last updated at Posted at 2018-01-05

CLIによる操作でGlueジョブを作る

"Glueの使い方①(GUIでジョブ実行)"(以後①と書きます)で書いたように、現在GlueではGUIからジョブのコピーができないので、テスト時やデプロイ時などにもCLIでのジョブ操作が便利な場面があります

今回は①で実行したジョブをCLIで作成します

IAM role

ジョブ作成のコマンド発行するノードに付与するIAM roleもこの時に使ったtest-glueを使います。今回手元ではMacのPCでしたが本来だとジョブの作成や変更操作を行うスケジューラーなどに付与するIAM roleになると思います。
付与されるポリシーはこの2つ
・AmazonS3FullAccess
・AWSGlueServiceRole

全体の流れ

  • 前準備
  • CLIでジョブ作成
  • トリガー作成

前準備

①で使ったジョブで実行されているPySparkスクリプトを持ってきます

Glueのジョブで実行されるスクリプトはここにあります

AWSマネージメントコンソールからGlueをクリック、左側メニューのETLの"Jobs"をクリック
対象ジョブにチェックを入れ、"Action"をクリックし"Edit job"をクリック

スクリーンショット 0030-01-01 18.39.07.png

ジョブの内容が表示されます

Script pathに入力されているS3のパスが、このジョブで実行されるPySparkスクリプトの保存先です。
デフォルトだと以下の場所にスクリプトは保存されます。今回はデフォルトのままです。

s3://aws-glue-scripts-[AWSアカウントID]-[リージョン名]/[ユーザー名]/[ジョブ名]

スクリーンショット 0030-01-02 15.54.51.png

ローカルにもってくる

このファイルをローカルにダウンロードておきます。
ダウンロードしたPySparkスクリプトは前回GUIのみで操作して作られたスクリプトです。
処理内容は、"指定したS3にあるcsvファイルを指定したS3にparquetとして出力する"というものです

se2_job0.txt
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

CLIでジョブ作成

今回はAWS Cliを使います(他の各言語のSDKでも同じ操が作可能です)

まずawscliが古いとglueの操作ができないのでupgradeしておきましょう

pip install awscli --upgrade

Cliによるジョブ作成は、先程ダウンロードしたPySparkスクリプトファイルをリネームします。se2_job0.txtをse2_job2.pyにします。それをS3の任意の場所(今回はs3://test-glue00/se2/script/)にアップロードし、JSON形式でそこを指定してジョブ作成します

今回のジョブ作成に使ったJSON

test.json
{
    "Name": "se2_job2", 
    "Description": "test", 
    "Role": "test-glue", 
    "ExecutionProperty": {
        "MaxConcurrentRuns": 1
    }, 
    "Command": {
        "Name": "glueetl", 
        "ScriptLocation": "s3://test-glue00/se2/script/se2_job2.py"
    }, 
    "MaxRetries": 0, 
    "AllocatedCapacity": 5
}
  • Name
    ジョブ名

  • AllocatedCapacity
    "The number of AWS Glue data processing units (DPUs) to allocate to this Job.From 2 to 100 DPUs can be allocated; the default is 10."
    とあるようにこのジョブに割り当てるDPUを指定します。2-100で指定。デフォは10
    https://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html

  • ExecutionPropertyのMaxConcurrentRuns
    ジョブの最大同時実行数。デフォは1

  • ScriptLocation
    PySparkスクリプトファイルの保存場所

  • MaxRetries
    ジョブの最大リトライ数。デフォは0
    ※別途スケジューラーを使ってジョブを実行してるならリトライ制御はスケジューラー側に任せたほうがよいかも

  • CommandのName
    "glueetl"でなければなりません。固定のようです

スケルトン出力

他のCLIと同じく補助機能でJSONのスケルトンを作るコマンドもあります。

cli
$ aws glue create-job --generate-cli-skeleton 
{
    "Name": "", 
    "Description": "", 
    "LogUri": "", 
    "Role": "", 
    "ExecutionProperty": {
        "MaxConcurrentRuns": 0
    }, 
    "Command": {
        "Name": "", 
        "ScriptLocation": ""
    }, 
    "DefaultArguments": {
        "KeyName": ""
    }, 
    "Connections": {
        "Connections": [
            ""
        ]
    }, 
    "MaxRetries": 0, 
    "AllocatedCapacity": 0
}

実行結果

こんな感じで実行します
以下実行結果です

cli
$ aws glue create-job --cli-input-json file://test.json
{
    "Name": "se2_job2"
}

コマンドラインに他の引数があればJSONを上書きします。ファイルよりコマンドの引数の方が強いです。
ベースのJSONを作っておいて、DPUなどを状況に応じて大きくするとかいいと思います

引数では以下を指定できます

$ aws glue create-job help

CREATE-JOB()                                                      CREATE-JOB()



NAME
       create-job -

DESCRIPTION
       Creates a new job.

       See also: AWS API Documentation

SYNOPSIS
            create-job
          --name <value>
          [--description <value>]
          [--log-uri <value>]
          --role <value>
          [--execution-property <value>]
          --command <value>
          [--default-arguments <value>]
          [--connections <value>]
          [--max-retries <value>]
          [--allocated-capacity <value>]
          [--cli-input-json <value>]
          [--generate-cli-skeleton <value>]

GUIからもジョブが作成されていることがわかります。

スクリーンショット 0030-01-02 16.05.41.png

ジョブを実行

対象ジョブのse2_job2をチェックし、Actionの"Run job"をクリックします。
画面のように正常に完了しています。
スクリーンショット 0030-01-02 16.10.18.png

同じ結果がS3へ出力されています。
このようにコマンドであれば同じジョブを作りやすいので、DPUだけ変えるとか、テストのためにテストデータの入力や出力だけ変えて同じ処理を実行するなど、パラメータの一部だけ変更したジョブを作りやすいです。

ただ、画面からわかるように今回の出力のparquetファイル1つとその他メタデータが最新の日付で出力と更新がされていますが、コピー元ジョブで実行したparquetファイルが1つ残っています。

スクリーンショット 0030-01-02 16.12.04.png

コピー元ジョブの処理内容は

"S3にある1つのcsvファイルをparquetにしてS3に出力する"という処理でした。

Athenaでクエリ実行すると件数が倍の38件で、出力が重複していることがわかります。(元データは19件でした)

スクリーンショット 0030-01-02 16.17.43.png

ジョブフローを設計する時やスケジューラーの機能などで、処理したデータはムーブしたり消したりする場合もありますし、処理対象のディレクトリをタイムスタンプなどで判別して古いデータは処理対象から除外する場合もあると思います。どういったジョブフローにしているかに依存する部分かもですが、Glueにはこういった事象を防ぐブックマークという機能があります。ブックマーク機能を有効にすると既に処理したデータを処理の対象外とすることができます。
これはまたの機会で書けたらと思います。

トリガー作成

Glueには簡易的なスケジュール機能にTriggerがあります。
以下3つのスケジューリングができます。今回はCRON形式で作成してみます
選べるTrigger Typeは以下3つです
・CRON形式
・前のジョブが完了したら実行
・手動実行

スクリーンショット 0030-01-02 16.36.19.png

このトリガーの対象とするジョブを選びます。ジョブ名の横にある"Add"をクリックすることで選択できます。
"Next"をクリックし最後にサマリがでるので問題なければ"Finish"をクリックします。

スクリーンショット 0030-01-02 16.37.22.png

Triggerのコマンドライン操作もいくつか書いておきます。

先程作ったse2_trigger2トリガーの内容表示

get-trigger
$ aws glue get-trigger --name se2_trigger2
{
    "Trigger": {
        "Predicate": {}, 
        "Name": "se2_trigger2", 
        "Schedule": "cron(0 0 * * ? *)", 
        "Actions": [
            {
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }, 
                "JobName": "se2_job2"
            }
        ], 
        "State": "CREATED", 
        "Type": "SCHEDULED"
    }
}

トリガー作成用のjsonスケルトン表示

skelton
$ aws glue create-trigger --generate-cli-skeleton 
{
    "Name": "", 
    "Type": "SCHEDULED", 
    "Schedule": "", 
    "Predicate": {
        "Logical": "AND", 
        "Conditions": [
            {
                "LogicalOperator": "EQUALS", 
                "JobName": "", 
                "State": "STARTING"
            }
        ]
    }, 
    "Actions": [
        {
            "JobName": "", 
            "Arguments": {
                "KeyName": ""
            }
        }
    ], 
    "Description": ""
}

スケルトンを元に先程のトリガse2_trigger2と同じ内容で、名前だけse2_trigger3に変更したJSON作成

testtrigger2.json
$ cat testtrigger2.json 
{
    "Name": "se2_trigger3", 
    "Type": "SCHEDULED", 
    "Schedule": "cron(0 0 * * ? *)", 
    "Actions": [
        {
            "JobName": "se2_job2", 
            "Arguments": {
                "--job-bookmark-option": "job-bookmark-enable"
            }
        }
    ], 
    "Description": ""
}

se2_trigger3を作成

$ aws glue create-trigger --cli-input-json file://testtrigger2.json
{
    "Name": "se2_trigger3"
}

トリガー更新
以下でcronの時間を0時から2時に変更してます。

$ cat testtrigger2_upd.json 
{
    "Name": "se2_trigger3", 
    "TriggerUpdate": {
        "Name": "se2_trigger3", 
        "Description": "", 
        "Schedule": "cron(0 2 * * ? *)", 
        "Actions": [
            {
                "JobName": "se2_job2", 
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }
            }
        ] 
    }
}
$ aws glue update-trigger --cli-input-json file://testtrigger2_upd.json
{
    "Trigger": {
        "Predicate": {}, 
        "Name": "se2_trigger3", 
        "Schedule": "cron(0 2 * * ? *)", 
        "Actions": [
            {
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }, 
                "JobName": "se2_job2"
            }
        ], 
        "State": "CREATED", 
        "Type": "SCHEDULED"
    }
}

その他

CloudFormationも対応しているのでそちらでもCLI操作や自動化が可能かと思います。

ジョブのデプロイ時、より慎重にやるなら、出力先を一時的な別のS3や一時的なDBに変更したジョブを作り、既存と並行稼働させるのもいいかと思います。

ジョブフローの形成は、どのようなデプロイツールを使っているか、どのようなジョブスケジューラーを使っているかにもよるものです。

To Be Continue

Triggerの設定時やStartJobRunコマンドの"--job-bookmark-option"でブックマークを有効無効にできます。
ブックマーク機能について今後書いていければと思います。

Glueのトリガーで設定できるスケジュールは見ての通り現在は簡易的なものです。
AWS Step Functionsで少し複雑なジョブフロー作成について今後書いていければと思います。

参考

AWS CLI
http://docs.aws.amazon.com/cli/latest/reference/glue/create-job.html
http://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html

PythonのAPI
http://boto3.readthedocs.io/en/latest/reference/services/glue.html#Glue.Client.create_job

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?