使用している spark のバージョンは 1.6.3(古い) です。
spark-shell 起動
ローカルモードで起動しています。
$ spark-shell --master local[*]
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.3
/_/
テキストファイルのデータをRDDとして取得 (textFile)
scala> val rawData = sc.textFile("directory_name")
rawData: org.apache.spark.rdd.RDD[String] = linkage MapPartitionsRDD[1] at textFile at <console>:27
先頭の要素を取得 (RDD.first)
scala> rawData.first
res0: String = "col_1","col_2","col_3"
要素の総数を取得 (RDD.count)
scala> rawData.count
res3: Long = 1000
先頭から10個の要素を取得 (RDD.take)
scala> val head = rawData.take(10)
head: Array[String] = Array("col_1","col_2", ....
各要素を1行ずつ出力 (foreach)
scala> head.foreach(println)
"col_1","col_2","col_3"
70031,70237,1
84795,97439,1
36950,42116,1
25965,64753,1
49451,90407,1
37291,53113,1
39086,47614,1
42413,48491,1
39932,40902,1
関数を定義 (def)
"col_1" という文字が含まれていれば、1行目であると判断する関数
scala> def isFirstLine(line: String): Boolean = {
| line.contains("col_1")
| }
isFirstLine: (line: String)Boolean
上記の関数でフィルターにかける (filter)
1行目のみが抽出されるはず。
scala> head.filter(isFirstLine).foreach(println)
"col_1","col_2","col_3"
1行目以外を抽出する
例として無名関数を使っている。
scala> head.filter(x => !isFirstLine(x)).foreach(println)
or
scala> head.filter(!isFirstLine(_)).foreach(println)
70031,70237,1
84795,97439,1
36950,42116,1
25965,64753,1
49451,90407,1
37291,53113,1
39086,47614,1
42413,48491,1
39932,40902,1
4行目を取得
実際には、head.apply(4) が呼ばれている。
scala> val line = head(4)
line: String = 25965,64753,1