SparkR overview | Databricks on AWS [2022/6/27時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
SparkRはRからApache sparkを使うための軽量フロントエンドを提供するRパッケージです。また、SparkRではMLlibを用いた分散機械学習をサポートしています。
ノートブックにおけるSparkRの利用
- Spark 2.0以降では、全ての関数呼び出しに明示的に
sqlContext
を引き渡す必要はありません。 - Spark 2.2以降では、SparkRの関数が他の著名なパッケージと名前の競合が起きるため、ノートブックではデフォルトでSparkRはインポートされません。SparkRを使うためには、ノートブックで
library(SparkR)
を呼び出す必要があります。SparkRのセッションは設定済みなので、全てのSparkRの関数は既存のセッションを用いてお使いのクラスターと通信を行います。
spark-submitジョブにおけるSparkRの利用
少々のコード変更を行うことで、spark-submitoジョブとしてDatabricks上でSparkRを使用するスクリプトを実行することができます。サンプルは、Create and run a spark-submit job for R scriptsを参照ください。
SparkRデータフレームの作成
ローカルのR data.frame
、データソース、Spark SQLクエリーからデータフレームを作成することができます。
ローカルのRのdata.frame
から作成
データフレームを作成する最もシンプルな方法はローカルのR data.frame
をSparkDataFrame
に変換することです。特に、SparkDataFrame
を作成するために、ローカルのR data.frame
をcreateDataFrame
に引き渡すことができます。他のほとんどのSparkR関数と同様、createDataFrame
の構文はSpark 2.0で変更されています。以下でコードスニペットのサンプルを参照することができます。他のサンプルに関しては、createDataFrameをご覧ください。
library(SparkR)
df <- createDataFrame(faithful)
# Displays the content of the DataFrame to stdout
head(df)
データソースAPIから作成
データソースからデータフレームを作成する一般的な方法はread.df
です。このメソッドはロードするファイルのパスとデータソースのタイプを受け取ります。SparkRではCSV、JSON、テキスト、Parquetファイルをネイティブでサポートしています。
library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", source = "csv", header="true", inferSchema = "true")
head(diamondsDF)
Sparkパッケージのデータソースコネクターを追加して作成
Sparkパッケージを通じて、Avroのような著名なファイルフォーマットのデータソースコネクターを見つけ出すことができます。サンプルとしては、Avroファイルをロードするためにspark-avroパッケージを使用することができます。spark-avroパッケージを利用できるかどうかは、お使いのクラスターのイメージのバージョンによります。Avro fileをご覧ください。
最初に既存のdata.frame
を受け取り、Sparkデータフレームに変換し、Avroファイルとして保存します。
require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")
Avroファイルが保存されたことを確認します。
%fs ls /tmp/iris.avro
データを読み戻すために再度spark-avroパッケージを使います。
irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)
また、複数ファイルのフォーマットでデータフレームを保存するために、データソースAPIを使用することができます。例えば、上の例で得たデータフレームをwrite.df
を用いてParquetファイルとして保存します。
write.df(irisDF2, path="dbfs:/tmp/iris.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/iris.parquet
Spark SQLクエリーから作成
また、Spark SQLのクエリーを用いてSparkRデータフレームを作成することもできます。
# Register earlier df as temp view
createOrReplaceTempView(irisDF2, "irisTemp")
# Create a df consisting of only the 'species' column using a Spark SQL query
species <- sql("SELECT species FROM irisTemp")
species
はSparkデータフレームとなります。
データフレームのオペレーション
Sparkデータフレームは構造化データ処理を行うための数多くの関数をサポートしています。こちらは基本的なサンプルです。完全なリストはAPI docsで確認することができます。
行と列の選択
# Import SparkR package if this is a new notebook
require(SparkR)
# Create DataFrame
df <- createDataFrame(faithful)
# Select only the "eruptions" column
head(select(df, df$eruptions))
# Select only the "eruptions" column
head(select(df, df$eruptions))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
グルーピング及び集計
Sparkデータフレームはグルーピング後にデータを集計するための数多くの関数をサポートしています。例えば、faithfulデータセットにおける待ち時間のカウントを取ることができます。
head(count(groupBy(df, df$waiting)))
# You can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
列のオペレーション
SparkRでは、データ処理、集計のために列に直接適用できる関数を数多くサポートしています。以下の例では、基本的な代数関数の使い方を示しています。
# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
機械学習
SparkRではほとんどのMLlibアルゴリズムを公開しています。SparkRは内部では、モデルをトレーニングするためにMLlibを使用します。
以下のサンプルでは、SparkRを用いてどのようにガウシアンGLMモデルを構築するのかを示しています。線形回帰を実行するには、ファミリーを"gaussian"
に設定します。SparkML GLM SparkRを使用する際には、手動で操作する必要がないように、カテゴリー変数に対して自動でワンホットエンコーディングを行います。StringとDouble型以外の特徴量に対しては、他のMLlibのコンポーネントとの互換性のために、MLlibのVector特徴量に対してフィッティングを行うことも可能です。
# Create the DataFrame
df <- createDataFrame(iris)
# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")
# Model coefficients are returned in a similar format to R's native glm().
summary(model)