#概要
Scala Advent Calendar 2014の18日目です。遅れて申し訳ありません。
アプリケーションログをパースしてelasticsearchに登録、Kibanaで可視化する、というシステムを設計しています。
リアルタイムで高速に大量のログをパースする方法が必要だったので、Sparkを試してみようと思い、調べた結果をまとめてみました。
#試した環境
- Ubuntu 14.04.1 LTS
- Java 1.8.0_20
- Scala 2.10.4
- Spark 1.1.1
Sparkのインストール方法などは、公式のドキュメントを参照してください。
#Apacheのアクセスログをパースしてみる
LogFormatは以下の形式である前提です。
LogFormat "%h %l %u %t %>s %X %T %B \"%r\" \"%{Referer}i\" \"%{User-Agent}i\""
具体的にはこのようなものです。
192.168.1.10 - - [20/Oct/2014:09:00:03 +0900] 200 + 0 1194 "POST /hoge/hoge HTTP/1.1" "http://hoge.co.jp/hoge/view?aaa=1234" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.202 Safari/535.1"
##Parserを書く
Parserは以下です。
以下の状態でsbt packageを実行し、jarファイルを作ります。
package parser
import model.AccessLog
import util.parsing.combinator._
import java.text.{SimpleDateFormat}
import java.util.{Locale, TimeZone}
object AccessLogParser extends RegexParsers {
override val whiteSpace = """[ \t]+""".r
def eol: Parser[Any] = """\r?\n""".r ^^ { s => "" }
def line: Parser[AccessLog] =
ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent <~ rest ^^ {
case ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent =>
AccessLog(ipAddress, ident, user, time, status, bytes, elapsedTime, method, uri, version, referrer, userAgent)
}
def ipAddress: Parser[String] = """[^ ]+""".r
def ident: Parser[String] = """[(?:\w+)-]""".r
def user: Parser[String] = """[(?:\w+)-]""".r
def time: Parser[String] = "[" ~> """\S+ [^ \]]+""".r <~ "]" ^^ { convertToIso8601(_) }
def status: Parser[Int] = """\d+""".r ^^ { _.toInt }
def connStatus: Parser[String] = """\S+""".r
def elapsedTime: Parser[Int] = """\d+""".r ^^ { _.toInt }
def bytes: Parser[Int] = """[^ ]+""".r ^^ { case "-" => 0; case s => s.toInt }
def method: Parser[String] = "\"" ~> """[A-Z]+""".r
def uri: Parser[String] = """\S+""".r
def version: Parser[String] = """[^ "]+""".r <~ "\""
def referrer: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
def userAgent: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
def rest: Parser[String] = """[^$]*""".r
def parse(log: String): ParseResult[AccessLog] = parseAll(line, log)
def convertToIso8601(strDate: String) = {
val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
df.setTimeZone(TimeZone.getTimeZone("UTC"))
val date = df.parse(strDate)
df.applyPattern("YYYY-MM-dd'T'hh:mm:ss Z")
df.format(date).replace(" ", "")
}
}
case class AccessLog(
ipAddress: String = "",
ident: String = "",
user: String = "",
time: String = "",
status: Int = 0,
bytes: Int = 0,
elapsedTime: Int = 0,
method: String = "",
uri: String = "",
version: String = "",
referrer: String = "",
userAgent: String = "")
name := "LogParser"
version := "1.0"
scalaVersion := "2.10.4"
##SparkのREPLで動かしてみる
SparkのREPLを起動します。(ここでは4コアで起動しています。)
なお、前の手順で作ったjarファイルを/pathToSparkHome/jarに配置しています。
$ cd /pathToSparkHome/
$ ./bin/spark-shell --master local[4] --jars jar/logparser_2.10-1.0.jar
/pathToAccesslog/accesslogに保存してある1GB程度のログファイルをパースします。
有効なレコードをカウント、Status400のリクエストをカウントしています。
scala> import parser._
import parser._
scala> val file = sc.textFile("/pathToAccesslog/accesslog")
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(32728) called with curMem=0, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.1 MB)
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(4959) called with curMem=32728, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.8 KB, free 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35772 (size: 4.8 KB, free: 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/19 10:54:22 INFO SparkContext: Created broadcast 0 from textFile at <console>:15
file: org.apache.spark.rdd.RDD[String] = /home/ysksuzuki/access.log MappedRDD[1] at textFile at <console>:15
scala> val records = file.map{ line =>
| AccessLogParser.parse(line).getOrElse(AccessLog())
| }.filter(!_.ipAddress.isEmpty)
records: org.apache.spark.rdd.RDD[parser.AccessLog] = FilteredRDD[3] at filter at <console>:19
scala> records.count
(省略)
res0: Long = 2767874
scala> records.filter(_.status == 400).count
(省略)
res1: Long = 93
#試してみた結果
1GBのアクセスログをパースするのに30秒程でした。
ローカル環境で単体のマシンで試しただけなのですが、Sparkの機能の片鱗は感じられたかと思います。
Spark-hadoopを使ってelasticsearchにデータを登録するまでやりたかったのですが、
思ったように動いてくれなかったので諦めました。こっちはもう少し調べてみたいと思います。