
Reading a CSV file with spark-3.0.1


My environment:

■ Hadoop -> hadoop-3.2.1
■ Spark -> spark-3.0.1-bin-hadoop3.2
■ Scala -> 2.11.12

First, the build.sbt:


 1 name := "App"
 2 version := "1.0"
 3 scalaVersion := "2.11.12"
 4
 5 libraryDependencies ++= Seq(
 6   "org.apache.spark" %% "spark-core" % "2.2.0",
 7   "org.apache.spark" %% "spark-sql" % "2.1.2",
 8   "com.databricks" %% "spark-csv" % "1.0.3",
 9   "org.apache.logging.log4j" % "log4j-core" % "2.14.0"
10 )

Line 8 of this build.sbt pulls in spark-csv:
github.com/databricks/spark-csv

Possibly due to a dependency issue, rdd.foreach(println) threw an error when I went through spark-csv, so I did not use it this time.
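
For reference, the spark-csv route I skipped would look roughly like this (a sketch based on the spark-csv README, untested against this setup; treating random_data.txt as header-less is my assumption):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")   // assumption: random_data.txt has no header row
  .load("random_data.txt")
df.show()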


Instead, going with

7:      val rddFromFile = sc.textFile("random_data.txt")

was simpler and easier to follow.

Let's take a look at the code.


 1: import org.apache.spark.SparkContext
 2: object Main {
 3:   def main(args: Array[String]) {
 4:
 5:     val sc = new SparkContext("local[*]", "App")
 6:
 7:     val rddFromFile = sc.textFile("random_data.txt")
 8:     val rdd = rddFromFile.map(f => {
 9:       f.split(",")
10:     })
11:
12:     rdd.foreach(f => {
13:       println("Col1: " + f(0) + ",Col2: " + f(1))
14:     })
15:
16:     sc.stop()
17:   }
18: }

Inside the .map on lines 8-10, you can call .split to break each line into its comma-separated fields.
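
One caveat: f(0) and f(1) on lines 12-14 assume every line has at least two comma-separated fields. A small defensive variant (my own sketch, not part of the repo linked below) filters out short lines before printing:

val rdd = rddFromFile
  .map(_.split(","))
  .filter(_.length >= 2)   // drop malformed lines so f(0)/f(1) cannot throw

rdd.foreach(f => println("Col1: " + f(0) + ",Col2: " + f(1)))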

Let's run it. The full code is here:
github.com/RuoAndo/qiita/tree/master/spark/csv


~/qiita/spark/csv$ sbt run   

- omitted -

Col1: "2019/07/02 12:12:43.017",Col2: "2019/07/02 12:12:43"                                                                                                                                               
Col1: "2019/07/02 12:12:51.545",Col2: "2019/07/02 12:12:51"                                                                                                                                               
Col1: "2019/07/02 13:13:01.831",Col2: "2019/07/02 13:13:01"                                                                                                                                               
20/12/10 22:25:14 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 794 bytes result sent to driver                                                                                                  
20/12/10 22:25:14 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 794 bytes result sent to driver
20/12/10 22:25:14 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 128 ms on localhost (executor driver) (1/2)
20/12/10 22:25:14 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 143 ms on localhost (executor driver) (2/2)
20/12/10 22:25:14 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/12/10 22:25:14 INFO DAGScheduler: ResultStage 0 (foreach at main.scala:13) finished in 0.157 s
20/12/10 22:25:14 INFO DAGScheduler: Job 0 finished: foreach at main.scala:13, took 0.246704 s
20/12/10 22:25:14 INFO SparkUI: Stopped Spark web UI at http://192.168.76.216:4040
20/12/10 22:25:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

- omitted -

[success] Total time: 5 s, completed Dec 10, 2020 10:25:14 PM                                                                                                                                             
20/12/10 22:25:14 INFO ShutdownHookManager: Shutdown hook called   

(`ー´)b
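
One last note: the build.sbt above compiles against Spark 2.x artifacts and Scala 2.11, even though the environment at the top lists spark-3.0.1-bin-hadoop3.2. If you want the sbt build itself to match Spark 3.0.1, something along these lines should be closer (a sketch, not what was used for the run above; Spark 3.x is published for Scala 2.12):

name := "App"
version := "1.0"
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.1",
  "org.apache.spark" %% "spark-sql" % "3.0.1"
)

spark-csv is not needed there: CSV support has been built into spark-sql since Spark 2.0, so spark.read.option("header", "false").csv("random_data.txt") on a SparkSession covers the same ground.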
