More than 1 year has passed since last update.

概要

前回の投稿で Apache Spark 使ってアクセスログを解析して、CSVファイル出力をやってみましたが、
Apache Spark を使ってアクセスログを解析して、その結果をCSVファイルに出力してみた。

今度はアクセスログじゃなくてMySQLのデータを Apache Spark 使って集計出来るかを試してみました。

経緯

user 1 --- n entry

のようなテーブル関連で各userレコードに関連するentryの数とそのentryのキーなどの情報を知りたくて、
またCSVファイル(w)で出力してチーム内で共有しようと思ったのです。
ただ、userテーブルのレコード数が80万件近くあり、普通に slick 使った集計スクリプトだと、クソ遅くてやってられん。(← ク○コードだからかも。。)

そこで。。
分散処理できないかなー

Apache Spark でできないかなー

Spark SQL ってものがあるし、MySQLのデータインポートとかしたらできるのかなー

JdbcRDDっていうのがあるじゃん!
https://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.rdd.JdbcRDD

やってみる

実装

お試しでやってみたかったので、シンプルに下記のようなテーブルでJdbcRDDがどのように動作するか試してみます。

sample テーブル

id token
28 token1
29 token2
... ...
842105 tokenx

レコード数は下記の通りです。

mysql> select count(*) from sample;
+----------+
| count(*) |
+----------+
|   841830 |
+----------+

実装してみたコード

Invastigation.scala
package sample

import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD

object Invastigation {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Invastigation")
    val context = new SparkContext(conf)

    val dbDriver = "com.mysql.jdbc.Driver"
    val dbUrl = "jdbc:mysql://localhost:3306/jdbcrdd?useUnicode=true&characterEncoding=UTF-8"
    val dbUsername = "hogehoge"
    val dbPassword = "piyopiyo"

    val jdbcConnection = () => {
      Class.forName(dbDriver).newInstance
      DriverManager.getConnection(dbUrl, dbUsername, dbPassword)
    }

    val mysqlRDD = new JdbcRDD(
      context,
      jdbcConnection,
      "select id, token from sample where ? <= id and id >= ?",
      28,
      842105,
      10,
      r => r.getLong("id") + ", " + r.getString("token")
    )

    val results = mysqlRDD.collect.toList
    println(s"results size = ${results.size}")
  }

}

実行

上記のコードを実行して前回と同じようにspark-submit使って実行してみます。

$ spark-submit --class sample.Invastigation --master "local[2]" --jars /path/to/mysql/mysql-connector-
java/jars/mysql-connector-java-5.1.33.jar ${PROJECT_ROOT}/target/scala-2.11/invastigation_2.11-1.0.0.jar

ここで注意しなくてはいけないのは、
今回パーティション数(JdbcRDDをnewするときに渡す第5引数)を 10 で指定しましたが、
レコード数が多いからか OutOfMemoryError が発生してしまいました。
heap size が足りないとのことで、下記の対応をしました。(Mac OSXの場合)

$ cd /usr/local/Cellar/apache-spark/1.2.0/libexec/conf
$ cp spark-defaults.conf.template spark-defaults.conf
$ vi spark-defaults.conf

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
 spark.driver.memory              2g ← コメントアウト外した
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

結果

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

...(snip)...

15/03/12 10:24:09 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/12 10:24:09 INFO DAGScheduler: Stage 0 (collect at Invastigation.scala:37) finished in 26.398 s
15/03/12 10:24:12 INFO DAGScheduler: Job 0 finished: collect at Invastigation.scala:37, took 29.914454 s
results size = 3089533

えっ?(つд⊂)ゴシゴシ
結果のサイズが 3089533 ってどうゆうこと?!
sample テーブルのカウント数と一致する
と思ってたのに。。

パーティション数を 1 に変更して再度試してみると。。

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

...(snip)...

results size = 0

今度は 0 になった!Σ(゚д゚)

まとめ

半日くらい色々試してみましたが、
JdbcRDD のリストのサイズと実テーブルのカウント数が合わない現象の原因は突き止められず。
実際どういったSQLクエリが実行されているか分かればよかったんですが、そこまでできずタイムアウトです。。

JdbcRDD 使っていたり、知っていたりする人、コメントくだしあ(´・ω・`)

けっきょく、JdbcRDD使ったコードを載せて終わってしまった。。

追記(2015/03/19)

上記のカウント違いは単純にSQLの where 句の条件が違うというくだらないミスでした。。

select id, token from sample where ? <= id and id <= ?

こうしたらカウント数が一致しました。
(@nnn_stream さんありがとうございました!)

Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account log in.