search
LoginSignup
0

More than 1 year has passed since last update.

posted at

Spark-3.0.1(Scala)で前処理を理解しよう

はじめに

スターフェスティバルで色々やっている竹澤(a.k.a ytake)です。(今更!)
今回はここ数年しばらくやっているデータ処理系からSparkを使って前処理などを行うための
Scala(2.12.*)を使った基本的な実装方法を紹介します。
Apache Spark自体については解説しませんので、公式サイトやネット上のさまざまなエントリを参照ください!

例として、https://grouplens.org/datasets/movielens/ にあるデータセットを利用します。
映画の評価などのデータセットで、強調フィルタリングなどにもばっちりなデータセットです。

今回はsmallのデータを利用しますので、下記のコマンドで任意の場所にダウンロードしてください。

$ cd 任意のディレクトリ
$ wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
$ unzip ml-latest-small.zip

sbtでは下記の様に記述しておきます。

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "3.0.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "3.0.1" % "provided",
)

それではデータセットを使って実際にデータ処理をおこなっていきましょう!

Spark アプリケーション実装例

package jp.co.stafes

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

abstract class AbstractSparkApp {
  protected var master: String;

  protected var appName: String

  protected val config: ConfigFinder.type = ConfigFinder

  protected def context(): SparkContext = {
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
    new SparkContext(conf)
  }

  protected def sparkSession(): SparkSession = {
    val spark = context()
    spark.setCheckpointDir("/tmp/example-checkpoint")
    SparkSession.builder.master(master)
      .appName(appName)
      .getOrCreate
  }
}

今回はudf(User Defined Function)を使ってカラムの変更をいくつかおこないますので、
Spark3.0系でdeprecatedとなってしまった、udfでの戻りの型変更を実行できる様に
spark.sql.legacy.allowUntypedScalaUDFtrueとしています。
udfはユーザ定義関数を使ってSparkクラスタで分散処理をするための機能で、
SparkSQLなどでデータの加工時に利用することも多くありますので、
初めての方は使い方を覚えておくといいでしょう!
(使いすぎたり、計算量を無視した実装をすると遅くなりますので要注意)

package jp.co.stafes

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.types.{ArrayType, LongType, StringType}
import java.io.File

object ExampleApp extends AbstractSparkApp {

  val ratingDataPath: String = "任意のディレクトリ/ratings.csv"
  val movieDataPath: String = "任意のディレクトリ/movies.csv"
  val linkDataPath: String = "任意のディレクトリ/links.csv"

  override protected var master: String = "local[*]"
  override protected var appName: String = "example-recommend"

  val ss: SparkSession = sparkSession()

  def main(args: Array[String]): Unit = {
    val ratings = readCsv(ss, ratingDataPath)
    ratings.cache
    ratings.show(5)
  }

  /**
   * データセットcsvファイルを取得
   */
  private def getFilePath(path: String): String = {
    val fs = new File(path)
    if (!fs.exists) {
      throw new RuntimeException("machine learning data file is gone.")
    }
    fs.getAbsoluteFile.toString
  }

  private def readCsv(spark: SparkSession, path: String): DataFrame = {
    spark.read.options(Map(
      "header" -> "true",
      "inferSchema" -> "true"
    )).csv(getFilePath(path))
  }
}

まずはデータセットをシンプルにデータフレームで読みこむ様にします。
ここではプロパティでデータセットのファイルを指定、リソース指定などをしていますが、
実際に実装する場合はconfやHadoop Yarnなどを使って外部から指定する様にしてください。

ここまでをコンパイルして実行してみましょう。
Sparkで手っ取り早く実行するには下記の様に実行します。

$ spark-submit --class クラス名 jar

*商用では環境に合わせてメモリなども指定してください

実行すると次のように出力されます。

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+

csvをそのまま読みこんでいるだけのものです。
このデータを元に値の変更をやってみましょう。

mainの中に次の物を追加します。

   val selectedRatings = ratings.select(
      col("userId"),
      col("movieId"),
      col("rating"),
      (col("timestamp").cast(LongType) * 1000).alias("timestamp")
    )
    selectedRatings.show(5)

csvのデータフレームで取得したいカラムを記述し、timestampの値を変更しています。
これはSparkSQLで記述することもできますので、好みに応じて選んでください。

+------+-------+------+------------+
|userId|movieId|rating|   timestamp|
+------+-------+------+------------+
|     1|      1|   4.0|964982703000|
|     1|      3|   4.0|964981247000|
|     1|      6|   4.0|964982224000|
|     1|     47|   5.0|964983815000|
|     1|     50|   5.0|964982931000|
+------+-------+------+------------+

これを実行すると、上記の通りにtimestampが*1000された値になっているはずです。
これでカラムの値を変更することができました。
他のカラムの値も同様に変更することができますので、
実装方法を覚えておくといいでしょう!

次に動画リストをそのまま読み込んでみます。

val rawMovies = readCsv(ss, movieDataPath)
rawMovies.show(5)

読み込むと中身は下記の様になっています。

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+

このまま使うこともできそうですが、
ジャンルなどは機械学習で強調フィルタリングに使えそうです。
またタイトルも公開年とタイトルが混ざっています。
これをデータ処理でそのまま使うには少し厳しいです。

まずはgenresカラムを配列に変換します。
このカラムは|でセパレートすることができます。
配列変換にはudf(ユーザー定義関数)を用意すれば良いでしょう。

val extractGenres: UserDefinedFunction = udf((value: String) => value.toLowerCase.split('|'), ArrayType(StringType))

記述した関数を動画リスト データフレームで利用します。

val rawMovies = readCsv(ss, movieDataPath)
rawMovies.show(5)
val m = rawMovies.select(
  col("movieId"),
  col("title"),
  extractGenres(col("genres")).as("genres")
)
m.show(5)

これを実行すると次の様に配列に変換されます。

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|[adventure, anima...|
|      2|      Jumanji (1995)|[adventure, child...|
|      3|Grumpier Old Men ...|   [comedy, romance]|
|      4|Waiting to Exhale...|[comedy, drama, r...|
|      5|Father of the Bri...|            [comedy]|
+-------+--------------------+--------------------+

これでジャンルをタグ的に利用することができるようになりました。
続いてtitleを分解します。

公開年は "\\(\\d{4}\\)"で対応できそうです。
正規表現を利用してマッチするものがあるかどうか調べて対応します。

package jp.co.stafes

object Extract {

  /**
   * @param title タイトル
   * @param year 公開年
   */
  case class ExtractResult(title: String, year: String)

  /**
   * 指定したタイトル表記を title, yearに分割
   */
  def titleAndYear(title: String): ExtractResult = {
    val pattern = "\\(\\d{4}\\)".r
    if (pattern.findFirstMatchIn(title).isEmpty) {
      return ExtractResult(title, "1970")
    }
    val mi = pattern.findFirstMatchIn(title).get
    ExtractResult(
      title.splitAt(mi.start)._1.trim,
      mi.group(0).stripPrefix("(").stripSuffix(")").trim
    )
  }
}

ここでは例として、正規表現にマッチするものがあるか findFirstMatchInで調べ、
なければ、タイトルのみのものと判断、公開年を1970固定でExtractResultオブジェクトを返却します。
マッチするものがあれば、タイトルと公開年に分離し、それぞれテキスト操作をおこない
ExtractResultオブジェクトを返却するようにします。

m.sqlContext.udf.register("extractYear", Extract.titleAndYear _)
val movies = m.sqlContext.sql(
  """
     SELECT tmp.movieId,
     tmp.extracts.title AS title,
     tmp.extracts.year AS year,
     tmp.genres
    FROM (SELECT *, extractYear(title) AS extracts FROM raw_movies) AS tmp
  """
)
movies.printSchema()
movies.show(10)

ここではsqlContext.udf.registerを使ってudfをextractYearという関数名で利用できる様に
Extract.titleAndYearを登録します。
SparkSQLを使って実際に取得してみましょう!

+-------+--------------------+----+--------------------+
|movieId|               title|year|              genres|
+-------+--------------------+----+--------------------+
|      1|           Toy Story|1995|[adventure, anima...|
|      2|             Jumanji|1995|[adventure, child...|
|      3|    Grumpier Old Men|1995|   [comedy, romance]|
|      4|   Waiting to Exhale|1995|[comedy, drama, r...|
|      5|Father of the Bri...|1995|            [comedy]|
|      6|                Heat|1995|[action, crime, t...|
|      7|             Sabrina|1995|   [comedy, romance]|
|      8|        Tom and Huck|1995|[adventure, child...|
|      9|        Sudden Death|1995|            [action]|
|     10|           GoldenEye|1995|[action, adventur...|
+-------+--------------------+----+--------------------+

タイトルと公開年がそれぞれ別のカラムになり、ジャンルも配列に変更されました。
これでさまざまなデータ処理に利用できそうな形となりました。

最後に異なるcsvとのjoinを記述してみましょう。

データセットには海外のTMDbのデータが含まれていますので、これとジョインすることができそうです。
外部サービスとの連携データのcsvを使って、結合して一つのテーブルにします。

val links = readCsv(ss, linkDataPath)
val joinedMovieDf = movies.join(links, movies("movieId") === links("movieId"))
  .select(
    movies("movieId"),
    movies("title"),
    movies("year"),
    movies("genres"),
    links("tmdbId")
  )
println(joinedMovieDf.count)
joinedMovieDf.show(10)

結合のジョインは一般的なSQLと同じ様に記述できます。
ここでは動画連携データのcsvデータフレームと、動画データのデータフレームで
mobieIdが同じ物を結合しています。
これを実行すると下記の様に出力されます。

+-------+--------------------+----+--------------------+------+
|movieId|               title|year|              genres|tmdbId|
+-------+--------------------+----+--------------------+------+
|      1|           Toy Story|1995|[adventure, anima...|   862|
|      2|             Jumanji|1995|[adventure, child...|  8844|
|      3|    Grumpier Old Men|1995|   [comedy, romance]| 15602|
|      4|   Waiting to Exhale|1995|[comedy, drama, r...| 31357|
|      5|Father of the Bri...|1995|            [comedy]| 11862|
|      6|                Heat|1995|[action, crime, t...|   949|
|      7|             Sabrina|1995|   [comedy, romance]| 11860|
|      8|        Tom and Huck|1995|[adventure, child...| 45325|
|      9|        Sudden Death|1995|            [action]|  9091|
|     10|           GoldenEye|1995|[action, adventur...|   710|
+-------+--------------------+----+--------------------+------+

もとのデータソースを物理的に変更せずに新たなデータを作ることができました。
このデータを元に機械学習などを実行することができる様になります。
メモリ上にある加工データはhdfsなどにそのまま書き出すことができます。
間違ってもこのまま大量データの処理を一回でしようとはせずに分割して実装・実行する様にしましょう。

ここまでSparkを使った簡単なデータ加工処理を紹介しました。
Sparkはたくさんの機能があり、最近では重宝することも多いものですが
紹介した様なものはデータ処理をする上で基本的なものです。
基本的なものを覚えておくとさまざまな処理に流用することができますので、
覚えて活用してみてください!

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
0