「シャ!S3に生データ入れたぜ!あとはspark.read.csvで楽勝やな!って、あれ??」
…CSVが存在するはずだったそのS3プレフィックスには、謎のZipファイルが置かれていました。
Zipファイルで固められている
おかしい。
慌てて、S3にあるZipファイルをダウンロードします。
それを解凍すると、確かにCSVファイルが存在します。これを読みたいだけなのに、自動取得の方法を取るとS3にはZipファイルとして置かれるみたい…
あんまりないシチュエーションだと思いますが、こういうときはどうすればいいのでしょうか。
ドキュメントを漁るも…
Pysparkに関連するドキュメントを漁りまくり、結論、「PysparkはZipに対応していない」。
参考記事:
.gzには対応しているが、.gzipや.zipには未対応とのこと。
はい、試合終了!
…とはなりません。なんとかしてPythonでZipを溶かす必要に迫られました。
困ったときのPandas展開
やはり、困ったときはPandas先輩の出番です。
Pandasのread_csvにはめちゃくちゃ多くの引数があり、そのなかの「compression='zip'」がどんぴしゃり。
さらには今回データソースがS3にあるので、それを見に行くのに必要なライブラリも数点追加して、サンプルコードとしてはこんな感じに。
import zipfile
import pandas as pd
import fsspec
import s3fs
pandas_df = pd.read_csv("s3://<bucket>/<prefix>/sample.zip",compression = 'zip')
ちゃんとファイル実態までS3パスを書くことがキモです。
もし、複数プレフィックスにまたがってデータを読み込みたいときは、/と*をうまく使ってパスを書けばイケると思います。
また、Pandasのread_jsonでも同じ引数があったので、JsonがZipで固められている場合でも泣かなくてすみますね…!
Into your Delta Lake
Pandasデータフレームとして定義できてしまえば、あとはこっちのもの。
spark.createDataFrame()でSparkDataFrameに変更し、あとはいつもの方法でInto your Delta Lakeです!
PandasとかSparkとかの変換については、以前まとめましたのでご覧くださいませ。
このシリーズの目的
クラウドストレージに蓄積された生データ(CSV,JSON等)を加工するのに必要なPysparkの知識を溜めていく、まさにPysparkに関するトリビアのDalta Lakeを目指しています。
Pysparkに関する記事はまだ海外のものが多く、このシリーズが私のような国内の駆け出しデータエンジニアの一助になることを願っています。