16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache SparkのScala shellを試す

Last updated at Posted at 2014-12-19

#概要

Scala Advent Calendar 2014の18日目です。遅れて申し訳ありません。

アプリケーションログをパースしてelasticsearchに登録、Kibanaで可視化する、というシステムを設計しています。
リアルタイムで高速に大量のログをパースする方法が必要だったので、Sparkを試してみようと思い、調べた結果をまとめてみました。

#試した環境

  • Ubuntu 14.04.1 LTS
  • Java 1.8.0_20
  • Scala 2.10.4
  • Spark 1.1.1

Sparkのインストール方法などは、公式のドキュメントを参照してください。

#Apacheのアクセスログをパースしてみる

LogFormatは以下の形式である前提です。

LogFormat
LogFormat "%h %l %u %t %>s %X %T %B \"%r\" \"%{Referer}i\" \"%{User-Agent}i\""

具体的にはこのようなものです。

example
192.168.1.10 - - [20/Oct/2014:09:00:03 +0900] 200 + 0 1194 "POST /hoge/hoge HTTP/1.1" "http://hoge.co.jp/hoge/view?aaa=1234" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/14.0.835.202 Safari/535.1"

##Parserを書く

Parserは以下です。
以下の状態でsbt packageを実行し、jarファイルを作ります。

AccessLogParser.scala

package parser

import model.AccessLog
import util.parsing.combinator._
import java.text.{SimpleDateFormat}
import java.util.{Locale, TimeZone}

object AccessLogParser extends RegexParsers {

  override val whiteSpace = """[ \t]+""".r

  def eol: Parser[Any] = """\r?\n""".r ^^ { s => "" }
  def line: Parser[AccessLog] =
    ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent <~ rest ^^ {
      case ipAddress ~ ident ~ user ~ time ~ status ~ connStatus ~ elapsedTime ~ bytes ~ method ~ uri ~ version ~ referrer ~ userAgent =>
        AccessLog(ipAddress, ident, user, time, status, bytes, elapsedTime, method, uri, version, referrer, userAgent)
    }

  def ipAddress: Parser[String] = """[^ ]+""".r
  def ident: Parser[String] = """[(?:\w+)-]""".r
  def user: Parser[String] = """[(?:\w+)-]""".r
  def time: Parser[String] = "[" ~> """\S+ [^ \]]+""".r <~ "]" ^^ { convertToIso8601(_) }
  def status: Parser[Int] = """\d+""".r ^^ { _.toInt }
  def connStatus: Parser[String] = """\S+""".r
  def elapsedTime: Parser[Int] = """\d+""".r ^^ { _.toInt }
  def bytes: Parser[Int] = """[^ ]+""".r ^^ { case "-" => 0; case s => s.toInt }
  def method: Parser[String] = "\"" ~> """[A-Z]+""".r
  def uri: Parser[String] = """\S+""".r
  def version: Parser[String] = """[^ "]+""".r <~ "\""
  def referrer: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
  def userAgent: Parser[String] = "\"" ~> """[^"]+""".r <~ "\""
  def rest: Parser[String] = """[^$]*""".r

  def parse(log: String): ParseResult[AccessLog]  = parseAll(line, log)

  def convertToIso8601(strDate: String) = {
    val df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
    df.setTimeZone(TimeZone.getTimeZone("UTC"))
    val date = df.parse(strDate)
    df.applyPattern("YYYY-MM-dd'T'hh:mm:ss Z")
    df.format(date).replace(" ", "")
  }
}

case class AccessLog(
  ipAddress: String = "",
  ident: String = "",
  user: String = "",
  time: String = "",
  status: Int = 0,
  bytes: Int = 0,
  elapsedTime: Int = 0,
  method: String = "",
  uri: String = "",
  version: String = "",
  referrer: String = "",
  userAgent: String = "")

build.sbt

name := "LogParser"

version := "1.0"

scalaVersion := "2.10.4"

##SparkのREPLで動かしてみる

SparkのREPLを起動します。(ここでは4コアで起動しています。)
なお、前の手順で作ったjarファイルを/pathToSparkHome/jarに配置しています。

$ cd /pathToSparkHome/
$ ./bin/spark-shell --master local[4] --jars jar/logparser_2.10-1.0.jar

/pathToAccesslog/accesslogに保存してある1GB程度のログファイルをパースします。
有効なレコードをカウント、Status400のリクエストをカウントしています。

scala> import parser._
import parser._

scala> val file = sc.textFile("/pathToAccesslog/accesslog")
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(32728) called with curMem=0, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.1 MB)
14/12/19 10:54:22 INFO MemoryStore: ensureFreeSpace(4959) called with curMem=32728, maxMem=278019440
14/12/19 10:54:22 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.8 KB, free 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35772 (size: 4.8 KB, free: 265.1 MB)
14/12/19 10:54:22 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
14/12/19 10:54:22 INFO SparkContext: Created broadcast 0 from textFile at <console>:15
file: org.apache.spark.rdd.RDD[String] = /home/ysksuzuki/access.log MappedRDD[1] at textFile at <console>:15

scala> val records = file.map{ line =>
     | AccessLogParser.parse(line).getOrElse(AccessLog())
     | }.filter(!_.ipAddress.isEmpty)
records: org.apache.spark.rdd.RDD[parser.AccessLog] = FilteredRDD[3] at filter at <console>:19

scala> records.count

(省略)

res0: Long = 2767874

scala> records.filter(_.status == 400).count

(省略)

res1: Long = 93

#試してみた結果

1GBのアクセスログをパースするのに30秒程でした。
ローカル環境で単体のマシンで試しただけなのですが、Sparkの機能の片鱗は感じられたかと思います。

Spark-hadoopを使ってelasticsearchにデータを登録するまでやりたかったのですが、
思ったように動いてくれなかったので諦めました。こっちはもう少し調べてみたいと思います。

16
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
16

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?