ScalaDay 18

Apache SparkのScala shellを試す

More than 3 years have passed since last update.


概要

Scala Advent Calendar 2014の18日目です。遅れて申し訳ありません。

アプリケーションログをパースしてelasticsearchに登録、Kibanaで可視化する、というシステムを設計しています。

リアルタイムで高速に大量のログをパースする方法が必要だったので、Sparkを試してみようと思い、調べた結果をまとめてみました。


試した環境


  • Ubuntu 14.04.1 LTS

  • Java 1.8.0_20

  • Scala 2.10.4

  • Spark 1.1.1

Sparkのインストール方法などは、公式のドキュメントを参照してください。


Apacheのアクセスログをパースしてみる

LogFormatは以下の形式である前提です。


LogFormat

LogFormat "%h %l %u %t %>s %X %T %B \"%r\" \"%{Referer}i\" \"%{User-Agent}i\""


具体的にはこのようなものです。


example

192.168.1.10 - - [20/Oct/2014:09:00:03 +0900] 200 + 0 1194 "POST /hoge/hoge HTTP/1.1" "http://hoge.co.jp/hoge/view?aaa=1234" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.202 Safari/535.1"



Parserを書く

Parserは以下です。

以下の状態でsbt packageを実行し、jarファイルを作ります。


AccessLogParser.scala


package parser

import model.AccessLog
import util.parsing.combinator._
import java.text.{SimpleDateFormat}
import java.util.{Locale, TimeZone}

object AccessLogParser extends RegexParsers {

override val whiteSpace = """[ \t]+""".r

def eol: Parser[Any] = """\r?\n""".r ^^ { s => "" }
def line: Parser[AccessLog] =
ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent <~ rest ^^ {
case ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent =>
AccessLog(ipAddress, ident, user, time, status, bytes, elapsedTime, method, uri, version, referrer, userAgent)
}

def ipAddress: Parser[String] = """[^ ]+""".r
def ident: Parser[String] = """[(?:\w+)-]""".r
def user: Parser[String] = """[(?:\w+)-]""".r
def time: Parser[String] = "[" ~> """\S+ [^ \]]+""".r <~ "]" ^^ { convertToIso8601(_) }
def status: Parser[Int] = """\d+""".r ^^ { _.toInt }
def connStatus: Parser[String] = """\S+""".r
def elapsedTime: Parser[Int] = """\d+""".r ^^ { _.toInt }
def bytes: Parser[Int] = """[^ ]+""".r ^^ { case "-" => 0; case s => s.toInt }
def method: Parser[String] = "\"" ~> """[A-Z]+""".r
def uri: Parser[String] = """\S+""".r
def version: Parser[String] = """[^ "]+""".r <~ "\""
def referrer: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
def userAgent: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
def rest: Parser[String] = """[^$]*""".r

def parse(log: String): ParseResult[AccessLog] = parseAll(line, log)

def convertToIso8601(strDate: String) = {
val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
df.setTimeZone(TimeZone.getTimeZone("UTC"))
val date = df.parse(strDate)
df.applyPattern("YYYY-MM-dd'T'hh:mm:ss Z")
df.format(date).replace(" ", "")
}
}

case class AccessLog(
ipAddress: String = "",
ident: String = "",
user: String = "",
time: String = "",
status: Int = 0,
bytes: Int = 0,
elapsedTime: Int = 0,
method: String = "",
uri: String = "",
version: String = "",
referrer: String = "",
userAgent: String = "")



build.sbt


name := "LogParser"

version := "1.0"

scalaVersion := "2.10.4"



SparkのREPLで動かしてみる

SparkのREPLを起動します。(ここでは4コアで起動しています。)

なお、前の手順で作ったjarファイルを/pathToSparkHome/jarに配置しています。

$ cd /pathToSparkHome/

$ ./bin/spark-shell --master local[4] --jars jar/logparser_2.10-1.0.jar

/pathToAccesslog/accesslogに保存してある1GB程度のログファイルをパースします。

有効なレコードをカウント、Status400のリクエストをカウントしています。

scala> import parser._

import parser._

scala> val file = sc.textFile("/pathToAccesslog/accesslog")
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(32728) called with curMem=0, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.1 MB)
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(4959) called with curMem=32728, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.8 KB, free 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35772 (size: 4.8 KB, free: 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/19 10:54:22 INFO SparkContext: Created broadcast 0 from textFile at <console>:15
file: org.apache.spark.rdd.RDD[String] = /home/ysksuzuki/access.log MappedRDD[1] at textFile at <console>:15

scala> val records = file.map{ line =>
| AccessLogParser.parse(line).getOrElse(AccessLog())
| }.filter(!_.ipAddress.isEmpty)
records: org.apache.spark.rdd.RDD[parser.AccessLog] = FilteredRDD[3] at filter at <console>:19

scala> records.count

(省略)

res0: Long = 2767874

scala> records.filter(_.status == 400).count

(省略)

res1: Long = 93


試してみた結果

1GBのアクセスログをパースするのに30秒程でした。

ローカル環境で単体のマシンで試しただけなのですが、Sparkの機能の片鱗は感じられたかと思います。

Spark-hadoopを使ってelasticsearchにデータを登録するまでやりたかったのですが、

思ったように動いてくれなかったので諦めました。こっちはもう少し調べてみたいと思います。