概要
次世代分散処理エンジンとして流行ってますね。Apache Spark。
https://spark.apache.org/
https://github.com/apache/spark
モチベーションが下がりかかってるところに、ちょうどアクセスログを解析して〜みたいなことをやろうとしていたので、実際のアクセスログを解析し、アクセス数を集計して、CSVファイルに出力するということを試してみました。
集計とCSV出力
今回は対象のアクセスログから「/hoge」のURIに対して日毎にどれくらいのアクセスがあったかを集計し、その結果をCSV出力してます。
こんな感じの出力結果にしたいです。
  /hoge,1,3,5,60,100,20000,294,...(対象日付分できる)
そして、実際のコードはこんな感じ。
import java.io.File
import java.text.SimpleDateFormat
import scala.util.parsing.combinator._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.{MappedRDD, RDD}
object SparkAnalysis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Analysis")
    val context = new SparkContext(conf)
    val (logDir, csvFilePath) = (args(0), args(1))
    val records = logFiles(logDir).map { logFile =>
      val logData = context.textFile(logFile)
      logData.map(line => LogParser.parse(line).getOrElse(Log())).filter(!_.time.isEmpty)
    }.reduce(_ ++ _)
    
    val filteredRecords = records.filter(_.url == "/hoge")
    val partitions = filteredRecords.mapPartitions { records =>
      records.map { record => (record.time, 1) }
    }
    val counts = partitions.repartition(1).reduceByKey((a, b) => a + b).sortBy(_._1).map(_._2).glom.map(a => "/hoge," + a.mkString(","))
    writeCSV(csvFilePath, counts)
  }
  
  private def logFiles(logDir: String): Seq[String] =
    new File(logDir).listFiles.map(_.getPath))
  
  private def writeCSV(csvFilePath: String, countData: RDD[String]): Unit = {
    val tempFilePath = "/tmp/spark_temp"
    FileUtil.fullyDelete(new File(tempFilePath))
    FileUtil.fullyDelete(new File(csvFilePath))
    countData.saveAsTextFile(tempFilePath)
    merge(tempFilePath, csvFilePath)
  }
  
  private def merge(srcPath: String, dstPath: String): Unit = {
    val hadoopConfig = new Configuration
    val hdfs = FileSystem.get(hadoopConfig)
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
  }
  
}
case class Log(time: String = "", url: String = "")
object LogParser extends RegexParsers {
  def parse(log: String): ParseResult[Log] = parseAll(line, log)
  private def line: Parser[Log] =
    time ~ logLevel ~ method ~ url ~ routes ~ controller ~ returned ~ status ~ in ~ procTime ~ where ~ requestId ~ and ~ remoteAddress ~ and ~ userAgent ^^ {
      case time ~ logLevel ~ method ~ url ~ routes ~ controller ~ returned ~ status ~ in ~ procTime ~ where ~ requestId ~ and1 ~ remoteAddress ~ and2 ~ userAgent => Log(time, url)
    }
  private def time: Parser[String] = "[" ~> """\S+ [^ \]]+""".r <~ "]" ^^ { dayFloor }
  private def logLevel: Parser[String] = "[" ~> """\S+""".r <~ "]"
  private def method: Parser[String] = """[A-Z]+""".r
  private def url: Parser[String] = """\S+""".r
  private def routes: Parser[String] = "routes".r
  private def controller: Parser[String] = """\S+""".r
  private def returned: Parser[String] = "returned".r
  private def status: Parser[Int] = """\S+""".r ^^ { status => status.split("=")(1).toInt }
  private def in: Parser[String] = "in".r
  private def procTime: Parser[String] = """\S+""".r
  private def where: Parser[String] = "where".r
  private def requestId: Parser[Long] = "[" ~> """\S+=\d+""".r <~ "]" ^^ { requestId => requestId.split("=")(1).toLong }
  private def and: Parser[String] = "and".r
  private def remoteAddress: Parser[String] = """\S+""".r ^^ { remoteAddress => remoteAddress.split("=")(1) }
  private def userAgent: Parser[String] = """[^"]+""".r
  private def dayFloor(timestamp: String): String = {
    val dateTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS").parse(timestamp)
    new SimpleDateFormat("yyyy-MM-dd").format(dateTime)
  }
}
ハマった点
「val counts」のところで、「repartition(1)」を指定していますが、ここはあえて Spark のパーティションを1に指定しています。
ここを指定しないとどうなるか。。
パーティションを指定しない場合の出力結果
/hoge,1
/hoge,3
/hoge,5
/hoge,60
/hoge,100
/hoge,20000
/hoge,294
...
縦に並んでしまいます。こうはなってほしくない。。
これの原因ですが、Spark は RDD というインスタンスの内部にパーティションを持っており、パーティション毎に処理を実行していくので、
org.apache.spark.rdd.RDD#saveAsTextFile もパーティション毎にファイルを作成してしまいます。
実際にファイルを見てみるとこんな感じでした。
$ ls /tmp/spark_temp
_SUCCESS    part-00004  part-00009  part-00014  part-00019  part-00024  part-00029  part-00034  part-00039  part-00044  part-00049  part-00054
part-00000  part-00005  part-00010  part-00015  part-00020  part-00025  part-00030  part-00035  part-00040  part-00045  part-00050  part-00055
part-00001  part-00006  part-00011  part-00016  part-00021  part-00026  part-00031  part-00036  part-00041  part-00046  part-00051  part-00056
part-00002  part-00007  part-00012  part-00017  part-00022  part-00027  part-00032  part-00037  part-00042  part-00047  part-00052  part-00057
part-00003  part-00008  part-00013  part-00018  part-00023  part-00028  part-00033  part-00038  part-00043  part-00048  part-00053
このファイルたちを最終的に Hadoop の FileUtil#copyMerge で一つのファイルにがっちゃんこしてるから縦に並んでしまいました。。
なので、パーティションのサイズを1で固定してしまって、1つのパーティション内で
集計とCSV出力を行う形にしました。
(パーティションが1つなのでログのサイズがバカでかいものだった場合の処理速度とか気になりますが、今回はログサイズも大したことなかったので妥協。。)
パーティション固定しなくてもこんないい方法あるよーとかあったら教えていただけると嬉しいです!!
まとめ
- Apache Spark の内側を少し知れた。(パーティションとかログ見たりとか)
- scala の RegexParser が理解しにくすぎワロタ。
- やっぱり新しいことは楽しい!!モチベーションの向上になった。
- SparkStreaming の方も時間があったら試してみる。
追記(2015/03/02)
Github にサンプルコード上げてみました。
上記のサンプルコードから更に下記の機能を追加しています。
- URLをカンマ区切りで複数指定できるように修正。
- Apache Spark のスクリプトを実行してCSVファイルを出力するまでの実行スクリプトを追加。
参考にしたサイト
http://qiita.com/aoiroaoino/items/1463362db165d5b08eba
http://architects.dzone.com/articles/spark-write-csv-file
http://spark.apache.org/docs/1.2.0/programming-guide.html
http://www.ne.jp/asahi/hishidama/home/tech/scala/spark/partition.html