#はじめに
この記事は、Apache Spark
のpartition
の概念について例題を添えてまとめてみようと試みた記事です。
Apache Spark
の概要についてはApache Spark で分散処理入門をどうぞ。
#例題で見るパーティションのイメージ
以下は、Apache Spark
の分散処理のイメージを例題とともに図解しているものです。
まず、使う関数の説明、次にその処理のイメージ、最後にソースコードの順番で紹介しています。
パーティションの使い方が異なる三例を紹介します。
- filter関数の例
- flatmap関数の例
- reduceByKey関数の例
##filter関数の例
要素単位の変換としてはmap()
やfilter
などがあります。
関数 | 説明 |
---|---|
map() | 引数に関数を一つ取り、その関数をRDD内の各要素に適応し、その結果を新しい値とするRDDを返す |
filter() | 引数に関数を一つ取り、そのフィルタ関数が真になる要素だけを含むRDDを返す |
ソースコードはこちら
package other;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* Created by hirokinaganuma on 2016/10/11.
*/
public class Sample02 {
public static void main(String[] args) throws Exception {
String master;
if (args.length > 0) {
master = args[0];
} else {
master = "local";
}
JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> input = sc.textFile("bin/input/*.txt");
JavaRDD<String> result = input.filter(s-> !s.contains("Triangle"));
result.saveAsTextFile("bin/output/output02");
sc.stop();
}
}
Rectangle
Circle
Triangle
Circle
Rectangle
Triangle
Triangle
Rectangle
RDD
は大量のデータを要素として保持する分散コレクションです。RDD
は複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的には__partition
__という塊に分割されています。Spark
ではこのpartition
が分散処理の単位となっています。RDD
をpartition
ごとに複数のマシンで処理することによって、単一のマシンでは処理できないデータを扱うことができます。
このように分割されたファイルを一つのRDD
として扱い、input.filter(s-> !s.contains("Triangle"))
と記述するだけで、分散を意識せずプログラミングできるようになってます。
##flatmap関数の例
filter
やmap
関数が1:1の関係だったのに対し、この関数は、一つの要素から複数の要素を生成する1:Nの関係の変換です。
関数 | 説明 |
---|---|
flatMap() | 引数に関数を一つ取り、その関数をRDD内の各要素に適応して呼ばれるが、この関数はその結果を返すIteratorを返します。それらのIterator全てから返された要素を値とするRDDを最優的に返します |
ソースコードはこちら
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
/**
* Created by hirokinaganuma on 2016/10/11.
*/
public class Sample03 {
public static void main(String[] args) throws Exception {
String master;
if (args.length > 0) {
master = args[0];
} else {
master = "local";
}
JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> input = sc.textFile("bin/input/flatmap/*.txt");
JavaRDD<String> result = input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
}
);
result.saveAsTextFile("bin/output/output03");
sc.stop();
}
}
I am from Fukuoka
Hello Apache Spark
Tokyo Institute of Technology
##reduceByKey関数の例
reduceByKey
は返還前のRDD
に含まれる要素を、同じRDD
に含まれるほかのpartition
の要素とまとめて処理する必要のあるものです。この変換はkey・valueペアを要素とするRDD
を対象にしており、同じkeyを持つ要素をまとめて処理します。Spark
はpartition
ごとに独立して分散処理を行うため同じkeyを持つ要素はすべて同じpartition
に含まれている必要があります。そのためreduce
処理の前に配置換え(shuffle
)が行われます。shuffle
は、返還前のRDD
の要素を、keyに基づいて変換後のRDD
のpartition
に振り分けるように行われます。そのため、shuffle
によって同じkeyを持つ要素が同じpartition
に含まれることが保証されます。
下の図では表しきれませんでしたが、shuffle
の前にそのpartitionないで
reduce`処理が行われ、通信コストを減らしているようです。
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
関数 | 説明 |
---|---|
reduceByKey() | 引数に関数を一つ取り、その関数は同じキーの値を結合する際のreduce処理である |
ソースコードはこちら
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
* Created by hirokinaganuma on 2016/10/11.
*/
public class Sample04 {
public static void main(String[] args) throws Exception {
String master;
if (args.length > 0) {
master = args[0];
} else {
master = "local";
}
JavaSparkContext sc = new JavaSparkContext(master, "basicavg", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> input = sc.textFile("bin/input/reducebykey/*.txt");
JavaRDD<String> words = input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
}
);
JavaPairRDD<String, Integer> pairrdd = words.mapToPair(
new PairFunction<String, String, Integer>(){
public Tuple2<String, Integer> call(String x){
return new Tuple2(x, 1);
}
}
);
JavaPairRDD<String, Integer> output = pairrdd.reduceByKey(new Function2<Integer, Integer, Integer>(){
public Integer call(Integer x, Integer y){
return x+y;
}
});
output.saveAsTextFile("bin/output/output04");
}
}
fish
cat
cat
fish
fish
cat
cat
dog
#Apache Spark
のpartition
とは何なのか
RDD
は大量のデータを要素として保持する分散コレクションです。RDD
は複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的には__partition
__という塊に分割されています。Spark
ではこのpartition
が分散処理の単位となっています。RDD
をpartition
ごとに複数のマシンで処理することによって、単一のマシンでは処理できないデータを扱うことができます。
##モチベーション
分散プログラムでは通信は非常にコストが高く、ネットワークのトラフィックを最低限に抑えることはパフォーマンスの大幅な改善につながります。
Spark
のプログラムはRDD
のパーティショニングを制御して通信を削減していく必要があります。
例えばSpark
はkeyの集合がまとまってあるノードに現れることをプログラムで保証することができます(reduceByKey()で紹介しました)。特にパーティショニングが役立つのはデータセットが結合のようなキー操作を複数回再利用される場合に限ります。というのもSpark
の操作の多くはネットワーク越しにキーに基づくシャッフルを起こします。それらはすべてパーティショニングの恩恵を受けることができます。
##データのパーティショニング
Java
やScala
ではpartitioner
プロパティを用いてRDD
のパーティショニングの方法を指定できます。ここでspark.Partitioner
オブジェクトの値として、RDDに対してそれぞれのキーの行き先のパーティションを知らせるものをセットします。
reduceByKey()のように単一の
RDDに対して働く操作をパーティション化された
RDD上で動作させると、各keyのすべての値は、単一マシン上でローカルに計算され、ローカルに
reduce()された最終の値だけが、各ワーカーノードからマスターに戻されます(先ほど紹介しました)。 より細かく説明すると、シャッフル時に同じkeyを持つ要素を同じ
partitionに振り分けるのは
partitionerの仕事で、
partitionerは変換後の
RDDの
partition数と振り分け対象の要素のkeyの内容をもとに、要素を振り分ける先の
partitionを決定します。
Sparkではデフォルトで
keyのハッシュ値を変換後の
RDDの
partition数で割ったあまりをもとに、振り分け先の
partition`を決定します。
また、二つのRDD
に対して操作する場合は、事前にパーティショニングが行われていると、最低でも二つのRDD
のうちの片方はシャッフルされません。
さらに、Spark
はそれぞれの操作がパーティショニングに及ぼす内部的な影響を知っているので、データパーテショニングを行う操作によって生成されるRDD
に、自動的にpartitioner
を設定します。
##永続化
補足ですがRDD
に対してパーティショニングを行い、それによって生成されたRDD
を変換の対象とするのであればpersist()
で永続化すべきです。理由としては、それ以降のRDD
のアクションはパーティショニングで生成したRDD
の系統全体を評価し直すことになるので、パーティショニング前のRDD
に何度もハッシュパーティショニングが行われてしまい意味がなくなってしまいます。
#参考