0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

【初心者】ApacheSpark入門編

Posted at

背景

初心者としてのSpark入門です。
入門編→基本編→応用編という順番でいきたいと思います。
分散処理の基礎知識色々が必要ですが、それを説明するのは苦手なので、下記の資料ご確認いただければ幸いです。
(RDDって何で上手く説明できない。。。)
・初めてのSpark
・Sparkによる実践データ解析 ―大規模データのための機械学習事例集
・入門 PySpark ―PythonとJupyterで活用するSpark 2エコシステム

目次

  1. 事前準備
  2. Sparkのインストール
  3. 最初のshell実行
  4. IDEA・Maven配置
  5. wordcountの実現
  6. Spark API
  7. RDD[T]のFunctions
  8. RDD[(K, V)]のPairRDDFunctions
  9. その他

事前準備

環境:
Windows+VMware+OS:Ubuntu22.04
Java -version

openjdk version "1.8.0_312"

Sparkのインストール

インストール前に先ずはSpark用のユーザ作成(必須ではない)

sudo adduser spark sudo

今回のSparkのバージョンが3.2.1
サイト:https://spark.apache.org/downloads.html

sudo wget https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz

前回Hadoopと同じ/optに解凍

sudo tar -xzvf spark-3.2.1-bin-hadoop3.2.tgz -C /opt

HOMEとPATHの設定

vim ~/.bashrc

下記追加
export SPARK_HOME=/opt/spark-3.2.1-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

source ~/.bashrc

そしてユーザsparkに権限をつけ

sudo chown -R spark /opt/spark-3.2.1-bin-hadoop3.2/

インストールがここまで完了しました。

最初のshell実行

Sparkではscalaとpythonのshellが付けられています。
scala-shellを確認しましょう。

cd /opt/spark-3.2.1-bin-hadoop3.2/bin
./pyspark

少々待ち後、下記のような画面が出てきます。
scala.png
scalaのバージョンが2.12.15
下記のコードを入力して、実行しましょう。

val lines = sc.textFile("/opt/spark-3.2.1-bin-hadoop3.2/README.md") 
lines.count()
lines.first()

結果が下記の通りになります。
scala1、.png
次はpythonのshellを体験しましょう。
ctrl+Dまたは

:quit

によりshellから抜け出し、下記のコマンドによりpython-shellに入ります。

./pyspark

図のように
python.png
pythonのバージョンが3.10.4
簡単にコードを叩く
python2.png
ここまでsparkのshellが体験出来ました。

IDEA・Maven配置

実際の開発ではshellではなく、IDE上でコードを書き、Gitにより管理することが多いと思います。
これから自分で書いたコードをVMに持ち込んで、Spark上実行しよう。
今回はJavaなので、先ずはIDEAとMavenを配置します。
IDEA.Ultimate
https://www.jetbrains.com/ja-jp/idea/download/#section=windows
(有料版を使っていますが、代わりにeclipseを使ってもいいですかな。)
Maven
https://maven.apache.org/download.cgi
上記のダウンロードが割愛させていただきます。
IDEA上のMaven
先ずは新たなプロジェクトを作成し、Build systemのところをMavenを選択してからcreate
p1.png
そして下記のようにFile->SettingsでMaven home pathを設定します。
p2.png
次はpom.xmlを編集

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SparkTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <spark.version>3.2.1</spark.version>
    </properties>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
    <!-- https://blog.csdn.net/weixin_42961644/article/details/115090477 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.12.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.12.15</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

javaとscalaのバージョンがSparkと一致してください。
ここまで配置完了しました。

wordcountの実現

下記のところにパッケージcomを作成
mvn2.png
下記の内容でJavaClassを作成

package com;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.*;

public class wordCount {

    public static void main(String[] args) {
        /*
        Sparkを実行する前に必ず必要なconfファイルです
        setAppName("spark-wordcount")は識別用の名
        setMaster("local")今回はローカルで実行
         */
        SparkConf sparkConf = new SparkConf().setAppName("spark-wordcount").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

        //データを読み込む、フォルダ/opt/dataの中にテキストファイルを置いてあります。型はRDD
        JavaRDD<String> lines = sparkContext.textFile("/opt/data");

        //flatMap関数:要素を別の型(要素数は複数可)に変換する。ここではLineごとのwordsに変更
        JavaRDD<String> words = lines.flatMap(s-> Arrays.asList(s.split(" ")).iterator());

        //String型のRDDをKeyValue型のPairRDDに変更して、現時点のデータは[word,1]という型になっています。
        JavaPairRDD<String,Integer> wordsOnes = words.mapToPair(s-> new Tuple2(s,1));

        //同じKeyで出現回数をcount
        JavaPairRDD<String, Integer> wordsCounts = wordsOnes.reduceByKey((v1, v2) -> v1 + v2);

        //集計結果を出力
        wordsCounts.saveAsTextFile("/opt/data1");
    }
}

上記のコードの説明はコメントまでご参照ください。
次は、IDEAのTerminalのところに、下記のコマンドを実行します。

mvn clean package

こちらは作成したコードと依頼関係などをpackageまで作成
mvn1.png
targetというフォルダが作成されて、その中の.jarファイルをコピーし、VMにアップロードします。
(今回はSparkと同じフォルダ/opt/の下に置きました。)
ようやくSparkを実行する時です。
/opt/まで行って、下記のコマンドを実行します。

/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submit --master local[3] --executor-memory 512m --class com.wordCount  SparkTest-1.0-SNAPSHOT.jar

/opt/spark-3.2.1-bin-hadoop3.2/bin/spark-submitはアプリケーションを起動
--master local[3]はローカル上core数3で実行
--executor-memory 512mはメモリーを512m指定
--class com.wordCountは先ほど作成したJavaCLASSを指定
SparkTest-1.0-SNAPSHOT.jarはjarを指定
実行完了後、/opt/data1の中に下記のファイルを置いてあります。その中part-00000が結果ファイルになります。

part-00000  _SUCCESS

Spark API

Scala、JavaおよびpythonのAPIがあります。
主に下記のサイドに参考
http://mogile.web.fc2.com/spark/rdd-programming-guide.html#rdd-operations
https://www.ne.jp/asahi/hishidama/home/tech/scala/spark/RDD.html
RDD処理は、変換処理、結合処理、アクションまたは永続化(キャッシュ)処理があります。
これからよく使われている処理を実行しようと思います。

RDD[T]のFunctions

RDDとしてファイルにを読み込む

JavaRDD<String> lines = sparkContext.textFile("/opt/data/test");

よくあるJsonファイルを読み込む

String jsonPath = "/opt/data/test.json";
JavaRDD<Row> items = spark.read().json("jsonPath").toJavaRDD();

RDDを作成

JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3, 3, 4, 5, 6));

RDDをプリント

for(String line:lines.collect()){System.out.println("* "+line);}

or

lines.foreach(line -> {System.out.println("* "+line); });
変換処理

map関数:要素ごとに対して処理を行う

JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
JavaRDD<Integer> num2 = num1.map(x -> x * x) //num2 : (1,4,9)

flatmap関数:要素ごとに対して処理を行う、「結果を平坦化する」

JavaRDD<String> lines = sparkContext.textFile("/opt/data/test");//lines:"we are introduced to the narrator, a pilot, and his ideas about grown-ups","..."
JavaRDD<String> words = lines.flatMap(s-> Arrays.asList(s.split(" ")).iterator());//words: "we","are",...

filter関数:

JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaRDD<Integer> num2 = num1.filter(x -> ( x % 2 == 1)); //num2 : (1,3,5)

distinct関数:重複排除

JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3, 3));
JavaRDD<Integer> num2 = num1.distinct(); //num2 : (1,2,3)
結合処理

SQLと同じ、join関数(leftOuterJoin, rightOuterJoin および fullOuterJoin)とunion関数があります。

アクション

reduce関数:要素同士の演算を行う。

JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
JavaRDD<Integer> num2 = num1.reduce((a, b)-> a + b) //num2 : (6)

collect:RDD中の全要素を返す ※データサイズにより、あまり使わない、代わりにTake()を使用

JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
Integer num2 = num1.Take(1); //num2 : 1 first()も同じ結果

count:要素の数を返す

JavaRDD<Integer> num1 = sparkContext.parallelize(Arrays.asList(1, 2, 3));
Integer num2 = num1.count(); //num2 : 3
永続化(キャッシュ)処理

persist()/cache() RDDを永続化する
unpersist(true) 永続化を解除する。

RDD[(K, V)]のPairRDDFunctions

KeyValue型のRDDでは、pairRDDFunctionsを使用します。
主に下記のFunctionsがあります(例は省略)。

変換処理

sortByKey
groupByKey
partitionBy
keys
values
mapValues
flatMapValues
combineByKey
reduceByKey

結合処理

subtractByKey
join
leftOuterJoin
rightOuterJoin

アクション

collectAsMap
reduceByKeyLocally
countByKey
lookup
saveAsSequenceFile

永続化(キャッシュ)処理

persist()/cache() RDDを永続化する
unpersist(true) 永続化を解除する。

その他

入門として、今回はここまでで十分だと思います。(理論基礎は全くないですが)
次回からは基礎編で、先ずはSparkのデータの読み込みを深く理解してから、共有変数についてのケーススタディをとプログラム実装しようと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?