LoginSignup
8
8

More than 5 years have passed since last update.

[Apache Spark入門メモ]その2 簡単なプログラムを書こう

Last updated at Posted at 2016-04-10

前回、インストールを済ませたと思うので、HelloWrod的な簡単なプログラムをSpark上で実行したいと思います。

単純なFizzBuzz

まずは多少、今さら感はありますが、みんな大好きFizzBuzzを作ってみます。

Spark版

 def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Example01")
    val sc = new SparkContext(conf)

    sc.parallelize(1 to 100).map(x => {
      if (x % 15 == 0) {
        "FizzBuzz"
      } else if (x % 5 == 0) {
        "Fizz"
      } else if (x % 3 == 0) {
        "Buzz"
      } else {
        x.toString
      }
    }).foreach(println)
    sc.stop()
  }

Java版

    public static void main(String[] args) throws IOException {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("Example01");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        sc.parallelize(IntStream.range(1, 100).boxed().collect(Collectors.toList())).map(x -> {
            if (x % 15 == 0) {
                return "FizzBuzz";
            } else if (x % 5 == 0) {
                return "Fizz";
            } else if (x % 3 == 0) {
                return "Buzz";
            } else {
                return x.toString();
            }
        }).foreach(x -> System.out.println(x));
        sc.stop();
    }

IDEから実行すると、標準出力にFizzBuzzの結果が表示されたはずです。

SparkContextを作成することで、Sparkが起動されます。
今回は開発環境なのでSparkConfのsetMasterには"local"を指定しています。通常はクラスタ名とかデプロイ先を書くみたいです。
setAppNameにはSpark上でのアプリケーション名称を付けます。

sc.parallelizeは引数にとった値をRDDというSparkでの基本的なデータ構造に格納します。RDDに関しては別途解説しますが、
簡単に言うとクラスタ上の各ノードに構築された分散メモリです。API的にはコレクションのように振舞います。

実際のアプリケーションでは、ファイルやデータベースなどの巨大なデータがインプットになるので、
SparkContext#parallelizeを使うことは、まず無いでしょうがサンプルを作ったり振る舞いを検証するのには便利です。

で、初期化の部分こそ独特なものの、mapやforeachなど普通のコレクション操作のAPIですね?
Scalaユーザはもちろん、Java8のStreamAPIに慣れているなら特に違和感なく使えるはずです。この辺は記述方式が特殊なMapReduceとは大きく違うところです。

メソッドや変数、フィールドの利用

どのくらい「普通」にコーディングできるかをもう少し見ていきます。

まずは、メソッド呼び出し。

Scala版

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Example01")
    val sc = new SparkContext(conf)

    sc.parallelize(1 to 100)
        .filter(x => x % 15 == 0)
        .map(x => fizzBuzz(x))
        .foreach(println)
    sc.stop()
  }

  def fizzBuzz(x: Int): String = {
    if (x % 15 == 0) {
      "FizzBuzz"
    } else if (x % 5 == 0) {
      "Fizz"
    } else if (x % 3 == 0) {
      "Buzz"
    } else {
      x.toString
    }
  }

Java版

    public static void main(String[] args) throws IOException {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("Example01");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        sc.parallelize(IntStream.range(1, 100).boxed().collect(Collectors.toList()))
                .filter(x -> x % 15 == 0)
                .map(x -> fizzBuzz(x))
                .foreach(x -> System.out.println(x));
        sc.stop();
    }

    static String fizzBuzz(Integer x) {
        if (x % 15 == 0) {
            return "FizzBuzz";
        } else if (x % 5 == 0) {
            return "Fizz";
        } else if (x % 3 == 0) {
            return "Buzz";
        } else {
            return x.toString();
        }
    }

ブロックの外の変数も利用できます。Javaの場合はLambdaなので当然finalまたは事実上のfinalである必要があります。
また、フィールド変数の場合は、そのフィールドを含むインスタンスごとシリアライズされてSparkクラスタに分散されてしまうので、
いったんローカル変数に代入するなどをすると良いようです。

Scala版

val n = 3;
sc.parallelize(1 to 100)
    .filter(x => x % n == 0)
    .map(x => fizzBuzz(x))
    .foreach(println)
sc.stop()

Java版

int n = 3;
sc.parallelize(IntStream.range(1, 100).boxed().collect(Collectors.toList()))
        .filter(x -> x % n == 0)
        .map(x -> fizzBuzz(x))
        .foreach(x -> System.out.println(x));
sc.stop();

自分で作成したクラスも利用できますが、単一のJVM上ではなく、他のノードで動作する可能性があるので、シリアライズ可能である必要があります。
というかJavaBeanはcase classに比べてやはり冗長ですね。

Scala

  case class Person(val name:String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Example01")
    val sc = new SparkContext(conf)

    sc.parallelize(Array("Nanoha", "Fate", "Vivio"))
      .map(x => Person(x))
      .foreach(println)
    sc.stop()
  }

Java

    static class Person implements Serializable {

        private String name;

        public Person(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return "Person(" + name + ")";
        }

    }

    public static void main(String[] args) throws IOException {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("Example01");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        sc.parallelize(Arrays.asList("Nanoha", "Fate", "Vivio"))
                .map(x -> new Person(x))
                .foreach(x -> System.out.println(x));
        sc.stop();
    }

まとめ

まずは動かす、ということで簡単なサンプルを作成してみました。
単なるmap/filter/reduceな構文なので関数型はもちろんJSやRubyや最近ならJavaやC#な人にも違和感の無い構文ですね。
この「普通に書ける」ということがSparkの大きな特徴の一つだと思っています。

次回は、もう少しAPI等の解説をしていきます。
それではHappy Hacking!

8
8
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8