5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Sparkの永続化処理に参った話

Last updated at Posted at 2019-11-01

最近故あってApache Sparkを触り始めました。
そこで得た知見、苦労話を共有します。
なお執筆時点のSparkのバージョンは2.4.4です。
Sparkをググって出てくる情報は1.x台の情報も結構多いんですが、バージョンによって結構いろいろ違うみたいなんで気を付けてください。

前書き

やりたいこと

大量(もしかしたらPC一台に収まらないくらい)の構造化されたデータに対して、繰り返し処理をするということをやりたくて、Sparkいいんじゃない?くらいの感じで触り始めました。

だめだったこと

GraphX

大量のデータというのが、関連を持ったデータの集合で、一般的には頂点と辺からなるグラフといわれる構造になっている。
で、SparkにはGraphXというグラフ処理のフレームワークがあって、これが用途に合ってるんじゃないかと触り始めたが、性能出ず。使い方にもよるのかもしれないけど、大量のデータの内、繰り返しの1回で触る必要のある頂点の数はごくごく限られているというのに、仕組み的に常に全データを処理する仕組みもあって、とにかく結果が返ってこない。
もちろんスペック/データ量依存だが、2ループするのに10分とかかかってもう待ちきれないのであきらめた。

GraphXの中にPregelという関数があって、繰り返し処理のフレームワーク的なもので使うのはお手軽だが、こちらはGraphX生叩きよりさらに輪をかけて遅い。残念。

Datasetに出会うが・・・

細かいことは良くわかってないが、Spark旧来のRDDというデータ構造からいろいろ改善されたのがDataset。性能的に数倍~数十倍という記事も見かけたので触ってみる。
GraphXはRDDベースであり、Datasetには相当するものがないのでその辺は自前で頑張る。

結果、確かに速い!
速い、が、GraphXであきらめた2ループ目以降もどんどん回していったら、指数関数的に遅くなっていくではないか!

本題

Sparkの仕組みというのは、RDD、Datasetどちらを使っても同じなんですが、map()とか実行してもすぐには実行されません。
その代わり、元データ.map().filter().reduce().map()みたいな処理のパスだけが一旦記録されて、結果が欲しいとき(Actionと言われる関数たちを呼んだとき)に計算してくれます。例えばデータ件数を得るcount()とかですね。これが、遅延実行というやつですね!

さて、遅延実行にはメモリ消費とか無駄な計算しないとか様々なメリットがあるわけですが、途中の結果が欲しいときもあるわけです。典型的なのが、途中の結果を別々の計算に回したいときです。

val A = original.map()
val B = A.filter().map().reduce()
val C = A.groupBy().count()

みたいなことで、Aの結果をBの式でもCの式でも使いたいのです。
この時、放っておくと、original.map()がBのとき、Cのときの2回計算してくれることになります。

無駄ですよね?
この無駄が、先ほどの「2ループ目以降もどんどん回していったら、指数関数的に遅く」なった原因です。

そんなわけでSparkには永続化の仕組みがあります。
Sparkで永続化と呼ばれているものは、DBなりファイルなりに保存することを(必ずしも)指していません。その時点の結果を使いまわせるよう、計算を一旦実行して保持する、というのが目的です。

永続化のいろいろ

persist()あるいはcache()

どちらもほぼ同じものです。
永続化=persist、覚えやすいですね。
呼び方もhoge.persist()だけです。お手軽ですね(しかしこれが落とし穴・・・)。

checkpoint()

DBのチェックポイントと同じと捉えてよいでしょう。RPGのセーブポイントでもいいです。
「そこまで戻ればなんとかなる」場所を作るものです。
ファイルに保存されますが、後続のwrite()と違って、hoge.checkpoint()って呼ぶだけなのでお手軽です(しかしこれが落とし穴・・・)。

write()

これは文字通り結果をSparkの外側の媒体に書き込むことです。基本的には計算結果を書き込むためのものなので、途中結果を書き込む用途としてはあんまり使わないでしょう。よって本稿では割愛します。
でも、persist()もcheckpoint()も「結果の一時保存」の意味合いが強いので、普通「永続化」って言ったらこっちだよなあ、と思わざるを得ない。

はまりポイント

persist()永続化されてなくね?

persist()で注意しないといけないのは、これを呼んだ時点では「何も起こらない」ことです。フラグが立つだけです。実際に計算が実行されて結果が保管されるのはActionが呼ばれたときです。
最初これにはまりました。
繰り返し処理なので、計算結果を入れる変数はループごとに上書きしてました。ループごとにpersist()呼んでると、もういらなくなった結果まで保管されてしまって無駄なのでunpersist()を叩いていたんですが、Actionが入る前にunpersist()してしまっていたので、結果が計算されないまま、フラグを下ろしてしまっていたのですよ。

var result = someDS.persist()
while(終了条件) {
  /*
   * resultに対するなにがしかの計算(Actionを含まないとする)
   */
  val newResult = hogehoge.persist() // 新しい計算結果をpersist
  result.unpersist() // 前の結果はpersist解除
  result = newResult // 結果を上書き
  println(result.count()) // 結果の件数を表示(count()はAction)
}

何がおかしいかわかりますか?
persist()はフラグを立てるだけだと書きましたが、ループの最後でcount()呼んでるから結果は計算されて永続化されているように一見見えます。
ところが、これを実際に動かすと、count()のところでループ一周目のからの計算が走りなおします。なぜでしょう。
実はunpersist()が早すぎなのが原因です。
newResultはresultから計算されます。result.count()を呼んだとき、ひとつ前の結果はunpersist()された状態になっているので、ひとつ前の結果からnewResultは計算できないと判断されてしまうのです。もっと言えば、その前の結果もその前の結果も使われなくて、とにかく最初のデータから計算しなおされることになります。そりゃループを繰り返すたびに遅くなりますわな。
一個前の結果のunpersist()よりcount()を前にすれば解決です。

checkpoint()のファイルが~

というのは後から分かった話で、どうにも原因が分からなかったときにはcheckpoint()にすがりました。checkpoint()は、呼んだその瞬間に結果を計算してファイルに書き出すので、persist()のようなタイミング的な気遣いは不要です。

なのですが、checkpoint()の難点は「ファイルに書きっぱなし」ということです。

persist()の場合は保存先をメモリかファイルか両方かなど選べますが、ファイルに書き込んだ場合でもセッションを終了するときに消してくれます。ゴミが残りません。

一方、checkpoint()は書きっぱなしです。2GBのデータを100回のループで毎回checkpoint()してたらそれだけで200GBです。いくらSSDが安くなったからってバカにできないサイズです。しかも手動で消すか、消す仕組みを作っておかないと勝手に消えてもくれません。uncheckpoint()はありません。

セッション終わっても残ってるってことは、別のセッションで再利用できたりするんでしょうか?そのへんはまだわかってません。

2020/5/1追記
書きっぱなしよりはマシにする方法を「Sparkの永続化処理(後日談)」に書いてますので気が向いたらそちらもお読みください。

persist()メモリ食いすぎじゃね?

というわけでcheckpoint()もいまいちだなということでpersist()を調べなおして、永続化されない原因に至ったわけですが、直した後に気前よくループを回していたら、OutOfMemoryが。
今回の作りは、ループごとにデータが増えていくので、無限に回してたらどこかでメモリが足りなくなるのはわかってるんですが、checkpoint()の時は余裕で回りきるくらいのループ回数でメモリが足りないって言われるのは納得がいきません。

原因として考えられるのは、persist()は過去を忘れるわけじゃない、ということ。
SparkのWeb UIで「DAG Visualization」というのがあって、それを見ると、ある元ネタからある結果を得るために実行される処理(mapとか)がDAG(有向非循環グラフというらしい)の形で連綿と連なっているのを見ることができます。その「ある元ネタから(Actionが呼ばれて?)ある結果を得る」までの一連の処理がJobと呼ばれています。
checkpoint()の場合は、checkpoint()を実行した次のJobからはcheckpoint()した時点からグラフが始まる(過去が清算される)のに対して、persist()の場合は過去の計算は記録されたままです。ただしpersist()以前の処理は灰色になっていて、スキップされたことがわかります。
これは、persist()した結果がなんらかの原因で失われてしまうことが想定されていて、仮に失われた場合は、元のデータから再計算できるようになっているためだと思われます。

ということで過去をいつまでも引きずってる未練たらたらの情念がメモリ消費として表れている、のでしょうか。でも途中のデータ自体を保持してるわけではなくて(と思う。おおもとのデータ以外は計算によって導けるから必要ないので)、計算順序を保持してるだけのはずなので、釈然としないですね。

まとめ

まだ正確な原因はわかってないし、やり方がまずいだけかもしれません(ツッコミお待ちしております)が、今回得られた知見をまとめます。

  • Sparkアプリケーションを新しく始める場合はRDDでなくDatasetを使いましょう。GraphXもRDDベースなのでよほどのことがなければ避けましょう。
  • persist()はすぐに実行されない。依存するデータをActionより前にunpersist()することのないように気を付けましょう。
  • checkpoint()はファイルが残ってしまうので、後片付けを忘れないようにしましょう。
  • persist()はcheckpoint()よりもメモリを消費します(多分・・・)
  • そもそもSparkは大量のデータに対して、「一括」で何かしらの処理・計算をさせるのに向いたフレームワークなので、大量のデータがあっても、そのごく一部を抽出してちょろちょろっと触るだけの用途にSpark向いてないかも。それインデックス効かせたDBを中心に処理したほうがよっぽど速いかも?(自分のやってきたこと全否定!でもごく一部っていうほどごく一部でもないんだよなあ、悩ましい)

Sparkマスターへの道は遠い。

5
3
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?