Scala
Spark

Apache Spark を使ってアクセスログを解析して、その結果をCSVファイルに出力してみた。

More than 3 years have passed since last update.


概要

次世代分散処理エンジンとして流行ってますね。Apache Spark。

https://spark.apache.org/

https://github.com/apache/spark

モチベーションが下がりかかってるところに、ちょうどアクセスログを解析して〜みたいなことをやろうとしていたので、実際のアクセスログを解析し、アクセス数を集計して、CSVファイルに出力するということを試してみました。


集計とCSV出力

今回は対象のアクセスログから「/hoge」のURIに対して日毎にどれくらいのアクセスがあったかを集計し、その結果をCSV出力してます。

こんな感じの出力結果にしたいです。

  /hoge,1,3,5,60,100,20000,294,...(対象日付分できる)

そして、実際のコードはこんな感じ。

import java.io.File

import java.text.SimpleDateFormat
import scala.util.parsing.combinator._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{MappedRDD, RDD}

object SparkAnalysis {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Analysis")
val context = new SparkContext(conf)
val (logDir, csvFilePath) = (args(0), args(1))

val records = logFiles(logDir).map { logFile =>
val logData = context.textFile(logFile)
logData.map(line => LogParser.parse(line).getOrElse(Log())).filter(!_.time.isEmpty)
}.reduce(_ ++ _)

val filteredRecords = records.filter(_.url == "/hoge")
val partitions = filteredRecords.mapPartitions { records =>
records.map { record => (record.time, 1) }
}
val counts = partitions.repartition(1).reduceByKey((a, b) => a + b).sortBy(_._1).map(_._2).glom.map(a => "/hoge," + a.mkString(","))

writeCSV(csvFilePath, counts)
}

private def logFiles(logDir: String): Seq[String] =
new File(logDir).listFiles.map(_.getPath))

private def writeCSV(csvFilePath: String, countData: RDD[String]): Unit = {
val tempFilePath = "/tmp/spark_temp"
FileUtil.fullyDelete(new File(tempFilePath))
FileUtil.fullyDelete(new File(csvFilePath))

countData.saveAsTextFile(tempFilePath)
merge(tempFilePath, csvFilePath)
}

private def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

}

case class Log(time: String = "", url: String = "")

object LogParser extends RegexParsers {

def parse(log: String): ParseResult[Log] = parseAll(line, log)

private def line: Parser[Log] =
time ~ logLevel ~ method ~ url ~ routes ~ controller ~ returned ~ status ~ in ~ procTime ~ where ~ requestId ~ and ~ remoteAddress ~ and ~ userAgent ^^ {
case time ~ logLevel ~ method ~ url ~ routes ~ controller ~ returned ~ status ~ in ~ procTime ~ where ~ requestId ~ and1 ~ remoteAddress ~ and2 ~ userAgent => Log(time, url)
}

private def time: Parser[String] = "[" ~> """\S+ [^ \]]+""".r <~ "]" ^^ { dayFloor }
private def logLevel: Parser[String] = "[" ~> """\S+""".r <~ "]"
private def method: Parser[String] = """[A-Z]+""".r
private def url: Parser[String] = """\S+""".r
private def routes: Parser[String] = "routes".r
private def controller: Parser[String] = """\S+""".r
private def returned: Parser[String] = "returned".r
private def status: Parser[Int] = """\S+""".r ^^ { status => status.split("=")(1).toInt }
private def in: Parser[String] = "in".r
private def procTime: Parser[String] = """\S+""".r
private def where: Parser[String] = "where".r
private def requestId: Parser[Long] = "[" ~> """\S+=\d+""".r <~ "]" ^^ { requestId => requestId.split("=")(1).toLong }
private def and: Parser[String] = "and".r
private def remoteAddress: Parser[String] = """\S+""".r ^^ { remoteAddress => remoteAddress.split("=")(1) }
private def userAgent: Parser[String] = """[^"]+""".r

private def dayFloor(timestamp: String): String = {
val dateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS").parse(timestamp)

new SimpleDateFormat("yyyy-MM-dd").format(dateTime)
}

}


ハマった点

「val counts」のところで、「repartition(1)」を指定していますが、ここはあえて Spark のパーティションを1に指定しています。

ここを指定しないとどうなるか。。

パーティションを指定しない場合の出力結果

/hoge,1

/hoge,3
/hoge,5
/hoge,60
/hoge,100
/hoge,20000
/hoge,294
...

縦に並んでしまいます。こうはなってほしくない。。

これの原因ですが、Spark は RDD というインスタンスの内部にパーティションを持っており、パーティション毎に処理を実行していくので、

org.apache.spark.rdd.RDD#saveAsTextFile もパーティション毎にファイルを作成してしまいます。

実際にファイルを見てみるとこんな感じでした。

$ ls /tmp/spark_temp

_SUCCESS part-00004 part-00009 part-00014 part-00019 part-00024 part-00029 part-00034 part-00039 part-00044 part-00049 part-00054
part-00000 part-00005 part-00010 part-00015 part-00020 part-00025 part-00030 part-00035 part-00040 part-00045 part-00050 part-00055
part-00001 part-00006 part-00011 part-00016 part-00021 part-00026 part-00031 part-00036 part-00041 part-00046 part-00051 part-00056
part-00002 part-00007 part-00012 part-00017 part-00022 part-00027 part-00032 part-00037 part-00042 part-00047 part-00052 part-00057
part-00003 part-00008 part-00013 part-00018 part-00023 part-00028 part-00033 part-00038 part-00043 part-00048 part-00053

このファイルたちを最終的に Hadoop の FileUtil#copyMerge で一つのファイルにがっちゃんこしてるから縦に並んでしまいました。。

なので、パーティションのサイズを1で固定してしまって、1つのパーティション内で

集計とCSV出力を行う形にしました。

(パーティションが1つなのでログのサイズがバカでかいものだった場合の処理速度とか気になりますが、今回はログサイズも大したことなかったので妥協。。)

パーティション固定しなくてもこんないい方法あるよーとかあったら教えていただけると嬉しいです!!


まとめ


  • Apache Spark の内側を少し知れた。(パーティションとかログ見たりとか)

  • scala の RegexParser が理解しにくすぎワロタ。

  • やっぱり新しいことは楽しい!!モチベーションの向上になった。

  • SparkStreaming の方も時間があったら試してみる。


追記(2015/03/02)

Github にサンプルコード上げてみました。

上記のサンプルコードから更に下記の機能を追加しています。


  • URLをカンマ区切りで複数指定できるように修正。

  • Apache Spark のスクリプトを実行してCSVファイルを出力するまでの実行スクリプトを追加。

https://github.com/uriborn/apache-spark-csv-sample


参考にしたサイト

http://qiita.com/aoiroaoino/items/1463362db165d5b08eba

http://architects.dzone.com/articles/spark-write-csv-file

http://spark.apache.org/docs/1.2.0/programming-guide.html

http://www.ne.jp/asahi/hishidama/home/tech/scala/spark/partition.html