8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache Spark の JdbcRDD を使ってみた結果

Last updated at Posted at 2015-03-19

概要

前回の投稿で Apache Spark 使ってアクセスログを解析して、CSVファイル出力をやってみましたが、
Apache Spark を使ってアクセスログを解析して、その結果をCSVファイルに出力してみた。

今度はアクセスログじゃなくてMySQLのデータを Apache Spark 使って集計出来るかを試してみました。

経緯

user 1 --- n entry

のようなテーブル関連で各userレコードに関連するentryの数とそのentryのキーなどの情報を知りたくて、
またCSVファイル(w)で出力してチーム内で共有しようと思ったのです。
ただ、userテーブルのレコード数が80万件近くあり、普通に slick 使った集計スクリプトだと、クソ遅くてやってられん。(← ク○コードだからかも。。)

そこで。。
分散処理できないかなー

Apache Spark でできないかなー

Spark SQL ってものがあるし、MySQLのデータインポートとかしたらできるのかなー

JdbcRDDっていうのがあるじゃん!
https://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.rdd.JdbcRDD

やってみる

実装

お試しでやってみたかったので、シンプルに下記のようなテーブルでJdbcRDDがどのように動作するか試してみます。

sample テーブル

id token
28 token1
29 token2
... ...
842105 tokenx

レコード数は下記の通りです。

mysql> select count(*) from sample;
+----------+
| count(*) |
+----------+
|   841830 |
+----------+

実装してみたコード

Invastigation.scala
package sample

import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD

object Invastigation {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Invastigation")
    val context = new SparkContext(conf)

    val dbDriver = "com.mysql.jdbc.Driver"
    val dbUrl = "jdbc:mysql://localhost:3306/jdbcrdd?useUnicode=true&characterEncoding=UTF-8"
    val dbUsername = "hogehoge"
    val dbPassword = "piyopiyo"

    val jdbcConnection = () => {
      Class.forName(dbDriver).newInstance
      DriverManager.getConnection(dbUrl, dbUsername, dbPassword)
    }

    val mysqlRDD = new JdbcRDD(
      context,
      jdbcConnection,
      "select id, token from sample where ? <= id and id >= ?",
      28,
      842105,
      10,
      r => r.getLong("id") + ", " + r.getString("token")
    )

    val results = mysqlRDD.collect.toList
    println(s"results size = ${results.size}")
  }

}

実行

上記のコードを実行して前回と同じようにspark-submit使って実行してみます。

$ spark-submit --class sample.Invastigation --master "local[2]" --jars /path/to/mysql/mysql-connector-
java/jars/mysql-connector-java-5.1.33.jar ${PROJECT_ROOT}/target/scala-2.11/invastigation_2.11-1.0.0.jar

ここで注意しなくてはいけないのは、
今回パーティション数(JdbcRDDをnewするときに渡す第5引数)を 10 で指定しましたが、
レコード数が多いからか OutOfMemoryError が発生してしまいました。
heap size が足りないとのことで、下記の対応をしました。(Mac OSXの場合)

$ cd /usr/local/Cellar/apache-spark/1.2.0/libexec/conf
$ cp spark-defaults.conf.template spark-defaults.conf
$ vi spark-defaults.conf

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
 spark.driver.memory              2g ← コメントアウト外した
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

結果

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

...(snip)...

15/03/12 10:24:09 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/12 10:24:09 INFO DAGScheduler: Stage 0 (collect at Invastigation.scala:37) finished in 26.398 s
15/03/12 10:24:12 INFO DAGScheduler: Job 0 finished: collect at Invastigation.scala:37, took 29.914454 s
results size = 3089533

えっ?(つд⊂)ゴシゴシ
結果のサイズが 3089533 ってどうゆうこと?!
sample テーブルのカウント数と一致する
と思ってたのに。。

パーティション数を 1 に変更して再度試してみると。。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

...(snip)...

results size = 0

今度は 0 になった!Σ(゚д゚)

まとめ

半日くらい色々試してみましたが、
JdbcRDD のリストのサイズと実テーブルのカウント数が合わない現象の原因は突き止められず。
実際どういったSQLクエリが実行されているか分かればよかったんですが、そこまでできずタイムアウトです。。

JdbcRDD 使っていたり、知っていたりする人、コメントくだしあ(´・ω・`)

けっきょく、JdbcRDD使ったコードを載せて終わってしまった。。

追記(2015/03/19)

上記のカウント違いは単純にSQLの where 句の条件が違うというくだらないミスでした。。

select id, token from sample where ? <= id and id <= ?

こうしたらカウント数が一致しました。
(@nnn_stream さんありがとうございました!)

8
7
4

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?