LoginSignup
1
0

More than 5 years have passed since last update.

Apache Sparkでlog集計と集計結果のDB Insertを実施してみた

Last updated at Posted at 2018-04-14

Sparkというものを最近になって触る機会があったのでちょっとお試しして見た。
大規模データをいろいろ編集できるらしいがその威力はいかに。。
最下部リンクにてlog集計をされているかたがサンプルあげてくれていたので、すこし編集させて頂き
Mysqlへデータを書き込みされるように作って見ました。

github

Sparkのバージョンが低かったので、最低限のバージョンまでアップした。

build.sbt

name := "spark_csv_sample"

version := "1.0.0"

scalaVersion := "2.11.2"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0",
  "org.apache.spark" % "spark-core_2.11" % "1.6.0",
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "mysql" % "mysql-connector-java" % "5.1.46",
  "org.mortbay.jetty" % "jetty" % "6.1.26"
)

MysqlへのDB Insert

SparkCSVSample.scala

  private def writeDB(countData: RDD[Option[String]]): Unit = {

    // 環境変数使いたいけど一旦はこれで。。
    val url = "jdbc:mysql://localhost:3306/nano_planner_dev"
    val username = "root"
    val password = ""

    // パーティション単位で Insert 〜 Closeされるのが正しいとどっかのブログに書いてったので、これでいいかなあ。
    countData.foreachPartition(iter => {
      using(getDbConnection(url,username,password)) { implicit conn =>
        iter.foreach {
          case Some(s) =>
            s.split(",") match {
              case  Array(lodDate, url, cnt) =>
                //洗い替えできるようにREPLACEを使いました
                val del = conn.prepareStatement ("REPLACE INTO logs (log_date, url, count) VALUES (?,?,?) ")
                del.setTimestamp(1,new Timestamp(new SimpleDateFormat("yyyy-MM-dd").parse(lodDate).getTime()))
                del.setString(2, url)
                del.setInt(3, cnt.toInt)
                del.executeUpdate
              case _ => //なんかエラーログが出るようにしたほうがいいだろうなあ
            }
          case None => //なんかエラーログが出るようにしたほうがいいだろうなあ
        }
      }
    })
  }

  def getDbConnection(url: String, username: String, password: String): Connection = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    java.sql.DriverManager.getConnection(url,username,password)
  }

  // auto close されるようにローンパターンを適用
  def using[X <: {def close()}, A](resource : X)(f : X => A): A =
    try { f(resource)
    } finally { resource.close()}
ddl

CREATE TABLE `logs` (
  `log_date` datetime NOT NULL,
  `url` varchar(255) NOT NULL,
  `count` bigint(20) NOT NULL,
  PRIMARY KEY (`log_date`,`url`)
);

参考サイト:すみません。以下のサイトのプログラムをベースにさせてもらってます。Sparkのこと全然わかんなかったので大変たすかりました。
Apache Spark を使ってアクセスログを解析して、その結果をCSVファイルに出力してみた。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0