LoginSignup
14

More than 5 years have passed since last update.

Glueの使い方的な㉜(Python Shellを使う)

Last updated at Posted at 2019-01-19

Glue ジョブ の Python shell

Glueのジョブタイプは今まではSpark(PySpark,Scala)だけでしたが、新しくPython Shellというジョブタイプができました。GlueのジョブとしてPythonを実行できます。もちろん並列分散処理するわけではないので以下のようにライトなタスクでの用途を想定しています。そのため料金も秒課金になっています。ETLの前や後や途中のライトな処理に活用できるかと

ユースケース アイデア

Serverless Edge Node for triggering, light transformations, uncompress, tar extract, Parquet conversion

思いつくのはAWS SDKの操作、入力データのメタデータを使った設定処理、転送後のデータ確認とかかな

Python shellを使ったジョブを作る

内容

  • 1つ目のジョブ(pyspark):csvデータをparquetフォーマットに変換し、パーティションを切り、出力ファイルを1つにする。
  • 2つ目のジョブ(python shell):1つ目のジョブの出力ファイル part-xxxxx.snappy.parquetのファイル名をリネームする

流れ

  • 1つ目のPySparkのジョブを実行 => 2つ目のPythonShellのジョブを実行

まず1つ目のジョブ

内容

1つ目のPySparkジョブは以下のリンクの後半にあるse2_job3とほぼ同じ内容です
https://qiita.com/pioho07/items/523aec26ca5dc5bc9697

上記読んでいただければ1つ目のジョブの内容はわかります。簡単に概要だけ説明します。
以下のcsvデータをparquetフォーマットに変換し、'country', 'year','month','day','hour'でパーティションを切ります。一箇所追加したのはrepartition(1)で出力ファイルを1つにしています。

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

ジョブ名

se2_job15(se2_job3とほぼ同じ内容)

クローラー名

se2_in0

S3

in0 (入力)
out15 (出力)

ジョブの実行と確認

以下のジョブを実行する

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['country','year','month','day','hour']
output='s3://test-glue00/se2/out15/'
codec='snappy'

df.repartition(1).write.partitionBy(partitionby).mode("overwrite").parquet(output,compression=codec)
#df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").format('com.databricks.spark.avro').save(output,compression=codec)
###add


## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out15"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out15"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

S3上の出力ファイルを確認

"part-00000-a0be54dc-83d1-4aeb-a167-db87d24457af.c000.snappy.parquet"という名前のファイルが出力されている

スクリーンショット 0031-01-19 11.52.50.png

次に2つ目のジョブ

内容

part-xxxというオブジェクト名を、パーティションのディレクトリの一部の"country=xxx"という名前にリネームする

ジョブ名

se2_job16

S3

out15 (入力/出力)

python shellのジョブの作成と実行

Glueの画面から"ジョブ"->[ジョブの追加]をクリックする

スクリーンショット 0031-01-19 11.57.24.png

ジョブ名"se2_job16"、Typeで"Python shell"を選択し、"ユーザーが作成する新しいスクリプト"にチェックを入れる。

スクリーンショット 0031-01-19 12.00.44.png

"セキュリティの設定"箇所をクリックしオプションを確認してみる。Maximum capacityはデフォルトが0.0625となっている。[次へ]をクリックする
次の画面で"接続"は必要ないので、そのまま[ジョブを保存してスクリプトを編集する]をクリックする

※Capacity:このジョブの実行時に割り当てることができるAWS Glueデータ処理ユニット(DPU)の最大数。 DPUは4vCPU,16GBメモリ。値は0.0625または1に設定できます。デフォルトは0.0625です

スクリーンショット 0031-01-19 12.01.53.png

非常にシンプルな画面が出る

スクリーンショット 0031-01-19 12.06.49.png

コードを書き[保存]をクリックする
今回はpart-xxxxxの長い名前を、パーティションで切っている国名にファイル名をリネームする(オブジェクトコピーして元オブジェクトをデリートだが).

# -*- coding: utf-8 -*-
import boto3
import re

s3 = boto3.resource('s3')
bucket = s3.Bucket('test-glue00')
bucket_name='test-glue00'
for object in bucket.objects.filter(Prefix='se2/tmp2/country='):
    #print(object.key)
    old_file = object.key

    pattern1 = r'.*part.*'
    result1 = re.match(pattern1, old_file)
    if result1:
        Copy_from = result1.group()
        Copy_to = result1.group().rsplit('/', 1)[0] + '/' + result1.group().split("/")[2]
        s3.Object(bucket_name,Copy_to).copy_from(CopySource=bucket_name + '/' + Copy_from )
        s3.Object(bucket_name,Copy_from).delete()

作成されたジョブにチェックを入れ、アクションからジョブの実行を行う

スクリーンショット 0031-01-19 12.10.42.png

ジョブが成功し"country=AUS"の名前でリネームされた。

スクリーンショット 0031-01-19 12.21.59.png

S3 selectで確認

"アクション"から"S3 Select"をクリックし

スクリーンショット 0031-01-19 12.26.26.png

"Parquet"にチェックを入れ、[ファイルプレビューの表示]をクリックし、データが表示されることを確認できる

スクリーンショット 0031-01-19 12.26.39.png

これジョブなので

Glue のトリガとしても設定できます

スクリーンショット 0031-01-19 12.25.28.png

StepFunctionで直接よびだせます

ジョブ1->ジョブ2の単純なフローを作る

{
  "Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.",
  "StartAt": "Glue PySpark Job",
  "States": {
      "Glue PySpark Job": {
      "Type": "Task",
      "Resource": "arn:aws:states:::glue:startJobRun.sync",
      "Parameters": {
        "JobName": "se2_job15"
      },
      "Next": "Glue PythonShell Job",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
    "Glue PythonShell Job": {
    "Type": "Task",
    "Resource": "arn:aws:states:::glue:startJobRun.sync",
    "Parameters": {
      "JobName": "se2_job16"
    },"End": true,
    "Retry": [
      {
        "ErrorEquals": ["States.ALL"],
        "IntervalSeconds": 1,
        "MaxAttempts": 3,
        "BackoffRate": 2.0
      }
    ]
  }
  }
}

スクリーンショット 0031-01-19 12.50.21.png

その他

料金

1秒あたりに課金され、PythonシェルタイプのETLジョブごとに最小1分で、1 DPU時間あたり0.44ドル。最小1分なので0.0625DPUだと1実行が最小で0.05円くらい(多分)。
正確にはWebをご確認ください
https://aws.amazon.com/glue/pricing/

ライブラリとか

Python 2.7互換スクリプト

サポートライブラリ:
Boto3
collections
CSV
gzip
multiprocessing
NumPy
pandas
pickle
re
SciPy
sklearn
sklearn.feature_extraction
sklearn.preprocessing
xml.etree.ElementTree
zipfile

こちらも是非

Python Shell Job in Glue
https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

re:Invent 2018での説明
https://www.slideshare.net/AmazonWebServices/building-serverless-analytics-pipelines-with-aws-glue-ant308-aws-reinvent-2018

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14