LoginSignup
11
6

More than 3 years have passed since last update.

PythonでParquetを扱う

Posted at

始めに

私の所属する内製チームではユニケージからの移行を進めており、テキストファイルの大規模トランザクションデータをユニケージコマンド以外の方法でどう扱うかが課題になっております。

以前pandasでユニケージファイルの取り扱いを試してみたところ、100万行レベルになるとユニケージコマンドよりも圧倒的に遅いという結果になってしまいました。その後parquetに変換してはどうかという話があり、早速試してみることにしました。

環境

  • MacOSX 10.15.4
  • Python3.8.2

pysparkのインストール

pipenv install pyspark

まずpysparkでそのまま読み込み

実際に使われている89万行のデータを用意して先頭5行を表示します

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

# ユニケージ形式なのでスペース区切りとして読み込む
df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ')
df.show(5)

これだけで30秒以上かかってます。思ったより遅いです。読み込み方を工夫すればもっと高速になるのかもしれません。

ユニケージ形式ファイルをparquet形式に変換

$ pyspark

(中略)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Python version 3.8.2 (default, Aug 25 2020 09:23:57)
SparkSession available as 'spark'.
>>> df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ')
>>> df.write.save('./SAMPLE_DATA.parquet')

pyspark でpython環境が立ち上がるので再度読み込んでから、今度はparquet形式で保存します。

 $ ls SAMPLE_DATA.parquet/
_SUCCESS
part-00000-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00001-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00002-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00003-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet

フォルダが作成され、フォルダ配下にparquetファイルができています。

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()
df = spark.read.parquet('./SAMPLE_DATA.parquet')
df.show(5)

parquet形式で読み込みましたがさっきと同じように30秒以上かかりました。ファイル行数を10行にしても同等の時間がかかっており、pysparkの起動にかなり時間がかかるようです。

$ python sample.py 
20/12/24 00:50:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

スクリプトの起動時のメッセージを見るとネイティブライブラリを使えていない…それで遅いのかもしれません。

調べたのですが簡単に解決しそうにないので、残念ですが飛ばします。

参照したり更新してみる

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

df = spark.read.parquet('./SAMPLE_DATA.parquet')

# SQLで絞り込み
df.createOrReplaceTempView('sample_data')
df = spark.sql("select SCAN_CODE,BUNRUI2_CODE from sample_data where BUNRUI2_CODE = '29' limit 5").show()

SQLで参照できました。なおparquetの作成時に特にスキーマを設定していないので全てSTRING型になっています。数字データを扱うときは注意が必要です。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

df = spark.read.parquet('./SAMPLE_DATA.parquet',)

# 条件分岐させて値を更新
new_df = df.withColumn('BUNRUI2_CODE', F.when(F.col('BUNRUI2_CODE') == '29','9999').otherwise(F.col('BUNRUI2_CODE')))

SQLで更新しようとしましたが、SparkSQLはUpdate文がない模様。新しい値の新しい列を作ることで更新できました。

new_df.write.save('./SAMPLE_DATA.new.parquet')

更新して保存。

parquet-tool のインストール

parquetファイルを閲覧するためにparquet-toolsをインストール。

$ brew install parquet-tools

$ parquet-tools cat --json SAMPLE_DATA.parquet/part-00000-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet

csv形式で確認できないのが辛いですね…。

終わりに

今回は初歩的な内容でした。今後はAWS Glueなどの組み合わせも視野に進めていきたいと思います。参照用途であれば、RDSやDynamoDBに格納するよりもparquetにしてS3に置けばコスト的にも好ましいです。

発表されたばかりの AWS Glue Elastic Viewsもユニケージ移行において有力なサービスとして社内で期待が集まっており、こちらも検証を進めていきたいと考えています。

参考

11
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
6