Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
3
Help us understand the problem. What is going on with this article?

More than 1 year has passed since last update.

@pioho07

Glueの使い方的な㉘(DataFrameでデータ読み込み)

GlueのDynamicFrameではなく、GlueでDataFrameを使ってデータ入力出力する

ジョブの内容

csvファイルをparquetに変換します。

※"Glueの使い方的な①(GUIでジョブ実行)"(以後①とだけ書きます)と同じ処理です。データ入力と出力部分をDynamicFrameからDataFrameに変更します。

ジョブ名

se2_job14

前準備

IAMロールなどは①をご確認ください。

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

S3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

実行コード

適当なGlueジョブを作成し、できあがったジョブの中身のコードを以下のコードで上書きする
inferSchemaでデータ型をSpark側で自動で類推する。DataFrameにしているので型が必要だから。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load('s3://test-glue00/se2/in0/*.csv', header=True)

output='s3://test-glue00/se2/out14/'
codec='snappy'

df.write.mode("overwrite").parquet(output,compression=codec)
job.commit()

クローラー作成し実行、テーブル確認

出力先のS3パスへのクローラーを作成し、クローラー実行

出来上がったテーブルが以下のようになる

スクリーンショット 0030-12-31 19.01.24.png

Athena確認

スクリーンショット 0030-11-10 19.20.46.png

こちらも是非

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

3
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
3
Help us understand the problem. What is going on with this article?