ApacheSpark基礎編-SparkSQLはこちらです。
https://qiita.com/SHA_AKA/items/b69fdd6a268d503682aa
今回は共有変数について、コードを書くと思います。
目次
- 共有変数とは
- アキュムレーター(accumulator)
- ブロードキャスト変数(Broadcast)
- その他
共有変数とは
通常、(map あるいは reduceのような)Spark操作に渡される関数は別々のエグゼキュータ(executor)上で実行され、関数の変数はドライバー(driver)各エグゼキュータにコピーされました。
各エグゼキュータ上の変数への更新はドライバーには反映されない。
ドライバープログラムにある変数を更新しても、各エグゼキュータ上の関数には影響がない。
この時は、Sparkから提供されている2種類の共有変数:Broadcastとaccumulator、を利用して、この制限を無視できます。
(下記の例を見たほうが理解しやすくなると思います。)
アキュムレーター(accumulator)
・“追加”のみを行う変数。
・ドライバーでアキュムレーターを生成し、各エグゼキュータでアキュムレーターに対して値の追加(加算・蓄積)を行い、driverでその結果(総計)を受け取ることが出来る。
データ:
share_code | datetime_JST | open | high | low | close | volume |
---|---|---|---|---|---|---|
7832 | 2022-07-19 09:00:00 | 9960.0 | 10010.0 | 9950.0 | 9950.0 | 0.0 |
7832 | 2022-07-19 09:01:00 | 9879.0 | 9901.0 | 9876.0 | 9901.0 | 12200.0 |
7832 | 2022-07-19 09:02:00 | 9892.0 | 9898.0 | 9888.0 | 9888.0 | 900.0 |
7832 | 2022-07-19 09:03:00 | 9883.0 | 9900.0 | 9881.0 | 9900.0 | 2300.0 |
7832 | 2022-07-19 09:04:00 | 9894.0 | 9905.0 | 9889.0 | 9905.0 | 1600.0 |
… |
問題設定:前回の株価のデータで、カラムvolumnがnullの行をカウントする
countnullの結果volumeがnullの行数が334です。
先ずはSparkのLongAccumulatorを直接に利用します。
package com;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
public class SparkAcc {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.master("local")
.appName("Java Spark Accumulator")
.enableHiveSupport()
.getOrCreate();
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
//jsonファイルの読み込み
Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/code7832.json");
System.out.println("Json: ");
dataset_js.show(10);
//アキュムレーターを定義
LongAccumulator accum = jsc.sc().longAccumulator();
//アキュムレーターの関数をcall
filterandaccumulator(sparkSession,dataset_js,accum);
//アキュムレーターの値を確認
System.out.println("accum: " + accum.value());
}
private static void filterandaccumulator(SparkSession sparkSession,Dataset<Row> dataset_js, LongAccumulator accum){
dataset_js.createOrReplaceTempView("shareprice");
Dataset<Row> shareprice = sparkSession.sql("SELECT * FROM shareprice where volume is null ");
accum.add(shareprice.count());
}
}
次はカスタマイズのAccumulatorを作ってから使用します。
MyAccumulator.class
package com;
import org.apache.spark.util.AccumulatorV2;
/*
AccumulatorV2によりアキュムレータをカスタマイズ
*/
public class MyAccumulator extends AccumulatorV2<Long,Long> {
Long num = Long.valueOf(0);
public MyAccumulator(Long num){
this.num = Long.valueOf(num);
}
//アキュムレータの値を0判定
@Override
public boolean isZero() {
return num == 0;
}
//コピーを作る
@Override
public AccumulatorV2<Long, Long> copy() {
MyAccumulator myAccumulator = new MyAccumulator(this.num);
return myAccumulator;
}
//アキュムレータをリセット
@Override
public void reset() {
this.num = Long.valueOf(0);
}
//アキュムレータの値を加算
@Override
public void add(Long i) {
num += i;
}
//他のアキュムレータとマージ・合算
@Override
public void merge(AccumulatorV2<Long,Long> other) {
num += (other.value());
}
//値を返す
@Override
public Long value() {
return this.num;
}
}
main関数
package com;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkMyAcc {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.master("local")
.appName("Java Spark Accumulator")
.enableHiveSupport()
.getOrCreate();
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
//jsonファイルの読み込み
Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/code7832.json");
System.out.println("Json: ");
dataset_js.show(10);
//アキュムレーターの関数をcall
testMyAcc(sparkSession,jsc,dataset_js);
}
/*
アキュムレーターの関数を作る
*/
private static void testMyAcc(SparkSession sparkSession,JavaSparkContext jsc, Dataset<Row> dataset_js){
MyAccumulator myAccumulator = new MyAccumulator(Long.valueOf(0));
//アキュムレーターをregister
jsc.sc().register(myAccumulator,"myAccumulator");
//データ処理
dataset_js.createOrReplaceTempView("shareprice");
Dataset<Row> shareprice = sparkSession.sql("SELECT * FROM shareprice where volume is null ");
//アキュムレーターに値を加算
myAccumulator.add(shareprice.count());
//アキュムレーターの値を確認
System.out.println("myAccumulator: " + myAccumulator.value());
}
}
結果は同じく、下記のようになっています。
LongAccumulator:
MyAccumulator:
ブロードキャスト変数(Broadcast)
・ドライバで定義した定数を各エグゼキュータに転送するための変数
・すべてのエグゼキュータに送信される
データ:日経平均225の各上場企業の1日5分ごとの株価です。
share_code | datetime_JST | open | high | low | close | volume |
---|---|---|---|---|---|---|
1332 | 2022-07-25 12:30:00 | 597.0 | 597.0 | 596.0 | 596.0 | 0.0 |
1332 | 2022-07-25 12:35:00 | 596.0 | 598.0 | 596.0 | 597.0 | 21300.0 |
1332 | 2022-07-25 12:40:00 | 596.0 | 597.0 | 596.0 | 597.0 | 18400.0 |
1332 | 2022-07-25 12:45:00 | 597.0 | 600.0 | 597.0 | 599.0 | 128900.0 |
1332 | 2022-07-25 12:50:00 | 598.0 | 599.0 | 598.0 | 599.0 | 8600.0 |
… |
問題設定:特定企業の株価だけを確認したい
ドライバで定義した企業コードリストにより、各エグゼキュータにフィルターしたデータだけ抽出することです。
コード
package com;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
public class SparkBroadcast {
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.master("local")
.appName("Java Spark Accumulator")
.enableHiveSupport()
.getOrCreate();
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
//jsonファイルの読み込み
Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/nikkei_stock_price_1days.json");
System.out.println("Json: ");
dataset_js.show(10);
/*
broadcast変数リストを作る
バンダイナムコ:7832、任天堂:7974、ソニー:6758
三つのブランドのデータを抽出
*/
final List<Integer> broadcastList = Arrays.asList(7832,7974,6758);
final Broadcast<List<Integer>> broadcast = jsc.broadcast(broadcastList);;
/*
Datasetのfilter関数適用
broadcastの値をstream().toArray(Integer[]::new)ストリーミングメゾットでvarargsに変換
→isin方法はScalaのSeqまたはvarargsしか適用しない
*/
List<Integer> share_code = broadcast.value();
Dataset<Row> filterDataset = dataset_js.filter(dataset_js.col("share_code").isin(share_code.stream().toArray(Integer[]::new)));
System.out.println("filterDataset: ");
filterDataset.show();
}
}
その他
Local実行なので、実際開発中ではワークノードに分散して処理を行うため、多分色々な調整が必要かなと思っていますが、
勉強としては一旦アキュムレータとブロードキャストを体験しました。
次回は基礎編の知識まとめ、もしくは応用編Sparkstreamingだと思います。