3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pysparkにおけるキャッシュの利用

Last updated at Posted at 2022-06-01

はじめに

こんにちは。株式会社ジール所属の@oreo_tです。

最近、PySparkでキャッシュを利用したパフォーマンス改善をしたのでTipsとしてまとめます。
キャッシュ利用の背景としては、顧客情報に対して複数の加工をするときに必要になりました。
例えば、契約者には契約情報、負傷者には治療情報など、顧客テーブルのデータに対して役割ごとに違うテーブルの情報を結合しています。
そこで、結合のたびに都度顧客テーブルを呼び出すと時間がかかるという状況を改善するために、キャッシュを使用しました。

Pysparkのキャッシュとは?

PysparkのDataframeをメモリ上(またはディスク上)で保持することにより、一つの処理の中で複数回Dataframeを呼び出すときにパフォーマンスを向上することができる機能です。
Pysparkにおけるキャッシュは関数が用意されていて、「persist」と「cache」の2種類があります。

Pysparkのキャッシュの種類

persist

引数にStorageLevelを入れるとどこにキャッシュするか指定できます。
以下、StorageLevelのオプション例です。

引数の値 仕様
DISK_ONLY ディスクのみに格納
MEMORY_ONLY メモリのみに格納
MEMORY_AND_DISK メモリからあふれた分はディスクに格納
MEMORY_AND_DISK_DESER メモリからあふれた分はディスクに格納、さらにデータをシリアライズする

persistを引数無しで呼び出すと、デフォルトのMEMORY_AND_DISK_DESERが選択されます。

persistの使用例①

ここでの使用例としては、「ドラッグストアの全店舗の売上データが1テーブルにまとまっておりデータ量が膨大になっているため、店舗ごとに分けたい」という状況でキャッシュを使用しようとしています。
以下すべての使用例で同じ目的のためにキャッシュを使用しています。
売上テーブルから上野店のデータと渋谷店のデータを分けて取得したい場合、
以下のようにキャッシュしてから上野店のdataframeと渋谷店のdataframeを作成することで、
パフォーマンスが向上します。

persist_1.py
dataframe = spark.("SELECT * FROM sales_table")
dataframe.persist()
dataframe_ueno = dataframe.filter(col("shop_name")=='上野')
dataframe_shibuya = dataframe.filter(col("shop_name")=='渋谷')

persistの使用例②

persistの中に引数を入れることで、キャッシュの仕様を変更できます。
以下、メモリ上のみにキャッシュするように設定した場合です。

persist_2.py
dataframe = spark.("SELECT * FROM sales_table")
dataframe.persist(StorageLevel="MEMORY_ONLY")
dataframe_ueno = dataframe.filter(col("shop_name")=='上野')
dataframe_shibuya = dataframe.filter(col("shop_name")=='渋谷')

cache

引数はなく、前述した「persistの引数MEMORY_AND_DISKの場合」と同じ仕様(メモリからあふれた分はディスクに格納)でキャッシュをします。
cacheとpersistのどちらを使えばよいか迷った場合、こちらを使えばよいかと思います。

cacheの使用例

売上テーブルから上野店のデータと渋谷店のデータを分けて取得したい場合、
以下のようにキャッシュしてから上野店のdataframeと渋谷店のdataframeを作成することで、
パフォーマンスが向上します。

cache.py
dataframe = spark.("SELECT * FROM sales_table")
dataframe.cache()
dataframe_ueno = dataframe.filter(col("shop_name")=='上野')
dataframe_shibuya = dataframe.filter(col("shop_name")=='渋谷')

キャッシュの利用状況の監視について

キャッシュの容量の設定や、実際に使われているメモリやディスクの使用量の監視方法については、説明範囲が広いため今回は割愛させていただき、また別の記事で説明したいと思います。

終わりに

今回紹介したキャッシュはパフォーマンス改善に役立つ便利な手法ですが、扱うデータ量やマシンスペックによっては必ずしも効果を発揮するわけではない、という点には注意が必要です。
例えば取得したデータが大きすぎる場合、メモリ不足でキャッシュできない、ということもあります。
また、キャッシュできたとしても、キャッシュしたデータを使いまわさないような処理構造であれば意味がなくなってしまいます。
ですが、パフォーマンスチューニングの手法の中では比較的簡単で効果が出やすいと思うので、
利用できそうな場面ではぜひご活用ください。

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?