始めに
私の所属する内製チームではユニケージからの移行を進めており、テキストファイルの大規模トランザクションデータをユニケージコマンド以外の方法でどう扱うかが課題になっております。
以前pandasでユニケージファイルの取り扱いを試してみたところ、100万行レベルになるとユニケージコマンドよりも圧倒的に遅いという結果になってしまいました。その後parquetに変換してはどうかという話があり、早速試してみることにしました。
環境
- MacOSX 10.15.4
- Python3.8.2
pysparkのインストール
pipenv install pyspark
まずpysparkでそのまま読み込み
実際に使われている89万行のデータを用意して先頭5行を表示します
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName('SparkSample').getOrCreate()
# ユニケージ形式なのでスペース区切りとして読み込む
df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ')
df.show(5)
これだけで30秒以上かかってます。思ったより遅いです。読み込み方を工夫すればもっと高速になるのかもしれません。
ユニケージ形式ファイルをparquet形式に変換
$ pyspark
(中略)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.0.1
/_/
Using Python version 3.8.2 (default, Aug 25 2020 09:23:57)
SparkSession available as 'spark'.
>>> df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ')
>>> df.write.save('./SAMPLE_DATA.parquet')
pyspark
でpython環境が立ち上がるので再度読み込んでから、今度はparquet形式で保存します。
$ ls SAMPLE_DATA.parquet/
_SUCCESS
part-00000-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00001-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00002-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00003-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
フォルダが作成され、フォルダ配下にparquetファイルができています。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName('SparkSample').getOrCreate()
df = spark.read.parquet('./SAMPLE_DATA.parquet')
df.show(5)
parquet形式で読み込みましたがさっきと同じように30秒以上かかりました。ファイル行数を10行にしても同等の時間がかかっており、pysparkの起動にかなり時間がかかるようです。
$ python sample.py
20/12/24 00:50:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
スクリプトの起動時のメッセージを見るとネイティブライブラリを使えていない…それで遅いのかもしれません。
調べたのですが簡単に解決しそうにないので、残念ですが飛ばします。
参照したり更新してみる
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.appName('SparkSample').getOrCreate()
df = spark.read.parquet('./SAMPLE_DATA.parquet')
# SQLで絞り込み
df.createOrReplaceTempView('sample_data')
df = spark.sql("select SCAN_CODE,BUNRUI2_CODE from sample_data where BUNRUI2_CODE = '29' limit 5").show()
SQLで参照できました。なおparquetの作成時に特にスキーマを設定していないので全てSTRING型になっています。数字データを扱うときは注意が必要です。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
spark = SparkSession.builder.appName('SparkSample').getOrCreate()
df = spark.read.parquet('./SAMPLE_DATA.parquet',)
# 条件分岐させて値を更新
new_df = df.withColumn('BUNRUI2_CODE', F.when(F.col('BUNRUI2_CODE') == '29','9999').otherwise(F.col('BUNRUI2_CODE')))
SQLで更新しようとしましたが、SparkSQLはUpdate文がない模様。新しい値の新しい列を作ることで更新できました。
new_df.write.save('./SAMPLE_DATA.new.parquet')
更新して保存。
parquet-tool のインストール
parquetファイルを閲覧するためにparquet-toolsをインストール。
$ brew install parquet-tools
$ parquet-tools cat --json SAMPLE_DATA.parquet/part-00000-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
csv形式で確認できないのが辛いですね…。
終わりに
今回は初歩的な内容でした。今後はAWS Glueなどの組み合わせも視野に進めていきたいと思います。参照用途であれば、RDSやDynamoDBに格納するよりもparquetにしてS3に置けばコスト的にも好ましいです。
発表されたばかりの AWS Glue Elastic Viewsもユニケージ移行において有力なサービスとして社内で期待が集まっており、こちらも検証を進めていきたいと考えています。