LoginSignup
6
5

More than 5 years have passed since last update.

【資格取得】Databricks Certified Developer - Apache Spark 2.x

Last updated at Posted at 2018-11-29

Databricksから出ているApache Spark資格について自分が行った試験対策を紹介したいと思います。
DatabricksCertifiedDeveloperLogo.png

Databricks Certified Developer - Apache Spark 2.xとは

Databricks社は最初Sparkのプロジェクトを始めたUC Berkeleyの研究陣が創立した会社で、Sparkプロジェクト関連の様々な支援活動を行っています。Sparkの資格はClouderaやHortonworksから出ているものもありますが、やはり元祖のところのものが良いのではーと思い、こちらにチャレンジしてみました。

  • 試験言語:英語
  • $300
  • 40問 / 180分
  • 合格スコアは65%以上
  • プログラミング言語はScala / Pythonの中で選択(コンテンツは一緒)
  • 2.x バージョンは2018年からスタート
  • 受験方法はOnsite / Online Proctorの中で選択
  • 不合格の場合、もう一度無料で受験できる

私の場合は夜中にOnline Proctorによる試験を選択しました。普通Proctor試験だとネットワークで繋がって実際の人により監督されますが、今回は人を介さず(なのでチャットする必要もない)、画像認識技術が適用されているKryterionの専用プログラムでモニタリングされるスタイルだったので、かなり不思議な体験でした。

出題内容

公式ホームページには概略しか公開されてないですが、Databricksのセミナーで紹介された詳細は以下の通りです。

  1. Spark Architecture and Run-time Behavior (30%)

    • Spark cluster components and deployment modes
    • Caching - cache(), persist(), unpersist(), and storage levels
    • Partitioning
      • Initial DataFrame partitioning when reading from data source
      • Repartitioning via coalesce() vs repartition()
      • Controlling number of shuffle partitions
      • Performance
        • Catalyst optimizer
        • Identifying performance bottlenecks in Spark applications
  2. Spark SQL and DataFrame/ DataSet Manipulation (40%)

    • Reading and writing DataFrames
    • Transformations, actions, and other operations
      • Wide vs narrow transformations
    • Joins
      • Supported Types
      • Broadcast Joins
      • Cross Joins
    • Defining and using User Defined Functions (UDFs)
    • Window functions
  3. RDDs and Low-Level APIs (10%)

    • Basic RDD and PairRDD operations
      • Transformations, such as map(), flatMap(), mapValues()
      • Aggregations
      • Actions
      • Joins
    • RDD <-> DataFrame conversions
    • Accumulator & AccumulatorV2
    • Wide Transformations, such as reduceByKey(), groupByKey() etc.
  4. Structured Streaming (10%)

    • No legacy DStream API coverage
    • Standard sources and sinks
    • Fault tolerance guarantees
    • Streaming DataFrame manipulation
      • Aggregation, including using time windows
    • Watermarking
    • Checkpointing
  5. Machine Learning (< 5%)

    • No legacy RDD-based APIs coverage
    • ML Pipeline basics
      • Initial DataFrame
      • Transformers
      • Estimators
    • Model selection
      • Evaluators
      • Parameter grids
    • No knowledge about specific algorithms is required
  6. GraphFrames (< 5%)

    • No GraphX coverage
    • Creating a GraphFrame instance
    • Basic GraphFrame operations
      • inDegrees(), outDegrees()
      • bfs(), shortestPaths()
      • triangleCount()
    • No knowledge about specific algorithms is required

学習方法

RDDなどSpark 1.x時代の古いAPIよりは、主にSpark 2.0以降のStructured API(DataFrame、DataSet、Structed Streaming)に関する知識が問われます。また私が受けた時は、MLlibやGraphFramesに関しましては1問ずつしか出題されなかったため、あまり事前知識のない方は飛ばしても良い気がします。

ドキュメント

Databricksのブログ

書籍(英語)

Spark: The Definitive Guide
http://amzn.asia/d/87vO632
SparkのStructured APIに関して最も詳しく書いてある書籍です。かなり分厚いですが、こちらをメインとして学習を行いました。

High Performance Spark
http://amzn.asia/d/hd9xNvQ

Advanced Analytics with Spark 2nd Edition
http://amzn.asia/d/9eW3RRi

残念ながらSpark 2.0以降に関して詳しく書いてある日本語の書籍は見つけませんでした。その他、韓国語で書いてある書籍も参考にしましたが、日本語での記事のため割愛します。

サンプル問題

1. Which of the following DataFrame operations are wide transformations (that is, they result in a shuffle)?

A. repartition()
B. filter()
C. orderBy()
D. distinct()
E. drop()
F. cache()
2. Which of the following methods are NOT a DataFrame action?

A. limit()
B. foreach()
C. first()
D. printSchema()
E. show()
F. cache()
3. Which of the following statements about Spark accumulator variables is NOT true?

A. For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will be applied only once, meaning that restarted tasks will not update the value. In transformations, each task’s update can be applied more than once if tasks or job stages are re-executed.
B. Accumulators provide a shared, mutable variable that a Spark cluster can safely update on a per-row basis.
C. You can define your own custom accumulator class by extending org.apache.spark.util.AccumulatorV2 in Java or Scala or pyspark.AccumulatorParam in Python.
D. The Spark UI displays all accumulators used by your application.
4. Given an instance of SparkSession named spark, review the following code:  

import org.apache.spark.sql.functions._

val a = Array(1002, 3001, 4002, 2003, 2002, 3004, 1003, 4006)

val b = spark
  .createDataset(a)
  .withColumn("x", col("value") % 1000)

val c = b
  .groupBy(col("x"))
  .agg(count("x"), sum("value"))
  .drop("x")
  .toDF("count", "total")
  .orderBy(col("count").desc, col("total"))
  .limit(1)
  .show()

Which of the following results is correct?

A.      
+-----+-----+
|count|total|
+-----+-----+
|    3| 7006|
+-----+-----+

B.      
+-----+-----+
|count|total|
+-----+-----+
|    1| 3001|
+-----+-----+

C.      
+-----+-----+
|count|total|
+-----+-----+
|    2| 8008|
+-----+-----+
D.      
+-----+-----+
|count|total|
+-----+-----+
|    8|20023|
+-----+-----+
5. Given an instance of SparkSession named spark, which one of the following code fragments executemost quickly and produce a DataFrame with the specified schema? Assume a variable named schema with the correctly structured StructType to represent the DataFrame's schema has already been initialized.

Sample data:
id,firstName,lastName,birthDate,email,country,phoneNumber 1,Pennie,Hirschmann,2017-12-03,ph123@databricks.com,US,+1(123)4567890

Schema:
id: integer
firstName: string
lastName: string
birthDate: timestamp
email: string
county: string
phoneNumber: string

A.      
val df = spark.read
   .option("inferSchema", "true")
   .option("header", "true")
   .csv("/data/people.csv")
B.      
val df = spark.read
   .option("inferSchema", "true")
   .schema(schema)
   .csv("/data/people.csv")
C.      
val df = spark.read
   .schema(schema)
   .option("sep", ",")
   .csv("/data/people.csv")
D.      
val df = spark.read
   .schema(schema)
   .option("header", "true")
   .csv("/data/people.csv")
6. Consider the following DataFrame:

val rawData = Seq(
  (1, 1000, "Apple", 0.76),
  (2, 1000, "Apple", 0.11),
  (1, 2000, "Orange", 0.98),
  (1, 3000, "Banana", 0.24),
  (2, 3000, "Banana", 0.99)
)
val dfA = spark.createDataFrame(rawData).toDF("UserKey", "ItemKey", "ItemName", "Score")

Select the code fragment that produces the following result:
+-------+-----------------------------------------------------------------+
|UserKey|Collection                                                       |
+-------+-----------------------------------------------------------------+
|1      |[[0.98, 2000, Orange], [0.76, 1000, Apple], [0.24, 3000, Banana]]|
|2      |[[0.99, 3000, Banana], [0.11, 1000, Apple]]                      |
+-------+-----------------------------------------------------------------+

A.      
import org.apache.spark.sql.expressions.Window
dfA.withColumn(
    "Collection",
    collect_list(struct("Score", "ItemKey", "ItemName")).over(Window.partitionBy("ItemKey"))
  )
  .select("UserKey", "Collection")
  .show(20, false)

B.      
dfA.groupBy("UserKey")
  .agg(collect_list(struct("Score", "ItemKey", "ItemName")))
  .toDF("UserKey", "Collection")
  .show(20, false)

C.      
dfA.groupBy("UserKey", "ItemKey", "ItemName")
  .agg(sort_array(collect_list(struct("Score", "ItemKey", "ItemName")), false))
  .drop("ItemKey", "ItemName")
  .toDF("UserKey", "Collection")
  .show(20, false)

D.      
dfA.groupBy("UserKey")
  .agg(sort_array(collect_list(struct("Score", "ItemKey", "ItemName")), false))
  .toDF("UserKey", "Collection")
  .show(20, false)
7. tableA is a DataFrame consisting of 20 fields and 40 billion rows of data with a surrogate key field. tableB is a DataFrame functioning as a lookup table for the surrogate key consisting of 2 fields and 5,000 rows. If the in-memory size of tableB is 22MB, what occurs when the following code is executed:

val df = tableA.join(broadcast(tableB), Seq("primary_key"))

A. The broadcast function is non-deterministic, thus a BroadcastHashJoin is likely to occur, but isn't guaranteed to occur.
B. A normal hash join will be executed with a shuffle phase since the broadcast table is greater than the 10MB default threshold and the broadcast command can be overridden silently by the Catalyst optimizer.
C. The contents of tableB will be replicated and sent to each executor to eliminate the need for a shuffle stage during the join.
D. An exception will be thrown due to tableB being greater than the 10MB default threshold for a broadcast join.
8. Consider the following DataFrame:

import org.apache.spark.sql.functions._

val people = Seq(
    ("Ali", 0, Seq(100)),
    ("Barbara", 1, Seq(300, 250, 100)),
    ("Cesar", 1, Seq(350, 100)),
    ("Dongmei", 1, Seq(400, 100)),
    ("Eli", 2, Seq(250)),
    ("Florita", 2, Seq(500, 300, 100)),
    ("Gatimu", 3, Seq(300, 100))
  )
  .toDF("name", "department", "score")

Select the code fragment that produces the following result:
+----------+-------+-------+
|department|   name|highest|
+----------+-------+-------+
|         0|    Ali|    100|
|         1|Dongmei|    400|
|         2|Florita|    500|
|         3| Gatimu|    300|
+----------+-------+-------+

A.      
val maxByDept = people
  .withColumn("score", explode(col("score")))
  .groupBy("department")
  .max("score")
  .withColumnRenamed("max(score)", "highest")

maxByDept
  .join(people, "department")
  .select("department", "name", "highest")
  .orderBy("department")
  .dropDuplicates("department")
  .show()

B.      
people
  .withColumn("score", explode(col("score")))
  .orderBy("department", "score")
  .select(col("name"), col("department"), first(col("score")).as("highest"))
  .show()

C.      
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("department").orderBy(col("score").desc)

people
  .withColumn("score", explode(col("score")))
  .select(
    col("department"),
    col("name"),
    dense_rank().over(windowSpec).alias("rank"),
    max(col("score")).over(windowSpec).alias("highest")
  )
  .where(col("rank") === 1)
  .drop("rank")
  .orderBy("department")
  .show()

D.      
people
  .withColumn("score", explode(col("score")))
  .groupBy("department")
  .max("score")
  .withColumnRenamed("max(score)", "highest")
  .orderBy("department")
  .show()
9. Which of the following standard Structured Streaming sink types are idempotent and can provide end-to-end exactly-once semantics in a Structured Streaming job?

A. Console
B. Kafka
C. File
D. Memory
10. Which of following statements regarding caching are TRUE?

A. The default storage level for a DataFrame is StorageLevel.MEMORY_AND_DISK.
B. The uncache() method evicts a DataFrame from cache.
C. The persist() method immediately loads data from its source to materialize the DataFrame in cache.
D. Explicit caching can decrease application performance by interfering with the Catalyst optimizer's ability to optimize some queries.

答えは公開されてないですが、私が出した正解は以下の通りです。少なくとも80%以上はあってると思います。
1. A, C, D
2. A, D, F
3. D
4. A
5. D
6. D
7. B
8. C
9. B
10. A, D

最後に

実際の試験では半分以上Sparkのコードに関する問題が出てましたため、実際に手を動かしならSparkのコードを実装した経験が重要になると思います。複数選択しないといけない問題も多く、1問にかかる時間も長かったですが、問題数(40問)に比べかなり余裕のある時間(180分)が与えられましたので、時間が足りない印象ではありませんでした。新しくリニューアルされた試験で関連情報も少なかったですが、今まで目を通してなかったところを含めSparkのStructured APIが一通りおさらいできる良い機会でした。

6
5
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5