LoginSignup
0
0

More than 1 year has passed since last update.

【初心者】ApacheSpark基礎編-共有変数

Posted at

ApacheSpark基礎編-SparkSQLはこちらです。
https://qiita.com/SHA_AKA/items/b69fdd6a268d503682aa
今回は共有変数について、コードを書くと思います。

目次

  1. 共有変数とは
  2. アキュムレーター(accumulator)
  3. ブロードキャスト変数(Broadcast)
  4. その他

共有変数とは

通常、(map あるいは reduceのような)Spark操作に渡される関数は別々のエグゼキュータ(executor)上で実行され、関数の変数はドライバー(driver)各エグゼキュータにコピーされました。
各エグゼキュータ上の変数への更新はドライバーには反映されない。
ドライバープログラムにある変数を更新しても、各エグゼキュータ上の関数には影響がない。
この時は、Sparkから提供されている2種類の共有変数:Broadcastとaccumulator、を利用して、この制限を無視できます。
(下記の例を見たほうが理解しやすくなると思います。)

アキュムレーター(accumulator)

・“追加”のみを行う変数。
・ドライバーでアキュムレーターを生成し、各エグゼキュータでアキュムレーターに対して値の追加(加算・蓄積)を行い、driverでその結果(総計)を受け取ることが出来る。

データ:

share_code datetime_JST open high low close volume
7832 2022-07-19 09:00:00 9960.0 10010.0 9950.0 9950.0 0.0
7832 2022-07-19 09:01:00 9879.0 9901.0 9876.0 9901.0 12200.0
7832 2022-07-19 09:02:00 9892.0 9898.0 9888.0 9888.0 900.0
7832 2022-07-19 09:03:00 9883.0 9900.0 9881.0 9900.0 2300.0
7832 2022-07-19 09:04:00 9894.0 9905.0 9889.0 9905.0 1600.0

問題設定:前回の株価のデータで、カラムvolumnがnullの行をカウントする
countnullの結果volumeがnullの行数が334です。

先ずはSparkのLongAccumulatorを直接に利用します。

package com;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class SparkAcc {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .master("local")
                .appName("Java Spark Accumulator")
                .enableHiveSupport()
                .getOrCreate();

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());

        //jsonファイルの読み込み
        Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/code7832.json");
        System.out.println("Json: ");
        dataset_js.show(10);

        //アキュムレーターを定義
        LongAccumulator accum = jsc.sc().longAccumulator();

        //アキュムレーターの関数をcall
        filterandaccumulator(sparkSession,dataset_js,accum);

        //アキュムレーターの値を確認
        System.out.println("accum: " + accum.value());
    }

    private static void filterandaccumulator(SparkSession sparkSession,Dataset<Row> dataset_js, LongAccumulator accum){
        dataset_js.createOrReplaceTempView("shareprice");
        Dataset<Row> shareprice = sparkSession.sql("SELECT * FROM shareprice where volume is null ");

        accum.add(shareprice.count());
    }
}

次はカスタマイズのAccumulatorを作ってから使用します。
MyAccumulator.class

package com;

import org.apache.spark.util.AccumulatorV2;

/*
AccumulatorV2によりアキュムレータをカスタマイズ
 */
public class MyAccumulator extends AccumulatorV2<Long,Long> {

    Long num = Long.valueOf(0);

    public MyAccumulator(Long num){
        this.num = Long.valueOf(num);
    }

    //アキュムレータの値を0判定
    @Override
    public boolean isZero() {
        return num == 0;
    }

    //コピーを作る
    @Override
    public AccumulatorV2<Long, Long> copy() {
        MyAccumulator myAccumulator = new MyAccumulator(this.num);
        return myAccumulator;
    }

    //アキュムレータをリセット
    @Override
    public void reset() {
        this.num = Long.valueOf(0);
    }

    //アキュムレータの値を加算
    @Override
    public void add(Long i) {
        num += i;
    }

    //他のアキュムレータとマージ・合算
    @Override
    public void merge(AccumulatorV2<Long,Long> other) {
        num += (other.value());
    }

    //値を返す
    @Override
    public Long value() {
        return this.num;
    }
}

main関数

package com;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkMyAcc {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .master("local")
                .appName("Java Spark Accumulator")
                .enableHiveSupport()
                .getOrCreate();

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());

        //jsonファイルの読み込み
        Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/code7832.json");
        System.out.println("Json: ");
        dataset_js.show(10);

        //アキュムレーターの関数をcall
        testMyAcc(sparkSession,jsc,dataset_js);


    }
    /*
    アキュムレーターの関数を作る
     */
    private static void testMyAcc(SparkSession sparkSession,JavaSparkContext jsc, Dataset<Row> dataset_js){
        MyAccumulator myAccumulator = new MyAccumulator(Long.valueOf(0));
        //アキュムレーターをregister
        jsc.sc().register(myAccumulator,"myAccumulator");
        //データ処理
        dataset_js.createOrReplaceTempView("shareprice");
        Dataset<Row> shareprice = sparkSession.sql("SELECT * FROM shareprice where volume is null ");
        //アキュムレーターに値を加算
        myAccumulator.add(shareprice.count());

        //アキュムレーターの値を確認
        System.out.println("myAccumulator: " + myAccumulator.value());
    }
}

結果は同じく、下記のようになっています。
LongAccumulator:
acc1.png
MyAccumulator:
acc2.png

ブロードキャスト変数(Broadcast)

・ドライバで定義した定数を各エグゼキュータに転送するための変数
・すべてのエグゼキュータに送信される

データ:日経平均225の各上場企業の1日5分ごとの株価です。

share_code datetime_JST open high low close volume
1332 2022-07-25 12:30:00 597.0 597.0 596.0 596.0 0.0
1332 2022-07-25 12:35:00 596.0 598.0 596.0 597.0 21300.0
1332 2022-07-25 12:40:00 596.0 597.0 596.0 597.0 18400.0
1332 2022-07-25 12:45:00 597.0 600.0 597.0 599.0 128900.0
1332 2022-07-25 12:50:00 598.0 599.0 598.0 599.0 8600.0

問題設定:特定企業の株価だけを確認したい
ドライバで定義した企業コードリストにより、各エグゼキュータにフィルターしたデータだけ抽出することです。
コード

package com;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

public class SparkBroadcast {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .master("local")
                .appName("Java Spark Accumulator")
                .enableHiveSupport()
                .getOrCreate();

        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());

        //jsonファイルの読み込み
        Dataset<Row> dataset_js = sparkSession.read().json("/opt/data/nikkei_stock_price_1days.json");
        System.out.println("Json: ");
        dataset_js.show(10);

        /*
        broadcast変数リストを作る
        バンダイナムコ:7832、任天堂:7974、ソニー:6758
        三つのブランドのデータを抽出
         */
        final List<Integer> broadcastList = Arrays.asList(7832,7974,6758);
        final Broadcast<List<Integer>> broadcast = jsc.broadcast(broadcastList);;
        /*
        Datasetのfilter関数適用
        broadcastの値をstream().toArray(Integer[]::new)ストリーミングメゾットでvarargsに変換
        →isin方法はScalaのSeqまたはvarargsしか適用しない
         */
        List<Integer> share_code = broadcast.value();
        Dataset<Row> filterDataset = dataset_js.filter(dataset_js.col("share_code").isin(share_code.stream().toArray(Integer[]::new)));
        System.out.println("filterDataset: ");
        filterDataset.show();
    }
}

実行結果
dataframe1.png

その他

Local実行なので、実際開発中ではワークノードに分散して処理を行うため、多分色々な調整が必要かなと思っていますが、
勉強としては一旦アキュムレータとブロードキャストを体験しました。
次回は基礎編の知識まとめ、もしくは応用編Sparkstreamingだと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0