背景
初心者としてのSpark入門です。
入門編→基本編→応用編という順番でいきたいと思います。
分散処理の基礎知識色々が必要ですが、それを説明するのは苦手なので、下記の資料ご確認いただければ幸いです。
(RDDって何で上手く説明できない。。。)
・初めてのSpark
・Sparkによる実践データ解析 ―大規模データのための機械学習事例集
・入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム
目次
- 事前準備
- Sparkのインストール
- 最初のshell実行
- IDEA・Maven配置
- wordcountの実現
- Spark API
- RDD[T]のFunctions
- RDD[(K, V)]のPairRDDFunctions
- その他
事前準備
環境:
Windows+VMware+OS:Ubuntu22.04
Java -version
openjdk version "1.8.0_312"
Sparkのインストール
インストール前に先ずはSpark用のユーザ作成(必須ではない)
sudo adduser spark sudo
今回のSparkのバージョンが3.2.1
サイト:https://spark.apache.org/downloads.html
sudo wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
前回Hadoopと同じ/opt
に解凍
sudo tar -xzvf spark-3.2.1-bin-hadoop3.2.tgz -C /opt
HOMEとPATHの設定
vim ~/.bashrc
下記追加
export SPARK_HOME=/opt/spark-3.2.1-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
source ~/.bashrc
そしてユーザsparkに権限をつけ
sudo chown -R spark /opt/spark-3.2.1-bin-hadoop3.2/
インストールがここまで完了しました。
最初のshell実行
Sparkではscalaとpythonのshellが付けられています。
scala-shellを確認しましょう。
cd /opt/spark-3.2.1-bin-hadoop3.2/bin
./pyspark
少々待ち後、下記のような画面が出てきます。
scalaのバージョンが2.12.15
下記のコードを入力して、実行しましょう。
val lines = sc.textFile("/opt/spark-3.2.1-bin-hadoop3.2/README.md")
lines.count()
lines.first()
結果が下記の通りになります。
次はpythonのshellを体験しましょう。
ctrl+D
または
:quit
によりshellから抜け出し、下記のコマンドによりpython-shellに入ります。
./pyspark
図のように
pythonのバージョンが3.10.4
簡単にコードを叩く
ここまでsparkのshellが体験出来ました。
IDEA・Maven配置
実際の開発ではshellではなく、IDE上でコードを書き、Gitにより管理することが多いと思います。
これから自分で書いたコードをVMに持ち込んで、Spark上実行しよう。
今回はJavaなので、先ずはIDEAとMavenを配置します。
IDEA.Ultimate
https://www.jetbrains.com/ja-jp/idea/download/#section=windows
(有料版を使っていますが、代わりにeclipseを使ってもいいですかな。)
Maven
https://maven.apache.org/download.cgi
上記のダウンロードが割愛させていただきます。
IDEA上のMaven
先ずは新たなプロジェクトを作成し、Build systemのところをMavenを選択してからcreate
そして下記のようにFile->SettingsでMaven home pathを設定します。
次はpom.xmlを編集
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>SparkTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<spark.version>3.2.1</spark.version>
</properties>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
<!-- https://blog.csdn.net/weixin_42961644/article/details/115090477 -->
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.12.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>2.12.15</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
</project>
javaとscalaのバージョンがSparkと一致してください。
ここまで配置完了しました。
wordcountの実現
下記のところにパッケージcom
を作成
下記の内容でJavaClassを作成
package com;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.*;
public class wordCount {
public static void main(String[] args) {
/*
Sparkを実行する前に必ず必要なconfファイルです
setAppName("spark-wordcount")は識別用の名
setMaster("local")今回はローカルで実行
*/
SparkConf sparkConf = new SparkConf().setAppName("spark-wordcount").setMaster("local");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
//データを読み込む、フォルダ/opt/dataの中にテキストファイルを置いてあります。型はRDD
JavaRDD<String> lines = sparkContext.textFile("/opt/data");
//flatMap関数:要素を別の型(要素数は複数可)に変換する。ここではLineごとのwordsに変更
JavaRDD<String> words = lines.flatMap(s-> Arrays.asList(s.split(" ")).iterator());
//String型のRDDをKeyValue型のPairRDDに変更して、現時点のデータは[word,1]という型になっています。
JavaPairRDD<String,Integer> wordsOnes = words.mapToPair(s-> new Tuple2(s,1));
//同じKeyで出現回数をcount
JavaPairRDD<String, Integer> wordsCounts = wordsOnes.reduceByKey((v1, v2) -> v1 + v2);
//集計結果を出力
wordsCounts.saveAsTextFile("/opt/data1");
}
}
上記のコードの説明はコメントまでご参照ください。
次は、IDEAのTerminalのところに、下記のコマンドを実行します。
mvn clean package
こちらは作成したコードと依頼関係などをpackageまで作成
targetというフォルダが作成されて、その中の.jarファイルをコピーし、VMにアップロードします。
(今回はSparkと同じフォルダ/opt/の下に置きました。)
ようやくSparkを実行する時です。
/opt/まで行って、下記のコマンドを実行します。
/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master local[3] --executor-memory 512m --class com.wordCount SparkTest-1.0-SNAPSHOT.jar
/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit
はアプリケーションを起動
--master local[3]
はローカル上core数3で実行
--executor-memory 512m
はメモリーを512m指定
--class com.wordCount
は先ほど作成したJavaCLASSを指定
SparkTest-1.0-SNAPSHOT.jar
はjarを指定
実行完了後、/opt/data1
の中に下記のファイルを置いてあります。その中part-00000が結果ファイルになります。
part-00000 _SUCCESS
Spark API
Scala、JavaおよびpythonのAPIがあります。
主に下記のサイドに参考
http://mogile.web.fc2.com/spark/rdd-programming-guide.html#rdd-operations
https://www.ne.jp/asahi/hishidama/home/tech/scala/spark/RDD.html
RDD処理は、変換処理、結合処理、アクションまたは永続化(キャッシュ)処理があります。
これからよく使われている処理を実行しようと思います。
RDD[T]のFunctions
RDDとしてファイルにを読み込む
JavaRDD<String> lines = sparkContext.textFile("/opt/data/test");
よくあるJsonファイルを読み込む
String jsonPath = "/opt/data/test.json";
JavaRDD<Row> items = spark.read().json("jsonPath").toJavaRDD();
RDDを作成
JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3, 3, 4, 5, 6));
RDDをプリント
for(String line:lines.collect()){System.out.println("* "+line);}
or
lines.foreach(line -> {System.out.println("* "+line); });
変換処理
map関数:要素ごとに対して処理を行う
JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
JavaRDD<Integer> num2 = num1.map(x -> x * x) //num2 : (1,4,9)
flatmap関数:要素ごとに対して処理を行う、「結果を平坦化する」
JavaRDD<String> lines = sparkContext.textFile("/opt/data/test");//lines:"we are introduced to the narrator, a pilot, and his ideas about grown-ups","..."
JavaRDD<String> words = lines.flatMap(s-> Arrays.asList(s.split(" ")).iterator());//words: "we","are",...
filter関数:
JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaRDD<Integer> num2 = num1.filter(x -> ( x % 2 == 1)); //num2 : (1,3,5)
distinct関数:重複排除
JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3, 3));
JavaRDD<Integer> num2 = num1.distinct(); //num2 : (1,2,3)
結合処理
SQLと同じ、join関数(leftOuterJoin, rightOuterJoin および fullOuterJoin)とunion関数があります。
アクション
reduce関数:要素同士の演算を行う。
JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
JavaRDD<Integer> num2 = num1.reduce((a, b)-> a + b) //num2 : (6)
collect:RDD中の全要素を返す ※データサイズにより、あまり使わない、代わりにTake()を使用
JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
Integer num2 = num1.Take(1); //num2 : 1 first()も同じ結果
count:要素の数を返す
JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
Integer num2 = num1.count(); //num2 : 3
永続化(キャッシュ)処理
persist()/cache() RDDを永続化する
unpersist(true) 永続化を解除する。
RDD[(K, V)]のPairRDDFunctions
KeyValue型のRDDでは、pairRDDFunctionsを使用します。
主に下記のFunctionsがあります(例は省略)。
変換処理
sortByKey
groupByKey
partitionBy
keys
values
mapValues
flatMapValues
combineByKey
reduceByKey
結合処理
subtractByKey
join
leftOuterJoin
rightOuterJoin
アクション
collectAsMap
reduceByKeyLocally
countByKey
lookup
saveAsSequenceFile
永続化(キャッシュ)処理
persist()/cache() RDDを永続化する
unpersist(true) 永続化を解除する。
その他
入門として、今回はここまでで十分だと思います。(理論基礎は全くないですが)
次回からは基礎編で、先ずはSparkのデータの読み込みを深く理解してから、共有変数についてのケーススタディをとプログラム実装しようと思います。