10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Glueの使い方的な⑪(DynamicFrameでpartitionByが使えるようになった)

Last updated at Posted at 2018-03-10

概要

従来DataFrameでしかできなかったWrite時のpartitionByに、DynamicFrameが対応しました。
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html

今回のやってみた内容は、"Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)"と同じです。
csvデータをパーティション分割したparquetに変換します。

パーティション分割するジョブを作る

ジョブの内容

※"Glueの使い方的な①(GUIでジョブ実行)"(以後①とだけ書きます)と同様のcsvデータを使います

"csvデータのタイムスタンプのカラムごとにパーティション分割してparquetで出力する"

ジョブ名

se2_job7

クローラー名

se2_in0 (Glueの使い方的な②で作ったもの)
se2_out6

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

Glueジョブの入力データは"in0"ディレクトリ配下、出力は"out6"ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

※①で作ったものを使います。

テーブルの情報は以下です。

スクリーンショット 0030-01-03 15.45.21.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job7ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は"S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する"です。

se2_job7
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out6"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out6"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

42行目の"datasink4"の行を以下のように修正

partitionby=['year','month','day','hour']
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out6", "partitionKeys": ['year','month','day','hour']}, format = "parquet", transformation_ctx = "datasink4")

TODO

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-03-10 20.27.07.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 cp s3://test-glue00/se2/out6/year=2017/month=11/day=14/hour=14/part-00000-c1945f99-ae0b-4d28-8642-49b87521b8da.snappy.parquet .
download: s3://test-glue00/se2/out6/year=2017/month=11/day=14/hour=14/part-00000-c1945f99-ae0b-4d28-8642-49b87521b8da.snappy.parquet to ./part-00000-c1945f99-ae0b-4d28-8642-49b87521b8da.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00000-c1945f99-ae0b-4d28-8642-49b87521b8da.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out6でクローラー作成

GlueのCrawlersをクリックし、"Add crawler"をクリック

スクリーンショット 0030-03-10 20.38.50.png

S3の出力パスを入力

スクリーンショット 0030-03-10 20.39.29.png

そのまま"Next"をクリック

スクリーンショット 0030-01-03 16.38.46.png

IAM roleに”test-glue"を選択

スクリーンショット 0030-01-03 16.38.55.png

そのまま"Next"をクリック

スクリーンショット 0030-01-03 16.39.03.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-03 16.39.16.png

クローラー実行

クローラー実行後

スクリーンショット 0030-03-10 20.46.07.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-03-10 20.46.30.png

Athenaから確認

左メニューからse2_out6のスキーマ情報確認、クエリ実行

スクリーンショット 0030-03-10 20.48.35.png

件数も19件で合っている

スクリーンショット 0030-03-10 20.48.50.png

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/preview/api/python/pyspark.sql.html
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

10
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?