概要
前回の投稿で Apache Spark 使ってアクセスログを解析して、CSVファイル出力をやってみましたが、
Apache Spark を使ってアクセスログを解析して、その結果をCSVファイルに出力してみた。
今度はアクセスログじゃなくてMySQLのデータを Apache Spark 使って集計出来るかを試してみました。
経緯
user 1 --- n entry
のようなテーブル関連で各userレコードに関連するentryの数とそのentryのキーなどの情報を知りたくて、
またCSVファイル(w)で出力してチーム内で共有しようと思ったのです。
ただ、userテーブルのレコード数が80万件近くあり、普通に slick 使った集計スクリプトだと、クソ遅くてやってられん。(← ク○コードだからかも。。)
そこで。。
分散処理できないかなー
↓
Apache Spark でできないかなー
↓
Spark SQL ってものがあるし、MySQLのデータインポートとかしたらできるのかなー
↓
JdbcRDDっていうのがあるじゃん!
https://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.rdd.JdbcRDD
↓
やってみる
実装
お試しでやってみたかったので、シンプルに下記のようなテーブルでJdbcRDDがどのように動作するか試してみます。
sample テーブル
id | token |
---|---|
28 | token1 |
29 | token2 |
... | ... |
842105 | tokenx |
レコード数は下記の通りです。
mysql> select count(*) from sample;
+----------+
| count(*) |
+----------+
| 841830 |
+----------+
実装してみたコード
package sample
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.rdd.JdbcRDD
object Invastigation {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Invastigation")
val context = new SparkContext(conf)
val dbDriver = "com.mysql.jdbc.Driver"
val dbUrl = "jdbc:mysql://localhost:3306/jdbcrdd?useUnicode=true&characterEncoding=UTF-8"
val dbUsername = "hogehoge"
val dbPassword = "piyopiyo"
val jdbcConnection = () => {
Class.forName(dbDriver).newInstance
DriverManager.getConnection(dbUrl, dbUsername, dbPassword)
}
val mysqlRDD = new JdbcRDD(
context,
jdbcConnection,
"select id, token from sample where ? <= id and id >= ?",
28,
842105,
10,
r => r.getLong("id") + ", " + r.getString("token")
)
val results = mysqlRDD.collect.toList
println(s"results size = ${results.size}")
}
}
実行
上記のコードを実行して前回と同じようにspark-submit使って実行してみます。
$ spark-submit --class sample.Invastigation --master "local[2]" --jars /path/to/mysql/mysql-connector-
java/jars/mysql-connector-java-5.1.33.jar ${PROJECT_ROOT}/target/scala-2.11/invastigation_2.11-1.0.0.jar
ここで注意しなくてはいけないのは、
今回パーティション数(JdbcRDDをnewするときに渡す第5引数)を 10 で指定しましたが、
レコード数が多いからか OutOfMemoryError が発生してしまいました。
heap size が足りないとのことで、下記の対応をしました。(Mac OSXの場合)
$ cd /usr/local/Cellar/apache-spark/1.2.0/libexec/conf
$ cp spark-defaults.conf.template spark-defaults.conf
$ vi spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 2g ← コメントアウト外した
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
結果
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
...(snip)...
15/03/12 10:24:09 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/03/12 10:24:09 INFO DAGScheduler: Stage 0 (collect at Invastigation.scala:37) finished in 26.398 s
15/03/12 10:24:12 INFO DAGScheduler: Job 0 finished: collect at Invastigation.scala:37, took 29.914454 s
results size = 3089533
えっ?(つд⊂)ゴシゴシ
結果のサイズが 3089533 ってどうゆうこと?!
sample テーブルのカウント数と一致する
と思ってたのに。。
パーティション数を 1 に変更して再度試してみると。。
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
...(snip)...
results size = 0
今度は 0 になった!Σ(゚д゚)
まとめ
半日くらい色々試してみましたが、
JdbcRDD のリストのサイズと実テーブルのカウント数が合わない現象の原因は突き止められず。
実際どういったSQLクエリが実行されているか分かればよかったんですが、そこまでできずタイムアウトです。。
JdbcRDD 使っていたり、知っていたりする人、コメントくだしあ(´・ω・`)
けっきょく、JdbcRDD使ったコードを載せて終わってしまった。。
追記(2015/03/19)
上記のカウント違いは単純にSQLの where 句の条件が違うというくだらないミスでした。。
select id, token from sample where ? <= id and id <= ?
こうしたらカウント数が一致しました。
(@nnn_stream さんありがとうございました!)