Introduction to Apache Spark: Learning the Concepts Through the Tutorial

Posted on 2020-04-20

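Introduction

As part of studying Spark, I set up Spark 2.4.4 on CentOS in the previous article.
The official Spark documentation includes a hands-on Quick Start for learning Spark's basic concepts. This article is my notes from working through that Quick Start to learn the fundamentals of Spark.

The rest of this article follows the Quick Start in the official documentation.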

Basic

Start the Spark Shell. The Spark Shell is an interactive Scala shell that ships with Spark.

spark-shell

Example run

[opc@spark01-63591 spark-2.4.4-bin-hadoop2.7]$ ./bin/spark-shell
20/04/19 18:56:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://spark01-63591.pubsubnet01.testvcn.oraclevcn.com:4040
Spark context available as 'sc' (master = local[*], app id = local-1587322599558).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

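Spark works with data through an abstraction called a Dataset. A Dataset can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Here we create a new Dataset from the text of the README file in the directory where Spark is installed.

Command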

val textFile = spark.read.textFile("/home/opc/sparkinstall/spark-2.4.4-bin-hadoop2.7/README.md")

Example run

scala> val textFile = spark.read.textFile("/home/opc/sparkinstall/spark-2.4.4-bin-hadoop2.7/README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala>

Display the number of lines in the README Dataset.

textFile.count()

Example run
The value returned is 105, the number of lines.

scala> textFile.count()
res0: Long = 105

scala>

Display the first line.

textFile.first()

Example run
The string "# Apache Spark" is printed. For the actual README file, see "Appendix: Contents of the README" at the end of this article. The output is the first line of that file.

scala> textFile.first()
res1: String = # Apache Spark

scala>

Retrieve all lines of the README file with collect(), which returns them as an Array rather than a Dataset.

textFile.collect()

Example run

scala> textFile.collect()
res14: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", <http://spark.apache.org/>, "", "", ## Online Documentation, "", You can find the latest Spark documentation, including a programming, guide, on the [project web page](http://spark.apache.org/documentation.html)., This README file only contains basic setup instructions., "", ## Building Spark, "", Spark is built using [Apache Maven]...
scala>

To find out which APIs a Dataset offers, see the documentation below for details.
https://spark.apache.org/docs/2.4.4/api/scala/index.html#org.apache.spark.sql.Dataset

Next, transform this Dataset into a new one. filter keeps only the lines that contain the string "Spark" and returns them as a new Dataset.

val linesWithSpark = textFile.filter(line => line.contains("Spark"))

Example run

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

scala>

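Use show to display the result in a readable form. The two arguments ask for up to 100 rows and truncate values longer than 100 characters.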

linesWithSpark.show(100,100)

Example run
As expected, the Dataset contains only the lines that include the string "Spark".

scala> linesWithSpark.show(100,100)
+----------------------------------------------------------------------------------------------------+
|                                                                                               value|
+----------------------------------------------------------------------------------------------------+
|                                                                                      # Apache Spark|
|                      Spark is a fast and general cluster computing system for Big Data. It provides|
|                          rich set of higher-level tools including Spark SQL for SQL and DataFrames,|
|                                                          and Spark Streaming for stream processing.|
|                                You can find the latest Spark documentation, including a programming|
|                                                                                   ## Building Spark|
|                                      Spark is built using [Apache Maven](http://maven.apache.org/).|
|                                                       To build Spark and its example programs, run:|
|You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel ...|
|                        ["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).|
|For general development tips, including info on developing Spark using an IDE, see ["Useful Devel...|
|                                    The easiest way to start using Spark is through the Scala shell:|
|                          Spark also comes with several sample programs in the `examples` directory.|
|                                                                           ./bin/run-example SparkPi|
|                                                  MASTER=spark://host:7077 ./bin/run-example SparkPi|
|                Testing first requires [building Spark](#building-spark). Once Spark is built, tests|
|                       Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported|
|                       Hadoop, you must build Spark against the same version that your cluster runs.|
|                              in the online documentation for an overview on how to configure Spark.|
|          Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)|
+----------------------------------------------------------------------------------------------------+


scala>

A Dataset transformation and an action can also be chained in a single expression.

textFile.filter(line => line.contains("Spark")).count()

Example run

scala> textFile.filter(line => line.contains("Spark")).count()
res22: Long = 20

scala>
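
As a rough cross-check of what the chained filter and count compute, the same logic can be sketched in plain Java (the language of the application later in this article) with java.util.stream. This is only an analogy that runs on an in-memory list. The class name FilterCount, the helper countLinesContaining, and the sample lines are made up for illustration and are not part of Spark or the Quick Start.

```java
import java.util.Arrays;
import java.util.List;

class FilterCount {
    // Keeps the lines that contain the keyword, then counts them
    // (a transformation followed by an action, in one chain).
    public static long countLinesContaining(List<String> lines, String keyword) {
        return lines.stream()
                .filter(line -> line.contains(keyword)) // plays the role of Dataset.filter
                .count();                               // plays the role of Dataset.count
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not the real README.
        List<String> lines = Arrays.asList(
                "# Apache Spark",
                "",
                "Spark is a fast and general cluster computing system",
                "high-level APIs in Scala, Java, Python, and R");
        System.out.println(countLinesContaining(lines, "Spark"));
    }
}
```

Unlike this sketch, Spark runs the same steps on a distributed Dataset and only starts the work when the action (count) is called.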

More on Dataset Operations

The Dataset API is also useful for more complex analysis. Suppose we want to find the largest number of words in a single line. Run the following command.

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

Example run
The line with the most words in the README has 22 words.

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res29: Int = 22

scala>

Let's break the command apart and check it step by step. First, run the following command.

textFile.map(line => line.split(" ").size)

The result shows that a Dataset of Int values is returned.

scala> textFile.map(line => line.split(" ").size)
res39: org.apache.spark.sql.Dataset[Int] = [value: int]

scala>

Check the actual contents.

textFile.map(line => line.split(" ").size).collect()

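Example run
An Array of numbers is printed. Each number is the word count of one line in README.md. An empty line counts as 1, because splitting an empty string still yields a single empty element.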

scala> textFile.map(line => line.split(" ").size).collect()
res40: Array[Int] = Array(3, 1, 14, 13, 11, 12, 8, 6, 1, 1, 1, 1, 3, 1, 10, 6, 8, 1, 3, 1, 6, 8, 1, 8, 1, 13, 1, 22, 10, 2, 1, 16, 1, 4, 1, 12, 1, 5, 1, 8, 1, 8, 1, 4, 1, 11, 1, 
5, 1, 10, 1, 6, 1, 3, 1, 11, 11, 1, 6, 1, 6, 1, 12, 12, 9, 13, 14, 3, 1, 7, 1, 13, 1, 3, 1, 10, 4, 1, 5, 1, 7, 8, 1, 9, 1, 6, 1, 13, 11, 13, 1, 7, 7, 12, 8, 1, 2, 1, 6, 12, 1, 2, 1, 7, 11)

scala>

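To find the largest of these values, the command uses the Dataset's built-in reduce. reduce combines the elements two at a time with the given function, here keeping the larger of each pair, until a single value (the maximum) remains.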

textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

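The same thing can be done with the Java standard library, which is available from Scala. After importing java.lang.Math, the Math.max function expresses the comparison that the if expression performed, more concisely.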

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res42: Int = 22

scala>
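
To make the map and reduce steps concrete outside the shell, here is a minimal plain-Java sketch of the same idea using java.util.stream. It assumes an in-memory list instead of a Dataset. The class name MaxWordsPerLine, the method maxWords, and the sample lines are hypothetical and chosen only for illustration.

```java
import java.util.Arrays;
import java.util.List;

class MaxWordsPerLine {
    // Maps each line to its token count, then reduces the counts to their maximum.
    public static int maxWords(List<String> lines) {
        return lines.stream()
                .mapToInt(line -> line.split(" ").length) // same split(" ") as in the shell example
                .reduce(0, Math::max);                    // pairwise reduction that keeps the larger value
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not the real README.
        List<String> lines = Arrays.asList(
                "# Apache Spark",
                "",
                "Spark is a fast and general cluster computing system");
        System.out.println(maxWords(lines));
    }
}
```

One difference in this sketch is the identity value 0 passed to reduce, so an empty list yields 0. The Spark command above passes no identity value.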

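Next, count how many times each word appears in the README file. The command below creates the wordCounts Dataset: flatMap splits every line into words, groupByKey(identity) groups identical words together, and count() counts the words in each group.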

val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

Print the contents of the wordCounts Dataset.

wordCounts.collect()

Example run
The count for each word is printed.

scala> wordCounts.collect()
res47: Array[(String, Long)] = Array((online,1), (graphs,1), (["Parallel,1), (["Building,1), (thread,1), (documentation,3), (command,,2), (abbreviated,1), (overview,1), (rich,1), (set,2), (-DskipTests,1), (name,1), (page](http://spark.apache.org/documentation.html).,1), (["Specifying,1), (stream,1), (run:,1), (not,1), (programs,2), (tests,2), (./dev/run-tests,1), (will,1), ([run,1), (particular,2), (option,1), (Alternatively,,1), (by,1), (must,1), (using,5), (you,4), (MLlib,1), (DataFrames,,1), (variable,1), (Note,1), (core,1), 
(more,1), (protocols,1), (guidance,2), (shell:,2), (can,7), (site,,1), (systems.,1), (Maven,1), ([building,1), (configure,1), (for,12), (README,1), (Interactive,2), (how,3), ([Configuration,1), (Hive,2), (system,1), (provides,1), (Hadoop-supported,1), (pre-built,...
scala>
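
The flatMap, group, and count pipeline can also be sketched in plain Java with java.util.stream, which may help when reading the Scala one-liner. This is an analogy on an in-memory list, not Spark code. The class name WordCount, the method countWords, and the sample input are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class WordCount {
    // Splits every line into words, groups identical words, and counts each group.
    public static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))) // one stream of words from all lines
                .collect(Collectors.groupingBy(
                        Function.identity(),                      // the word itself is the key
                        Collectors.counting()));                  // number of occurrences per key
    }

    public static void main(String[] args) {
        // Hypothetical sample lines, not the real README.
        List<String> lines = Arrays.asList("to build Spark", "to run Spark tests");
        System.out.println(countWords(lines));
    }
}
```

As in the shell output above, words are split only on single spaces, so punctuation stays attached to a word (for example "command," and "shell:" are counted as they appear).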

Caching

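Spark can cache a Dataset in memory across the whole cluster. In the Quick Start example the README is a very small file of about 100 lines, but large Datasets can be cached in the same way. This is especially useful when the same data is accessed repeatedly, for example when running an iterative algorithm such as PageRank.

The cache() command below marks the Dataset named linesWithSpark to be cached in cluster memory.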

linesWithSpark.cache()

Example run

scala> linesWithSpark.cache()
res0: linesWithSpark.type = [value: string]

The first time count() runs after this, the linesWithSpark Dataset is actually cached in cluster memory.

linesWithSpark.count()

Example run

scala> linesWithSpark.count()
res1: Long = 20

From the second access onward, the cached data is used for processing.

scala> linesWithSpark.count()
res2: Long = 20

scala> linesWithSpark.count()
res3: Long = 20

scala> linesWithSpark.count()
res4: Long = 20
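
The behavior described above (nothing is stored when cache() is called, the data is stored on the first action, and later actions reuse it) can be illustrated with a toy model in plain Java. This is only a sketch of the idea and not how Spark implements caching. The class CachedLines, its loads counter, and the sample data are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

class CachedLines {
    private final Supplier<List<String>> source; // stands in for an expensive read, e.g. a file scan
    private List<String> cached;                 // stays null until the first action runs
    private int loads = 0;                       // how many times the source was actually read

    public CachedLines(Supplier<List<String>> source) {
        this.source = source;
    }

    // An "action": the first call reads and stores the data, later calls reuse the stored copy.
    public long count() {
        if (cached == null) {
            cached = source.get();
            loads++;
        }
        return cached.size();
    }

    public int loads() {
        return loads;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, not the real README.
        CachedLines lines = new CachedLines(
                () -> Arrays.asList("# Apache Spark", "Spark is fast"));
        System.out.println(lines.loads()); // prints 0: nothing has been read yet
        lines.count();
        lines.count();
        System.out.println(lines.loads()); // prints 1: the second count reused the stored data
    }
}
```

In the application log later in this article, the second count reports "Found block rdd_2_0 locally", which is Spark reusing the block it stored during the first count.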

Running as an Application

So far we have processed data interactively in the Spark Shell. From here on, we check how to run a real Java application on Spark.
Create a working directory.

mkdir ~/javaspark
mkdir -p ~/javaspark/src/main/java

Create the Java source code.

cat <<'EOF' > ~/javaspark/src/main/java/SimpleApp.java
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/home/opc/sparkinstall/spark-2.4.4-bin-hadoop2.7/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("=================== start ====================================");
    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    System.out.println("=================== end   ====================================");

    spark.stop();
  }
}
EOF

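Next, build a JAR file from the Java source code. Create an Apache Maven pom.xml that explicitly declares the Spark dependency.
This was not mentioned in the official documentation, but in my environment the build did not work unless the compiler source and target were set to 1.8.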

cat <<'EOF' > ~/javaspark/pom.xml
<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.4</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
EOF

Build the JAR file with the Maven command.

cd ~/javaspark
mvn package

Example run

[opc@spark01-63591 javaspark]$ mvn package
[INFO] Scanning for projects...
[WARNING] 
[WARNING] Some problems were encountered while building the effective model for edu.berkeley:simple-project:jar:1.0
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-compiler-plugin is missing. @ line 18, column 15
[WARNING] 
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING] 
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING] 
[INFO] 
[INFO] --------------------< edu.berkeley:simple-project >---------------------
[INFO] Building Simple Project 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ simple-project ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/opc/javaspark/src/main/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ simple-project ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ simple-project ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /home/opc/javaspark/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ simple-project ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ simple-project ---
[INFO] No tests to run.
[INFO] 
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ simple-project ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.820 s
[INFO] Finished at: 2020-04-20T12:09:30Z
[INFO] ------------------------------------------------------------------------

Use the JAR file to run the data processing on Spark.

cd ~/javaspark
$SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar

Example run
Near the end of the output, you can see the ============= start =========== marker followed by the result.

[opc@spark01-63591 javaspark]$ $SPARK_HOME/bin/spark-submit \
>   --class "SimpleApp" \
>   --master local[4] \
>   target/simple-project-1.0.jar
20/04/20 12:09:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/04/20 12:09:43 INFO SparkContext: Running Spark version 2.4.4
20/04/20 12:09:43 INFO SparkContext: Submitted application: Simple Application
20/04/20 12:09:43 INFO SecurityManager: Changing view acls to: opc
20/04/20 12:09:43 INFO SecurityManager: Changing modify acls to: opc
20/04/20 12:09:43 INFO SecurityManager: Changing view acls groups to: 
20/04/20 12:09:43 INFO SecurityManager: Changing modify acls groups to: 
20/04/20 12:09:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(opc); groups with view permissions: Set(); users  with modify permissions: Set(opc); groups with modify permissions: Set()
20/04/20 12:09:43 INFO Utils: Successfully started service 'sparkDriver' on port 40705.
20/04/20 12:09:44 INFO SparkEnv: Registering MapOutputTracker
20/04/20 12:09:44 INFO SparkEnv: Registering BlockManagerMaster
20/04/20 12:09:44 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/04/20 12:09:44 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/04/20 12:09:44 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8a791ca5-10ac-4414-8714-7607ba53665a
20/04/20 12:09:44 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/04/20 12:09:44 INFO SparkEnv: Registering OutputCommitCoordinator
20/04/20 12:09:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/04/20 12:09:44 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://spark01-63591.pubsubnet01.testvcn.oraclevcn.com:4040
20/04/20 12:09:44 INFO SparkContext: Added JAR file:/home/opc/javaspark/target/simple-project-1.0.jar at spark://spark01-63591.pubsubnet01.testvcn.oraclevcn.com:40705/jars/simple-project-1.0.jar with timestamp 1587384584380
20/04/20 12:09:44 INFO Executor: Starting executor ID driver on host localhost
20/04/20 12:09:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38156.
20/04/20 12:09:44 INFO NettyBlockTransferService: Server created on spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156
20/04/20 12:09:44 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/04/20 12:09:44 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark01-63591.pubsubnet01.testvcn.oraclevcn.com, 38156, None)
20/04/20 12:09:44 INFO BlockManagerMasterEndpoint: Registering block manager spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156 with 366.3 MB RAM, BlockManagerId(driver, spark01-63591.pubsubnet01.testvcn.oraclevcn.com, 38156, None)
20/04/20 12:09:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark01-63591.pubsubnet01.testvcn.oraclevcn.com, 38156, None)
20/04/20 12:09:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark01-63591.pubsubnet01.testvcn.oraclevcn.com, 38156, None)
20/04/20 12:09:44 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/opc/javaspark/spark-warehouse').
20/04/20 12:09:44 INFO SharedState: Warehouse path is 'file:/home/opc/javaspark/spark-warehouse'.
20/04/20 12:09:45 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
20/04/20 12:09:47 INFO FileSourceStrategy: Pruning directories with: 
20/04/20 12:09:47 INFO FileSourceStrategy: Post-Scan Filters: 
20/04/20 12:09:47 INFO FileSourceStrategy: Output Data Schema: struct<value: string>
20/04/20 12:09:47 INFO FileSourceScanExec: Pushed Filters: 
20/04/20 12:09:49 INFO CodeGenerator: Code generated in 334.181813 ms
20/04/20 12:09:49 INFO CodeGenerator: Code generated in 31.047449 ms
20/04/20 12:09:49 INFO CodeGenerator: Code generated in 9.526939 ms
20/04/20 12:09:49 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 282.8 KB, free 366.0 MB)
20/04/20 12:09:49 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.3 KB, free 366.0 MB)
20/04/20 12:09:49 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156 (size: 23.3 KB, free: 366.3 MB)
20/04/20 12:09:49 INFO SparkContext: Created broadcast 0 from count at SimpleApp.java:11
20/04/20 12:09:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
20/04/20 12:09:49 INFO SparkContext: Starting job: count at SimpleApp.java:11
20/04/20 12:09:49 INFO DAGScheduler: Registering RDD 7 (count at SimpleApp.java:11)
20/04/20 12:09:49 INFO DAGScheduler: Got job 0 (count at SimpleApp.java:11) with 1 output partitions
20/04/20 12:09:49 INFO DAGScheduler: Final stage: ResultStage 1 (count at SimpleApp.java:11)
20/04/20 12:09:49 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
20/04/20 12:09:49 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
20/04/20 12:09:49 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at count at SimpleApp.java:11), which has no missing parents
20/04/20 12:09:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 17.5 KB, free 366.0 MB)
20/04/20 12:09:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.1 KB, free 366.0 MB)
20/04/20 12:09:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156 (size: 8.1 KB, free: 366.3 MB)
20/04/20 12:09:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1161
20/04/20 12:09:49 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at count at SimpleApp.java:11) (first 15 tasks are for partitions Vector(0))
20/04/20 12:09:49 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
20/04/20 12:09:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 8270 bytes)
20/04/20 12:09:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/04/20 12:09:49 INFO Executor: Fetching spark://spark01-63591.pubsubnet01.testvcn.oraclevcn.com:40705/jars/simple-project-1.0.jar with timestamp 1587384584380
20/04/20 12:09:49 INFO TransportClientFactory: Successfully created connection to spark01-63591.pubsubnet01.testvcn.oraclevcn.com/10.0.0.3:40705 after 52 ms (0 ms spent in bootstraps)
20/04/20 12:09:49 INFO Utils: Fetching spark://spark01-63591.pubsubnet01.testvcn.oraclevcn.com:40705/jars/simple-project-1.0.jar to /tmp/spark-cad7154a-3020-4904-ae69-875804c90ba9/userFiles-565e16b2-fd2e-4765-9951-05078631d982/fetchFileTemp1448340344401509934.tmp
20/04/20 12:09:49 INFO Executor: Adding file:/tmp/spark-cad7154a-3020-4904-ae69-875804c90ba9/userFiles-565e16b2-fd2e-4765-9951-05078631d982/simple-project-1.0.jar to class loader
20/04/20 12:09:49 INFO FileScanRDD: Reading File path: file:///home/opc/sparkinstall/spark-2.4.4-bin-hadoop2.7/README.md, range: 0-3952, partition values: [empty row]
20/04/20 12:09:49 INFO CodeGenerator: Code generated in 17.063974 ms
20/04/20 12:09:50 INFO MemoryStore: Block rdd_2_0 stored as values in memory (estimated size 4.5 KB, free 366.0 MB)
20/04/20 12:09:50 INFO BlockManagerInfo: Added rdd_2_0 in memory on spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156 (size: 4.5 KB, free: 366.3 MB)
20/04/20 12:09:50 INFO CodeGenerator: Code generated in 6.558283 ms
20/04/20 12:09:50 INFO CodeGenerator: Code generated in 31.43943 ms
20/04/20 12:09:50 INFO ContextCleaner: Cleaned accumulator 1
20/04/20 12:09:50 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2057 bytes result sent to driver
20/04/20 12:09:50 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 541 ms on localhost (executor driver) (1/1)
20/04/20 12:09:50 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
20/04/20 12:09:50 INFO DAGScheduler: ShuffleMapStage 0 (count at SimpleApp.java:11) finished in 0.673 s
20/04/20 12:09:50 INFO DAGScheduler: looking for newly runnable stages
20/04/20 12:09:50 INFO DAGScheduler: running: Set()
20/04/20 12:09:50 INFO DAGScheduler: waiting: Set(ResultStage 1)
20/04/20 12:09:50 INFO DAGScheduler: failed: Set()
20/04/20 12:09:50 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at count at SimpleApp.java:11), which has no missing parents
20/04/20 12:09:50 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.1 KB, free 366.0 MB)
20/04/20 12:09:50 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.8 KB, free 366.0 MB)
20/04/20 12:09:50 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156 (size: 3.8 KB, free: 366.3 MB)
20/04/20 12:09:50 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1161
20/04/20 12:09:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at count at SimpleApp.java:11) (first 15 tasks are for partitions Vector(0))
20/04/20 12:09:50 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
20/04/20 12:09:50 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 7767 bytes)
20/04/20 12:09:50 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
20/04/20 12:09:50 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks including 1 local blocks and 0 remote blocks
20/04/20 12:09:50 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
20/04/20 12:09:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1782 bytes result sent to driver
20/04/20 12:09:50 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 42 ms on localhost (executor driver) (1/1)
20/04/20 12:09:50 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
20/04/20 12:09:50 INFO DAGScheduler: ResultStage 1 (count at SimpleApp.java:11) finished in 0.056 s
20/04/20 12:09:50 INFO DAGScheduler: Job 0 finished: count at SimpleApp.java:11, took 0.823528 s
20/04/20 12:09:50 INFO SparkContext: Starting job: count at SimpleApp.java:12
20/04/20 12:09:50 INFO DAGScheduler: Registering RDD 15 (count at SimpleApp.java:12)
20/04/20 12:09:50 INFO DAGScheduler: Got job 1 (count at SimpleApp.java:12) with 1 output partitions
20/04/20 12:09:50 INFO DAGScheduler: Final stage: ResultStage 3 (count at SimpleApp.java:12)
20/04/20 12:09:50 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 2)
20/04/20 12:09:50 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 2)
20/04/20 12:09:50 INFO DAGScheduler: Submitting ShuffleMapStage 2 (MapPartitionsRDD[15] at count at SimpleApp.java:12), which has no missing parents
20/04/20 12:09:50 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 17.5 KB, free 365.9 MB)
20/04/20 12:09:50 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 8.1 KB, free 365.9 MB)
20/04/20 12:09:50 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156 (size: 8.1 KB, free: 366.3 MB)
20/04/20 12:09:50 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1161
20/04/20 12:09:50 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 2 (MapPartitionsRDD[15] at count at SimpleApp.java:12) (first 15 tasks are for partitions Vector(0))
20/04/20 12:09:50 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
20/04/20 12:09:50 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 8270 bytes)
20/04/20 12:09:50 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
20/04/20 12:09:50 INFO BlockManager: Found block rdd_2_0 locally
20/04/20 12:09:50 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1971 bytes result sent to driver
20/04/20 12:09:50 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 19 ms on localhost (executor driver) (1/1)
20/04/20 12:09:50 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
20/04/20 12:09:50 INFO DAGScheduler: ShuffleMapStage 2 (count at SimpleApp.java:12) finished in 0.031 s
20/04/20 12:09:50 INFO DAGScheduler: looking for newly runnable stages
20/04/20 12:09:50 INFO DAGScheduler: running: Set()
20/04/20 12:09:50 INFO DAGScheduler: waiting: Set(ResultStage 3)
20/04/20 12:09:50 INFO DAGScheduler: failed: Set()
20/04/20 12:09:50 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[18] at count at SimpleApp.java:12), which has no missing parents
20/04/20 12:09:50 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 7.1 KB, free 365.9 MB)
20/04/20 12:09:50 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 3.8 KB, free 365.9 MB)
20/04/20 12:09:50 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on spark01-63591.pubsubnet01.testvcn.oraclevcn.com:38156 (size: 3.8 KB, free: 366.2 MB)
20/04/20 12:09:50 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1161
20/04/20 12:09:50 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[18] at count at SimpleApp.java:12) (first 15 tasks are for partitions Vector(0))
20/04/20 12:09:50 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
20/04/20 12:09:50 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, localhost, executor driver, partition 0, ANY, 7767 bytes)
20/04/20 12:09:50 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
20/04/20 12:09:50 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks including 1 local blocks and 0 remote blocks
20/04/20 12:09:50 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
20/04/20 12:09:50 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3). 1782 bytes result sent to driver
20/04/20 12:09:50 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 11 ms on localhost (executor driver) (1/1)
20/04/20 12:09:50 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 
20/04/20 12:09:50 INFO DAGScheduler: ResultStage 3 (count at SimpleApp.java:12) finished in 0.019 s
20/04/20 12:09:50 INFO DAGScheduler: Job 1 finished: count at SimpleApp.java:12, took 0.056783 s
=================== start ====================================
Lines with a: 62, lines with b: 31
=================== end   ====================================
20/04/20 12:09:50 INFO SparkUI: Stopped Spark web UI at http://spark01-63591.pubsubnet01.testvcn.oraclevcn.com:4040
20/04/20 12:09:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/04/20 12:09:50 INFO MemoryStore: MemoryStore cleared
20/04/20 12:09:50 INFO BlockManager: BlockManager stopped
20/04/20 12:09:50 INFO BlockManagerMaster: BlockManagerMaster stopped
20/04/20 12:09:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/04/20 12:09:50 INFO SparkContext: Successfully stopped SparkContext
20/04/20 12:09:50 INFO ShutdownHookManager: Shutdown hook called
20/04/20 12:09:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-cad7154a-3020-4904-ae69-875804c90ba9
20/04/20 12:09:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-3d2226da-d703-44a3-8e80-7901a73c37aa

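Summary

I learned that Spark's Dataset abstraction makes it easy to work with large amounts of data. A Dataset comes with convenient APIs, such as filtering for the elements that contain a particular string.
I have only covered the APIs that appear in the tutorial, but I expect there are many other useful ones.

To learn more, the official documentation is a good next step.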
https://spark.apache.org/docs/2.4.4/quick-start.html#where-to-go-from-here

Appendix: Contents of the README

[opc@spark01-63591 spark-2.4.4-bin-hadoop2.7]$ cat README.md 
# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.

<http://spark.apache.org/>


## Online Documentation

You can find the latest Spark documentation, including a programming
guide, on the [project web page](http://spark.apache.org/documentation.html).
This README file only contains basic setup instructions.

## Building Spark

Spark is built using [Apache Maven](http://maven.apache.org/).
To build Spark and its example programs, run:

    build/mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.)

You can build Spark using more than one thread by using the -T option with Maven, see ["Parallel builds in Maven 3"](https://cwiki.apache.org/confluence/display/MAVEN/Parallel+builds+in+Maven+3).
More detailed documentation is available from the project site, at
["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).

For general development tips, including info on developing Spark using an IDE, see ["Useful Developer Tools"](http://spark.apache.org/developer-tools.html).

## Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

    ./bin/spark-shell

Try the following command, which should return 1000:

    scala> sc.parallelize(1 to 1000).count()

## Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

    ./bin/pyspark

And run the following command, which should also return 1000:

    >>> sc.parallelize(range(1000)).count()

## Example Programs

Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./bin/run-example <class> [params]`. For example:

    ./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit
examples to a cluster. This can be a mesos:// or spark:// URL,
"yarn" to run on YARN, and "local" to run
locally with one thread, or "local[N]" to run locally with N threads. You
can also use an abbreviated class name if the class is in the `examples`
package. For instance:

    MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

## Running Tests

Testing first requires [building Spark](#building-spark). Once Spark is built, tests
can be run using:

    ./dev/run-tests

Please see the guidance on how to
[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).

There is also a Kubernetes integration test, see resource-managers/kubernetes/integration-tests/README.md

## A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at
["Specifying the Hadoop Version and Enabling YARN"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version-and-enabling-yarn)
for detailed guidance on building for a particular distribution of Hadoop, including
building for particular Hive and Hive Thriftserver distributions.

## Configuration

Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)
in the online documentation for an overview on how to configure Spark.

## Contributing

Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)
for information on how to get started contributing to the project.

Reference URLs

Spark 2.4.4 documentation
https://spark.apache.org/docs/2.4.4/

Spark Document
https://spark.apache.org/
