Edited at

Sparkのチューニングに関するメモ

More than 1 year has passed since last update.

Sparkを使った際にチューニングで考慮した点のメモです。


前提となる環境


  • Spark1.4

  • 元データはJSON


データ形式と圧縮コーデック


  • データをParquet形式で扱う

    元となるデータはJSON形式ですが、Parquetの方が効率的に扱えるためJSONをParquetに変換します。

  • 必要なデータだけをParquetに保存する

    DataFrame#select()で必要なカラムだけを選択し、計算に不要なデータを取り除きます。

  • Parquetの圧縮形式にはsnappyを使用する

    デフォルトではParquetの圧縮形式はgzip形式ですが、snappyを選択することで高速な圧縮・伸長が行えます。
    (追記:2.0ではsnappyがデフォルトになっています)


コード例

sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

val jsonDataFrame = sqlContext.read.json(jsonFilePath)
val jsonDataFrame.select("id","type","arg1","arg2","arg3","time").write.format("parquet").save(parquetPath)


設定

spark-defaults.confを設定して使用可能なメモリを増やし、高速なシリアライザーの指定をしました。

spark.serializerはデフォルトではjava.io.Serializerですが、それより高速なシリアライザが用意されているためそれを使用します。

spark.executor.memoryとspark.driver.memoryのデフォルトは512mとかなり少ない設定になっています。

特にspark.executor.memoryはその60%がRDDのキャッシュに使用されるため効果が大きいと考え、spark.driver.memoryよりspark.executor.memoryにより多くメモリを振っています。


設定例

spark.serializer                   org.apache.spark.serializer.KryoSerializer

spark.executor.memory 6g
spark.driver.memory 2g


コーディング


  • 使いまわすRDDやDataFrameの計算結果をキャッシュする

    複数回使うRDDやDataFrameはcache()メソッドを使って計算結果をキャッシュするようにしました。
    これをしないと、使用される度に再計算が走ってしまうため、とても非効率になってしまいます。

    ちなみに、こういった計算結果のキャッシュをSparkでは永続化と呼ぶそうです。
    DBや一般的なシステムにおける永続化とは異なる概念なので注意が必要です。

  • バッチ全体をscalaで記述する

    最初は複数のコンポーネントをシェルで繋いでいたのですが、ディスクへの書き出しと読み込みを何度も発生させてしまい効率が悪いため、繋ぎの部分もscalaで記述することで計算のほとんどをオンメモリで処理できるようにしました。


まだ試してない


  • spark.io.compression.codecにlz4を指定

    RDDの圧縮コーデックを設定する spark.io.compression.codec というプロパティがある。

    デフォルトはsnappyなのだが、snappyよりも高速なlz4という圧縮形式が指定できるらしい。


  • spark.sql.parquet.filterPushdown をtrueに指定

    バグがあってデフォルトがoffになっているけどデータにnullが含まれない場合は安全だからtrueにしてもよいという記述が公式ドキュメントに載っている。

    「Parquet filter pushdown optimization」というのがなんの最適化なのか分からないけれど、trueにできれば何かが早くなるっぽい。

    (追記: 2.0ではデフォルトでtrueになっています)