LoginSignup
1
7

More than 5 years have passed since last update.

Apache Spark を試す

Posted at

Apache Spark とは

  • 分散処理を行うフレームワーク
  • 複数台でクラスタ構成を取り各ワーカーで処理を手分けして行う
  • Hadoop の MapReduce 層と同じような役割を持ち、ファイルシステムベースでデータを扱う Hadoop に対してメモリベースでデータを扱うため高速

Kubernetes 環境で Spark を立ち上げる

Docker イメージの作成

  • サンプルのイメージはバージョンが古いので作り直してみる
  • ついでに Dockerfile 内でアメリカのサーバから直接ソースをダウンロードしているので事前にダウンロードしてコピーするように変更
FROM java:openjdk-8-jdk

ENV hadoop_ver 2.7.4
ENV spark_ver 2.2.0

# download from http://ftp.kddilabs.jp/infosystems/apache/hadoop/common/hadoop-${hadoop_ver}/hadoop-${hadoop_ver}-src.tar.gz
COPY hadoop.tgz /tmp/
# download from https://d3kbcqa49mib13.cloudfront.net/spark-${spark_ver}-bin-hadoop2.7.tgz
COPY spark.tgz /tmp/

RUN mkdir -p /opt && \
    cd /tmp && \
    tar -zxf hadoop.tgz && \
    mkdir -p /opt/hadoop/lib/ && \
    mv hadoop-${hadoop_ver}-src /opt/hadoop/lib/native && \
    echo Hadoop ${hadoop_ver} native libraries installed in /opt/hadoop/lib/native

RUN mkdir -p /opt && \
    cd /tmp && \
    tar -zxf spark.tgz && \
    mv spark-2.2.0-bin-hadoop2.7 /opt/spark && \
    echo Spark ${spark_ver} installed in /opt

# Add the GCS connector.
RUN cd /opt/spark/jars && \
    curl -O https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar

# if numpy is installed on a driver it needs to be installed on all
# workers, so install it everywhere
RUN apt-get update && \
    apt-get install -y python-numpy netcat && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

ADD log4j.properties /opt/spark/conf/log4j.properties
ADD start-common.sh start-worker start-master /
ADD core-site.xml /opt/spark/conf/core-site.xml
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ENV PATH $PATH:/opt/spark/bin

マスターコンテナを立ち上げる

  • このサンプルではクラスタリングに Standalone mode という Spark 組み込みのクラスタマネージャを使い、1 台のマスターと複数台のワーカーという構成を取る
  • マスターのコンテナを立ち上げる Kubernetes の定義ファイルが ReplicationController で書かれていたので後継の Deployment に変更して立ち上げる

定義ファイル

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: spark-master-controller
spec:
  replicas: 1
  template:
    metadata:
      labels:
        component: spark-master
    spec:
      containers:
        - name: spark-master
          image: bl/spark:v1.0
          command: ["/start-master"]
          ports:
            - containerPort: 7077
            - containerPort: 8080
          resources:
            requests:
              cpu: 100m

コマンド

# マスターコンテナの立ち上げ
kubectl apply -f spark-master-controller.yml

# マスターサービスの立ち上げ. こちらはサンプルそのまま使用
kubectl apply -f spark-master-service.yml

ワーカーコンテナを立ち上げる

  • ワーカーコンテナも同様に Deployment で定義し直して立ち上げる
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: spark-worker-controller
spec:
  replicas: 3
  template:
    metadata:
      labels:
        component: spark-worker
    spec:
      containers:
        - name: spark-worker
          image: bl/spark:v1.0
          command: ["/start-worker"]
          ports:
            - containerPort: 8081
          resources:
            requests:
              cpu: 100m
kubectl apply -f spark-worker-controller.yml

Spark を動かしてみる

  • 今回作ったイメージだと /opt/spark/bin にあるスクリプトを使って Spark に処理を実行させることができる
  • 任意のコンテナにログインしてスクリプトを実行してみる
# 実行中の Pod を確認
kubectl get pods

# ログイン
kubectl exec -it spark-master-controller-xxxxxxxxxx /bin/sh

# Scala と Java のサンプルスクリプト実行には run-example を使う
/opt/spark/bin/run-example SparkPi

# Python のサンプルスクリプト実行には spark-submit を使う
/opt/spark/bin/spark-submit examples/src/main/python/pi.py

# R のサンプルスクリプト実行には spark-submit を使う
/opt/spark/bin/spark-submit examples/src/main/r/dataframe.R

Spark で実行できるスクリプトを作る

  • サンプルは /opt/spark/examples にあるのでこれを自分でコンパイルして実行してみる
  • 今回は Java で Spark Streaming を使って Kafka からデータを読み込んで Word Count するスクリプトを作ってみる

コード

KafkaWordCount.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.spark;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.Durations;

import org.apache.log4j.*;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
 *      topic1,topic2
 */

public final class KafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
                    "  <brokers> is a list of one or more Kafka brokers\n" +
                    "  <topics> is a list of one or more kafka topics to consume from\n\n");
            System.exit(1);
        }

        //StreamingExamples.setStreamingLogLevels();
        Logger.getRootLogger().setLevel(Level.WARN);

        String brokers = args[0];
        String topics = args[1];

        // Create context with a 2 seconds batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "my-consumer-group");
        kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        // Create direct kafka stream with brokers and topics
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        // Get the lines, split them into words, count the words and print
        JavaDStream<String> lines = messages.map(ConsumerRecord::value);
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}

pom.xml

  • maven-assembly-plugin を使って依存しているパッケージも含んだ JAR ファイルを作成する
  • spark_core_2.11 などのパッケージは Spark 側で用意するので含める必要はない
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.spark</groupId>
    <artifactId>spark-app</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.example.spark.KafkaWordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

実行

  • 作成した JAR ファイルをコンテナにコピーしてキックスクリプトで実行する
# ファイルのコピー
kubectl cp spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar spark-master-controller-xxxxxxxxxx:/opt/spark/jars/

# ログイン
kubectl exec -it spark-master-controller-xxxxxxxxxx /bin/sh

# 実行
/opt/spark/bin/spark-submit \
  --class com.example.spark.KafkaWordCount \
  --master spark://spark-master:7077 \
  /opt/spark/jars/spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar \
  kafka-0:9092 topic-0
1
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
7