Scala
Spark

Apache Sparkでlog集計と集計結果のDB Insertを実施してみた

Sparkというものを最近になって触る機会があったのでちょっとお試しして見た。
大規模データをいろいろ編集できるらしいがその威力はいかに。。
最下部リンクにてlog集計をされているかたがサンプルあげてくれていたので、すこし編集させて頂き
Mysqlへデータを書き込みされるように作って見ました。

github

Sparkのバージョンが低かったので、最低限のバージョンまでアップした。

build.sbt
name := "spark_csv_sample"

version := "1.0.0"

scalaVersion := "2.11.2"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0",
  "org.apache.spark" % "spark-core_2.11" % "1.6.0",
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "mysql" % "mysql-connector-java" % "5.1.46",
  "org.mortbay.jetty" % "jetty" % "6.1.26"
)

MysqlへのDB Insert

SparkCSVSample.scala
  private def writeDB(countData: RDD[Option[String]]): Unit = {

    // 環境変数使いたいけど一旦はこれで。。
    val url = "jdbc:mysql://localhost:3306/nano_planner_dev"
    val username = "root"
    val password = ""

    // パーティション単位で Insert 〜 Closeされるのが正しいとどっかのブログに書いてったので、これでいいかなあ。
    countData.foreachPartition(iter => {
      using(getDbConnection(url,username,password)) { implicit conn =>
        iter.foreach {
          case Some(s) =>
            s.split(",") match {
              case  Array(lodDate, url, cnt) =>
                //洗い替えできるようにREPLACEを使いました
                val del = conn.prepareStatement ("REPLACE INTO logs (log_date, url, count) VALUES (?,?,?) ")
                del.setTimestamp(1,new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse(lodDate).getTime()))
                del.setString(2, url)
                del.setInt(3, cnt.toInt)
                del.executeUpdate
              case _ => //なんかエラーログが出るようにしたほうがいいだろうなあ
            }
          case None => //なんかエラーログが出るようにしたほうがいいだろうなあ
        }
      }
    })
  }

  def getDbConnection(url: String, username: String, password: String): Connection = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    java.sql.DriverManager.getConnection(url,username,password)
  }

  // auto close されるようにローンパターンを適用
  def using[X <: {def close()}, A](resource : X)(f : X => A): A =
    try { f(resource)
    } finally { resource.close()}
ddl
CREATE TABLE `logs` (
  `log_date` datetime NOT NULL,
  `url` varchar(255) NOT NULL,
  `count` bigint(20) NOT NULL,
  PRIMARY KEY (`log_date`,`url`)
);

参考サイト:すみません。以下のサイトのプログラムをベースにさせてもらってます。Sparkのこと全然わかんなかったので大変たすかりました。
Apache Spark を使ってアクセスログを解析して、その結果をCSVファイルに出力してみた。