■ Environment
OS: Ubuntu 16 or 18
Hadoop: hadoop-3.2.1.tar.gz
JDK (Java): jdk-8u202-linux-x64.tar.gz
Spark: spark-3.0.1-bin-hadoop3.2.tgz
NameNode
192.168.76.216: h-gpu05
DataNodes
192.168.76.210: h-gpu03
192.168.76.210: h-gpu04
↓ The data to process looks like this...
$ head -n 2 random_data.txt
"2019/07/02 12:12:45.778","2019/07/02 12:12:45","2019/07/02 12:12:45","841","89.118.182.77","25846","CS","120.110.61.68","51321","e1","h14","26Yj7h5hN","Jt3","b8KaT","9f34R11z","8","JhMjNaKPDzE0bHwLEiGlI2a6rN","912","198","336","769","278","554","rand-pa1"
"2019/07/02 14:14:40.621","2019/07/02 14:14:40","2019/07/02 14:14:40","478","67.90.179.12","41214","XT","121.249.84.35","23907","Yz","dqj","CMZEwFLc1","GLD","8H0QQ","rxRX6CdO","5","ptTB2mmY3N491qvxUPeWJeFGhK","953","917","636","718","142","607","rand-pa1"
↓ Let's try a MapReduce job like this...
val start = System.currentTimeMillis  // timing is printed at the end (see "elapsed time" in the output below)

// map each record (an Array of fields) to a (second-field, 1) pair
val wordAndOnePairRDD = rdd.map(word => (word(1), 1))
val wordAndOnePairArray = wordAndOnePairRDD.collect
val top3 = wordAndOnePairArray.take(3)
top3.foreach(println)

// count occurrences per key with reduceByKey, then collect the result to the driver
val wordAndCountRDD = wordAndOnePairRDD.reduceByKey((result, enum) => result + enum)
val wordAndCountArray = wordAndCountRDD.collect

println("elapsed time: " + (System.currentTimeMillis - start) + " msec")

val top3_2 = wordAndCountArray.take(3)
top3_2.foreach(println)
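(The rdd above is assumed to be an RDD[Array[String]] built from random_data.txt. A minimal sketch of how it might be constructed; the app name, master and input path are placeholders, not from this article:)
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: "random-data-count", local[*] and the input path are assumptions.
val sc = new SparkContext(new SparkConf().setAppName("random-data-count").setMaster("local[*]"))
val rdd = sc.textFile("random_data.txt")   // each line is a quoted, comma-separated record
  .map(_.split(","))                       // word(1) is then the second field, e.g. "2019/07/02 12:12:45"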
1) First, it started with an error(?) like this
[warn] In the last 9 seconds, 21.64 (255.7%) were spent in GC. [Heap: 0.13GB free of 0.89GB, max 0.89GB] Consider increasing the JVM heap using `-Xmx` or try a different collector, e.g. `-XX:+UseG1GC`, for better performance.
20/12/17 11:16:30 ERROR Executor: Exception in task 24.0 in stage 0.0 (TID 24)
java.lang.OutOfMemoryError: GC overhead limit exceeded
Posts on the web suggested that changing spark.driver.memory fixes this, so let's change it.
hadoop@h-gpu05:/mnt/data/hadoop/spark-3.0.1-bin-hadoop3.2/conf$ pwd
/mnt/data/hadoop/spark-3.0.1-bin-hadoop3.2/conf
hadoop@h-gpu05:/mnt/data/hadoop/spark-3.0.1-bin-hadoop3.2/conf$ cat spark-defaults.conf
1: spark.eventLog.enabled true
2: spark.eventLog.dir file:///tmp/spark-events/
3: #spark.eventLog.dir hdfs://h-gpu05:8021/directory
4: #spark.serializer org.apache.spark.serializer.KryoSerializer
5: spark.driver.memory 10g
6: spark.executor.memory 10g
7: spark.driver.maxResultSize 10g
8: #spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
However, these settings did not seem to take effect (*´Д`), so I decided to specify the heap at run time instead.
$ sbt -J-Xmx128G -J-Xms128G run
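(When run is not forked, the application shares sbt's own JVM, which is presumably why the -J flags above work. The same heap settings could instead go in build.sbt with a forked run; a sketch, not taken from the article:)
// build.sbt (sketch): fork the run task so the JVM options apply to the application JVM
fork := true
javaOptions ++= Seq("-Xmx128G", "-Xms128G")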
2) Next, the following error appeared...
20/12/17 11:20:51 INFO BlockManagerInfo: Removed taskresult_8 on 192.168.76.216:34239 in memory (size: 163.4 MB, free: 72.8 GB)
20/12/17 11:20:51 INFO BlockManagerInfo: Removed taskresult_20 on 192.168.76.216:34239 in memory (size: 163.4 MB, free: 72.9 GB)
[error] org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 7 tasks (1078.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
This was fixed by setting line 7 of spark-defaults.conf above.
7: spark.driver.maxResultSize 10g
I set it on the Scala program side:
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.executor.memory", "32g")
conf.set("spark.driver.memory", "32g")
conf.set("spark.driver.maxResultSize", "32g")
Ran it...
20/12/17 14:28:22 INFO TaskSetManager: Finished task 6.0 in stage 2.0 (TID 60) in 680 ms on localhost (executor driver) (27/27)
20/12/17 14:28:22 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
20/12/17 14:28:22 INFO DAGScheduler: ResultStage 2 (collect at main.scala:34) finished in 0.683 s
20/12/17 14:28:22 INFO DAGScheduler: Job 1 finished: collect at main.scala:34, took 9.868276 s
elapsed time: 28847 msec
("2020/12/12 09:48:04",73)
("2020/12/12 11:37:13",61)
("2020/12/12 08:58:24",85)
(`ー´)b
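P.S. The error in 2) is triggered because collect() ships every record of the RDD back to the driver. Since only three pairs are printed anyway, calling take(3) on the RDD itself would keep the result well under spark.driver.maxResultSize; a sketch of that alternative (not what was run above):
// Sketch: keep the reduce on the cluster and bring back only 3 records,
// instead of collect()-ing the full result set to the driver.
val top3Counts = wordAndOnePairRDD.reduceByKey(_ + _).take(3)
top3Counts.foreach(println)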