1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

SparkR で timestamp 型のカラムがうまく表示されない不具合を魔改造でどうにかする #rstatsj

Last updated at Posted at 2015-07-29

Spark 1.5.0 でバグ修正されたようです。2015/09/09

今までファイルで扱っていたアクセスログをすべて Parquet にして SparkR で集計するようにしています。
しかし、DataFrame に java.sql.Timestamp 由来のカラムが一つでもあると、SparkR がエラーを出します。

ちょっと再現してみましょう。

まずは、Scala で java.sql.Timestamp のカラムだけからなる DataFrame を作り、Parquet で保存します。

spark-shell
$ spark-shell
scala> import java.sql.Timestamp
scala> case class TimestampTest(timestamp: Timestamp)
scala> import sqlContext.implicits._
scala> val rdd = sc.parallelize(1 to 3).map(i => TimestampTest(Timestamp.valueOf("2015-01-01 00:00:0" + i)))
scala> val dataframe = rdd.toDF
scala> dataframe.write.parquet("timestamp-test.parquet")

これを R 側から読み込んで DataFrame に戻します。

R
library(SparkR)

sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)

df <- parquetFile(sqlContext, "timestamp-test.parquet")
print(df)
結果
DataFrame[timestamp:timestamp]

ちゃんと DataFrame として読み込めたようです。

しかし、この DataFrame に対して head() をしてみると、

R
head(df)
Error in as.data.frame.default(x[[i]], optional = TRUE) : 
  cannot coerce class ""jobj"" to a data.frame

エラーが出ます。

原因は、SparkR では timestamp 型として java.sql.Time のみをサポートしており、java.sql.Timestamp は Java オブジェクトとしてそのまま保持されてしまうためのようです。
しかし、Parquet は timestamp 型として java.sql.Timestamp しかサポートしておらず、java.sql.Time では保存できません。

これは困りました。

しょうがないので、SparkR を魔改造して collect() 関数の中身を、java.sql.Timestamp を見つけたら POSIXct へ変換するように書き換えてしまいましょう。
これによって、head()take() などの関連する関数の挙動も変わります。

R
# Override SparkR::collect --------------------------------------------------------  
collect <- function(x, stringsAsFactors = FALSE) {
  # listCols is a list of raw vectors, one per column
  listCols <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
  cols <- lapply(listCols, function(col) {
    objRaw <- rawConnection(col)
    numRows <- SparkR:::readInt(objRaw)
    col <- SparkR:::readCol(objRaw, numRows)
    close(objRaw)
    ### begin added area to read Timestamp ###
    if(is.list(col) && length(col) > 0) {
      obj <- col[[1]]
      class <- SparkR:::callJMethod(obj, "getClass")
      class_name <- SparkR:::callJMethod(class, "getName")
      if(class_name == "java.sql.Timestamp") {
        times <- lapply(col, function(x) {
          SparkR:::callJMethod(x, "getTime")
        })
        times <- unlist(times, use.names = FALSE) / 1000
        col <- as.POSIXct(times / 1000, origin = "1970-01-01")
      }
    }
    ### end added area ###
    col
  })
  names(cols) <- columns(x)
  do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors))
}
SparkR_env <- asNamespace("SparkR")
environment(collect) <- SparkR_env
invisible(eval(setMethod("collect", signature(x = "DataFrame"), collect), SparkR_env))

それでは再び head() を実行してみましょう。

R
head(df)
結果
            timestamp
1 2015-01-01 00:00:01
2 2015-01-01 00:00:02
3 2015-01-01 00:00:03

できました!

というわけで、この機能を SparkRext パッケージに追加しました。

Enjoy!

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?