Edited at

Spark 初心者の頃に勘違いしていた5つのこと

Apache Spark は、ビッグデータに対して高速に分散処理を行うオープンソースのフレームワークです。

ここでは自分が Spark を使い始めてしばらく勘違いしていたことを簡単にですが、5つご紹介します。

当初、Spark を Java のAPIを用いて使い始めましたが、そもそも Java 言語もあまり触ったことは無かったため、「そんなことも解っていなかったのか?」と言われるような内容かと思いますが、ご容赦ください。


勘違い1.driver プロセス内に配置したログは処理の順番に沿って出力されている

一般的にプログラムは、上から下に処理が進んでいくと思います。

そのため、アプリケーションの動作確認の用のログも上に書いたものから順番に出力されていくと期待すると思います。

例えば、以下のような処理を書いた場合、

SparkConf sparkConf = new SparkConf();

sparkConf.setMaster("local[*]");
sparkConf.setAppName("DriverLogOrder");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

List<Integer> list = new ArrayList<Integer>(){
{
add(1);
add(2);
add(3);
}
};

logger.info("1. start to create rdd.");
JavaRDD<Integer> rdd = jsc.parallelize(list);
logger.info("1. end to create rdd.");

logger.info("2. start to convert rdd.");
JavaRDD<Integer> rdd2 = rdd.map(x -> x * 5);
logger.info("2. end to convert rdd.");

logger.info("3. start to read rdd.");
rdd2.foreach(System.out::println);
logger.info("3. end to read rdd.");

jsc.close();

ログに以下のように出力されると、"create rdd" も "convert rdd" の処理も完了しているように見えます。

18/12/XX 22:45:53 INFO DriverLogOrder: 1.start to create rdd.

18/12/XX 22:45:53 INFO DriverLogOrder: 1.end to create rdd.
18/12/XX 22:45:53 INFO DriverLogOrder: 2.start to convert rdd.
18/12/XX 22:45:53 INFO DriverLogOrder: 2.end to convert rdd.

しかしながら、実際には何れの処理もまだ行われていません。

Spark には遅延評価という特徴があり、各 rdd に対して"アクション"と呼ばれる処理がコールされるまで、それ以前の変換処理は行われません。

今回の例の場合、以下の行が"アクション"と呼ばれる処理になります。

rdd2.foreach(System.out::println);

この行が呼ばれることで、それ以前の変換処理などがスタートします。

実際のログを見てみると、"3.start to read rdd"(最初の行) の後に、map 処理(最後の行)が出力されていることが分かるかと思います。

18/12/XX 22:45:53 INFO DriverLogOrder: 3.start to read rdd.

18/12/XX 22:45:53 INFO SparkContext: Starting job: foreach at DriverLogOrder.java:41
18/12/XX 22:45:53 INFO DAGScheduler: Got job 0 (foreach at DriverLogOrder.java:41) with 2 output partitions
18/12/XX 22:45:53 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at DriverLogOrder.java:41)
18/12/XX 22:45:53 INFO DAGScheduler: Parents of final stage: List()
18/12/XX 22:45:53 INFO DAGScheduler: Missing parents: List()
18/12/XX 22:45:54 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at DriverLogOrder.java:37), which has no missing parents

このように、driver プロセス内に配置したログは、rdd のアクションを実行するタイミングに留意しないと、その出力結果に惑わされる可能性があります。

基本的に"アクション"に対して、ログを配置するようにすると良いと思います。


勘違い2.executor のログは参照ができない

いわゆる hadoop の webUI からアクセスできるログですが、直接アクセスした時に表示されるのは、driver プロセスのログであって、各 executor のログではありませんでした。

※yarn-cluster モードで実行した場合になります

そのため、executor のログは見れないと最初思い込んでいました。

いえいえ、実際には参照可能な方法はいくつかありました。

1.yarn コマンドでダウンロード(yarn をリソース管理マネージャーとして使っている場合)

yarn logs -applicationId application_1545141756756_0001  > application_1545141756756_0001.log

2.driver のログ上にある executor のログへのリンク(spark のバージョンに依存しますが、1.6 以上であれば含まれていると思います)

Spark アプリケーションの不具合は executor 上での不具合であるケースが多いため、そのログが有益な情報を持っていることが多いです。


勘違い3.データの永続化はとにかくするべき

Spark にはアプリケーション中に計算した内容をメモリ上やストレージ上にキャッシュする機能があります。

rdd.persist(StorageLevel.DISK_ONLY)

これは各RDDに対して指定することができますが、Spark を始めた頃はとにかくキャッシュするようにしていました。

というのも、キャッシュさえしておけば、もし何らかのアプリケーション障害があった場合に、データの復旧が早く行われると思っていたためです。

しかしながら、このキャッシュ機能の主目的は対障害ではなく、その rdd を複数回呼び出す場合に再計算を避けるところにあります。

例えば、以下のように "rdd2.foreach" を2回呼び出すプログラムがあったとします。

※あるRDD(rdd2) に対して、2回アクションを呼び出しています

List<Integer> list = new ArrayList<Integer>(){

{
add(1);
add(2);
add(3);
}
};

JavaRDD<Integer> rdd = jsc.parallelize(list);
JavaRDD<Integer> rdd2 = rdd.map(x -> x * 5);

logger.info("start to read rdd 1st");
rdd2.foreach(System.out::println);
logger.info("start to read rdd 2nd");
rdd2.foreach(System.out::println);

この場合、Spark では、rdd2 を作成する処理を2回行うことになります。

この例では、"rdd2 = rdd.map(x -> x * 5);" の処理が2回行われます。

何故ならば、1回目の "rdd2.foreach" を処理した後、rdd2 は破棄されるためです。

rdd2 は破棄されてしまうため、再度 "rdd2.foreach" を実行するためには、再計算が必要となるわけです。

これを避けるためには、rdd2 を以下のようにしてキャッシュしてあげます。

JavaRDD<Integer> rdd2 = rdd.map(x -> x * 5).persist(StorageLevel.MEMORY_ONLY());

ログを見てみると、persist 有の場合は、最後の行に "BlockManager: Found block rdd_1_0 locally" が出力されているのが分かります。

(persist 無)

18/12/19 00:15:39 INFO PersistCheck: start to read rdd 2nd

18/12/19 00:15:39 INFO SparkContext: Starting job: foreach at PersistCheck.java:41
18/12/19 00:15:39 INFO DAGScheduler: Got job 1 (foreach at PersistCheck.java:41) with 2 output partitions
18/12/19 00:15:39 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at PersistCheck.java:41)
18/12/XX 00:15:39 INFO DAGScheduler: Parents of final stage: List()
18/12/XX 00:15:39 INFO DAGScheduler: Missing parents: List()
18/12/XX 00:15:39 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[1] at map at PersistCheck.java:34), which has no missing parents
18/12/XX 00:15:39 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.9 KB, free 885.6 MB)
18/12/XX 00:15:39 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1801.0 B, free 885.6 MB)
18/12/XX 00:15:39 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:49822 (size: 1801.0 B, free: 885.6 MB)
18/12/XX 00:15:39 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
18/12/XX 00:15:39 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[1] at map at PersistCheck.java:34) (first 15 tasks are for partitions Vector(0, 1))
18/12/XX 00:15:39 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
18/12/XX 00:15:39 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 7860 bytes)
18/12/XX 00:15:39 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, PROCESS_LOCAL, 7875 bytes)
18/12/XX 00:15:39 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
18/12/XX 00:15:39 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
18/12/XX 00:15:39 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 708 bytes result sent to driver

(persist 有)

18/12/XX 00:13:53 INFO PersistCheck: start to read rdd 2nd

18/12/XX 00:13:53 INFO SparkContext: Starting job: foreach at PersistCheck.java:41
18/12/XX 00:13:53 INFO DAGScheduler: Got job 1 (foreach at PersistCheck.java:41) with 2 output partitions
18/12/XX 00:13:53 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at PersistCheck.java:41)
18/12/XX 00:13:53 INFO DAGScheduler: Parents of final stage: List()
18/12/XX 00:13:53 INFO DAGScheduler: Missing parents: List()
18/12/XX 00:13:53 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[1] at map at PersistCheck.java:35), which has no missing parents
18/12/XX 00:13:53 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.9 KB, free 885.6 MB)
18/12/XX 00:13:53 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1805.0 B, free 885.6 MB)
18/12/XX 00:13:53 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:48199 (size: 1805.0 B, free: 885.6 MB)
18/12/XX 00:13:53 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
18/12/XX 00:13:53 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[1] at map at PersistCheck.java:35) (first 15 tasks are for partitions Vector(0, 1))
18/12/XX 00:13:53 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
18/12/XX 00:13:53 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 7860 bytes)
18/12/XX 00:13:53 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, PROCESS_LOCAL, 7875 bytes)
18/12/XX 00:13:53 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
18/12/XX 00:13:53 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
18/12/XX 00:13:53 INFO BlockManager: Found block rdd_1_0 locally

要はキャッシュ上に必要なデータ(rdd2)が見つかったということです。

例に挙げているのが、非常に軽量な処理であるため、実行速度での違いが見られませんが、もし大きい複雑な計算を経由して生成されるRDDであった場合、その再計算のコストは大きくなります。

そういったRDDに対しては、キャッシュしておくメリットがあります。

一方で、再利用されることの無いRDD、あるいは再計算するにしても非常に低コストなRDDの場合、むしろキャッシュするコストの方が高くつく可能性があります。

そのため、やみくもにキャッシュするのはパフォーマンス低下に繋がる可能性があるので、考えて設計するべき内容になります。

その他、RDDキャッシュ時の StorageLevel については、以下にも補足書いています。

Apache spark RDD キャッシュ時の StorageLevel について


勘違い4.リソース(executor数、CPUコア数)を増やすだけで、並列度が増える

executor数 × CPUコア数 = スレッド数(並列度)

アプリケーション実行時に指定(リクエスト)した executor と core が希望通りに割り当てされれば、原則このようにスレッド数は概ね決定すると思います。

しかしながら、タスクが特定の executor に偏っているようで上手く分散処理が出来ていませんでした。


  • executor や core の割り当てを増やしても、パフォーマンスが向上しない。

  • Spark の web-UI を参照しても、executor の割り当てはされているが、1つのexecutorだけがずっと処理している。

このときは、パーティションという概念を理解していませんでした。

Spark のタスクはパーティション単位で executor に割り当てられます。

つまり、いくらタスクが一万個あったとしても、それが1つのパーティション内に入っていたら、その一万個のタスクは1つの executor に全て割り当てられてしまいます。

特にテキストやデータベースからデータを読み込んだ直後などは、パーティション数が非常に小さい値になってしまっている可能性があるので注意が必要です。

これを出来るだけ避け、executor に出来るだけ処理を等しく分散させるためには、executor にタスクとして処理を渡してあげる前に、パーティション数を増やす必要があります。

JavaRDD<Integer> rdd2 = rdd.map(x -> x * 5).repartition(10);

これをリパーティションと呼びます。

この例ではパーティション数を10に設定しています。

※元々10より多かった場合は、パーティション数の減少という動きになります

トータルのタスク数が100あった場合、1つのパーティションが概ね10個ずつタスクを持つことになります。

そして、各 executor がパーティションを受け取り、10個のタスクを実行し終わったら、次のパーティションを受け取る。このサイクルを繰り返します。

以下のログを見てみると、パーティション単位で一つずつ完了させている様子が分かるかと思います。

※パーティション数がトータルで10ある場合

18/12/XX 01:00:36 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 96 ms on localhost (executor driver) (1/10)

18/12/XX 01:00:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 144 ms on localhost (executor driver) (2/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 4) in 56 ms on localhost (executor driver) (3/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 5) in 61 ms on localhost (executor driver) (4/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 6) in 53 ms on localhost (executor driver) (5/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 8.0 in stage 1.0 (TID 7) in 41 ms on localhost (executor driver) (6/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 8) in 41 ms on localhost (executor driver) (7/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 9) in 112 ms on localhost (executor driver) (8/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 10) in 109 ms on localhost (executor driver) (9/10)
18/12/XX 01:00:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 11) in 35 ms on localhost (executor driver) (10/10)

リパーティションは、ShuffleMapStage という扱いの上で処理されています。

つまり、各 executor 間でデータのシャッフルが行われているということです。

18/12/XX 00:49:29 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at repartition at RepartitionCheck.java:33), which has no missing parents

通常はノード間、即ちネットワーク経由でのデータのシャッフルが行われるはずです。

これは転送するデータサイズにも依りますが、低コストではないので、むやみにシャッフルするようなことも避けるべきです。


勘違い5.メモリはアプリケーション実行時に指定した量が正確に割り当てられる

executor に割り当てるメモリ量は以下で指定できます。

(アプリケーションプロパティの場合)

spark.executor.memory=1536m

(spark submit の引数の場合)

--executor-memory 1536m

バージョンに依って異なるかもしれませんが、デフォルトが 512m だったりして少ないので、大体のケースでは意図して多めに指定すると思います。

ただ、これが指定した値通りに割り当てられていませんでした。

※1536m と指定しているのに、2048m 割り当てられているなど

これは、yarn のメモリ割り当ての仕組みに依るものでした。

yarn には "yarn.scheduler.minimum-allocation-mb" というプロパティがあり、Spark(その他、クライアントAPP)は、この設定値の単位でのみリソース要求を行うことが出来ます。

もし "yarn.scheduler.minimum-allocation-mb" が 1024m だったとすると、"1024m","2048m","3072m","4096m"という単位でしか割り当てられないということです。

つまり、"1536m" を要求した場合は、その値以上で次の割り当て単位である "2048m" が割り当てられることになります。

この単位が細かく設定(1024mなど)されている場合は、仮に大きめに要求したとしても大きな誤差にはなりませんが、もし単位が4096mなどだった場合、最大で4G近く過剰にメモリを要求することになります。

一方で、最大で割り当て可能なプロパティも存在しており、"yarn.scheduler.maximum-allocation-mb" これを超えないようにする必要もあります。


おわりに

他にも勘違いしていることや、今でも理解不足の面があるように思いますが、最初に思いついた5つをピックアップしてみました。

個人の経験に基づくもので、体系的に学ぶには適していないかと思いますが、何らかの助けになれば幸いです。