1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【トリビアのDelta Lake】#6 AWS S3にあるZipファイルをPythonで解凍してDelta Lakeにブチ込む

Posted at

「シャ!S3に生データ入れたぜ!あとはspark.read.csvで楽勝やな!って、あれ??」

…CSVが存在するはずだったそのS3プレフィックスには、謎のZipファイルが置かれていました。

Zipファイルで固められている

おかしい。
慌てて、S3にあるZipファイルをダウンロードします。

それを解凍すると、確かにCSVファイルが存在します。これを読みたいだけなのに、自動取得の方法を取るとS3にはZipファイルとして置かれるみたい…

あんまりないシチュエーションだと思いますが、こういうときはどうすればいいのでしょうか。

ドキュメントを漁るも…

Pysparkに関連するドキュメントを漁りまくり、結論、「PysparkはZipに対応していない」。

参考記事:

.gzには対応しているが、.gzipや.zipには未対応とのこと。
はい、試合終了!

…とはなりません。なんとかしてPythonでZipを溶かす必要に迫られました。

困ったときのPandas展開

やはり、困ったときはPandas先輩の出番です。
Pandasのread_csvにはめちゃくちゃ多くの引数があり、そのなかの「compression='zip'」がどんぴしゃり。

さらには今回データソースがS3にあるので、それを見に行くのに必要なライブラリも数点追加して、サンプルコードとしてはこんな感じに。

melt_zip.py
import zipfile
import pandas as pd
import fsspec
import s3fs

pandas_df = pd.read_csv("s3://<bucket>/<prefix>/sample.zip",compression = 'zip')

ちゃんとファイル実態までS3パスを書くことがキモです。
もし、複数プレフィックスにまたがってデータを読み込みたいときは、/と*をうまく使ってパスを書けばイケると思います。

また、Pandasのread_jsonでも同じ引数があったので、JsonがZipで固められている場合でも泣かなくてすみますね…!

Into your Delta Lake

Pandasデータフレームとして定義できてしまえば、あとはこっちのもの。
spark.createDataFrame()でSparkDataFrameに変更し、あとはいつもの方法でInto your Delta Lakeです!

PandasとかSparkとかの変換については、以前まとめましたのでご覧くださいませ。

このシリーズの目的

クラウドストレージに蓄積された生データ(CSV,JSON等)を加工するのに必要なPysparkの知識を溜めていく、まさにPysparkに関するトリビアのDalta Lakeを目指しています。
Pysparkに関する記事はまだ海外のものが多く、このシリーズが私のような国内の駆け出しデータエンジニアの一助になることを願っています。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?