背景
S3に格納されたzipファイル(中身はcsv)をETLしたいが、どうやらsparkではzipファイルを取得することができなさそうなので、
いろいろ模索した結果、pandasを用いることとした。
まあ、zipを解凍さえすればその限りではないはずやが。。。
コード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pandas as pd #1)追加モジュール
# ジョブ初期化
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
#2) S3からzip取得
df = pd.read_csv('s3://バケット名/IF/testReadZip.zip',compression='zip')
read_data = pd.DataFrame(df)
#3) 取得したデータを加工
write_data = read_data.query('商品名 in ["エビフライ","讃岐うどん","JapaneseSushi"]')
#4) S3へzip書き込み
compression_opts = dict(method='zip',archive_name='testWriteZip1.csv')
write_data.to_csv('s3://バケット名/outIF/testReadZip1.zip', index=False, compression=compression_opts)
# ジョブコミット
job.commit()
解説
import pandas as pd #1)追加モジュール
1)追加モジュール
今回採用したpandasをimportする。
#2) S3からzip取得
df = pd.read_csv('s3://バケット名/IF/testReadZip.zip',compression='zip')
read_data = pd.DataFrame(df)
2) S3からzip取得
参考記事にもあるとおり、圧縮形式を明示的に指定するcompressionオプションがあり、zipもサポートされているため当オプションを指定した。
#3) 取得したデータを加工
write_data = read_data.query('商品名 in ["エビフライ","讃岐うどん","JapaneseSushi"]')
3) 取得したデータを加工
カラム名,レコードともにシングル/マルチバイト文字の両方が用いられることから、その挙動を試すための処理。
10個の商品名で等分した100万行のcsvに対して、指定の条件で抽出した。
結果、30万行のcsvが出力されたので、大満足!
取引日 | 商品分類 | JANコード | 商品名 |
---|---|---|---|
2022/12/7 | 1 | 491230000001 | エビフライ |
2022/12/7 | 6 | 491230000001 | 讃岐うどん |
2022/12/7 | 8 | 491230000001 | JapaneseSushi |
#4) S3へzip書き込み
compression_opts = dict(method='zip',archive_name='testWriteZip1.csv')
write_data.to_csv('s3://バケット名/outIF/testReadZip1.zip', index=False, compression=compression_opts)
4) S3へzip書き込み
参考記事に加え、公式レファレンスのpandas.to_csvを読みながら書いた。
変数compression_optsにcompressionのオプションを辞書型で格納しているが、これはレファレンスにあったものほぼほぼそのまま使って実現できた!
まとめ
実際にやりたいことはもっと複雑だが、そもそもの基礎部分を整えなお困りちゃんだったのでその部分だけなんとかやってみた。
正直sparkとpandasがどう違うか、dynamicframeとdataframeがどう違うかとかまだまだ分かってないけれど、一年前はこんなことできなかったので普通におもろかった!
というか、いろいろ調べてる中で一週間前の2022/11/29にGlueバージョン4.0リリースされていてたまげた。。。時の流れ早すぎませんか?
参考文献
- pandasでzip入出力を頑張ってる方々
- pandas公式レファレンスたち
- pandas初心者すぎるので使い方をざっくり把握しにかかれた記事たち
※pysparkやpandasが結局わからんってなってるけど参考になるはずなので記載
AWS Black Belt Online Seminar AWS Glue ETL パフォーマンス・チューニング① 基礎知識編
AWS GlueでSparkのDataframeを使う