Redshift SpectrumやAthenaを使っていたり、使おうとするとS3に貯めている既存ファイルをParquetやAvroに変換したいということがあります。
AWS Glueを利用してJSONLからParquetに変換した際の手順などを記述しています。
S3上のファイルを変換するだけならばData catalog/Crawl機能は利用せずに、ETLのJobを作成するだけで利用できます。
Data catalogの作成からのGlue一連の流れを確認したい場合はクラスメソッドさんの『AWS Glue 実践入門:サービスメニュー内で展開されている「ガイド付きチュートリアル」を試してみた』が参考になるかと思います。
ジョブの作成
メニューの ETL > Jobs からAdd Jobを選択すると以下のような画面が出てきます。
以下の3項目以外は後から変更可能です。
- Name
- This job runs
- Script file name
「This job runs」で「A proposed script generated by AWS Glue」を選択するとGlueで事前にDataSourceを作成しておく必要がありますが、今回のようにDataSourceを使う必要がない場合は、それ以外を選択します。
Connectionsは設定ができないのでそのままNextを選択すると、Reviewとして今まで設定した内容の確認がでてくるのでFinishを選択すると、スクリプトの修正画面へ遷移します。
スクリプトは以下のような内容になります。
今回はほとんどGlueの機能(GlueContext)は使わずに、SparkContext/SQLContextのみを利用して処理を行っています。
(printデバッグを行う際の注意点としては、スクリプト内でprint文を記述した場合はError Logsに出力されます。)
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
sc = SparkContext()
glueContext = GlueContext(sc)
sqlContext = SQLContext(sc)
## @params: [IN_PATH, IN_PATH]
args = getResolvedOptions(sys.argv, ['IN_PATH', 'OUT_PATH'])
# 引数で入力出力パスを指定
print(args['IN_PATH'])
print(args['OUT_PATH'])
in_path = args['IN_PATH']
out_path = args['OUT_PATH']
# http://qiita.com/ajis_ka/items/e2e5b759e77933b08687
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
# http://tech-blog.tsukaby.com/archives/1162
sc._jsc.hadoopConfiguration().set("spark.speculation", "false")
sqlContext = SQLContext(sc)
jsonDataFrame = sqlContext.read.json(in_path)
jsonDataFrame.write.mode("overwrite").format("parquet").option("compression", "snappy").mode("overwrite").save(out_path)
ジョブの定義として以下のように設定しておくことでスクリプトに引数を渡すことが出来ます。
このままSpark-submitに渡されるので --引数名 というようにKeyに設定必要があります。
ジョブの実行
作成したジョブを即時実行する場合は Jobs画面で ジョブを選択して Action > Run Job と選択するだけです。
定期実行などを行いたい場合は別途 Trigger を作成する必要があります。
実行結果の確認
コードが実行されるとS3上は以下のようになります。
% aws s3 ls s3://出力先/data/
PRE parquet/
2017-08-15 19:41:42 0
2017-08-29 16:32:13 0 parquet_$folder$
% aws s3 ls s3://入力元/data/jsonl/
2017-08-28 16:16:35 0
2017-08-28 16:16:48 133687 part-00000-29825316-e4f5-49fd-a835-3f0615ea4292.json
2017-08-29 15:37:21 138225 part-00001-29825316-e4f5-49fd-a835-3f0615ea4292.json
2017-08-29 15:37:20 135261 part-00002-29825316-e4f5-49fd-a835-3f0615ea4292.json
2017-08-29 15:37:19 136963 part-00003-29825316-e4f5-49fd-a835-3f0615ea4292.json
% aws s3 ls s3://出力先/data/parquet/
2017-08-29 16:32:13 356 _common_metadata
2017-08-29 16:32:13 1606 _metadata
2017-08-29 16:32:13 44313 part-00000-b782317e-827d-4417-ab0c-acf9e32f10cf.snappy.parquet
2017-08-29 16:32:12 43462 part-00001-b782317e-827d-4417-ab0c-acf9e32f10cf.snappy.parquet
2017-08-29 16:32:12 43048 part-00002-b782317e-827d-4417-ab0c-acf9e32f10cf.snappy.parquet
2017-08-29 16:32:12 42360 part-00003-b782317e-827d-4417-ab0c-acf9e32f10cf.snappy.parquet