Sparkというものを最近になって触る機会があったのでちょっとお試しして見た。
大規模データをいろいろ編集できるらしいがその威力はいかに。。
最下部リンクにてlog集計をされているかたがサンプルあげてくれていたので、すこし編集させて頂き
Mysqlへデータを書き込みされるように作って見ました。
[github]
(https://github.com/hisahisa/apache-spark-csv-sample)
Sparkのバージョンが低かったので、最低限のバージョンまでアップした。
build.sbt
name := "spark_csv_sample"
version := "1.0.0"
scalaVersion := "2.11.2"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-hdfs" % "2.6.0",
"org.apache.spark" % "spark-core_2.11" % "1.6.0",
"org.apache.spark" %% "spark-sql" % "2.2.0",
"mysql" % "mysql-connector-java" % "5.1.46",
"org.mortbay.jetty" % "jetty" % "6.1.26"
)
MysqlへのDB Insert
SparkCSVSample.scala
private def writeDB(countData: RDD[Option[String]]): Unit = {
// 環境変数使いたいけど一旦はこれで。。
val url = "jdbc:mysql://localhost:3306/nano_planner_dev"
val username = "root"
val password = ""
// パーティション単位で Insert 〜 Closeされるのが正しいとどっかのブログに書いてったので、これでいいかなあ。
countData.foreachPartition(iter => {
using(getDbConnection(url,username,password)) { implicit conn =>
iter.foreach {
case Some(s) =>
s.split(",") match {
case Array(lodDate, url, cnt) =>
//洗い替えできるようにREPLACEを使いました
val del = conn.prepareStatement ("REPLACE INTO logs (log_date, url, count) VALUES (?,?,?) ")
del.setTimestamp(1,new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse(lodDate).getTime()))
del.setString(2, url)
del.setInt(3, cnt.toInt)
del.executeUpdate
case _ => //なんかエラーログが出るようにしたほうがいいだろうなあ
}
case None => //なんかエラーログが出るようにしたほうがいいだろうなあ
}
}
})
}
def getDbConnection(url: String, username: String, password: String): Connection = {
Class.forName("com.mysql.jdbc.Driver").newInstance()
java.sql.DriverManager.getConnection(url,username,password)
}
// auto close されるようにローンパターンを適用
def using[X <: {def close()}, A](resource : X)(f : X => A): A =
try { f(resource)
} finally { resource.close()}
ddl
CREATE TABLE `logs` (
`log_date` datetime NOT NULL,
`url` varchar(255) NOT NULL,
`count` bigint(20) NOT NULL,
PRIMARY KEY (`log_date`,`url`)
);
参考サイト:すみません。以下のサイトのプログラムをベースにさせてもらってます。Sparkのこと全然わかんなかったので大変たすかりました。
Apache Spark を使ってアクセスログを解析して、その結果をCSVファイルに出力してみた。