I'm an infrastructure engineer, but I tried aggregating access logs with Spark Streaming and Kafka

  • Aggregate Nginx access logs in real time, like an infrastructure engineer would
  • Most of the samples out there are in Scala, so I wrote it in Scala (my first Scala)
  • A trendy-ish pipeline: Nginx => Fluentd => Kafka => Spark Streaming
  • Please forgive the messy Scala
  • It's basically just the WordCount sample with some tweaks
  • For now, let's just try aggregating

Sample log

I borrowed this generator to prepare LTSV-formatted logs.
Only its last line has been heavily modified, as shown below.

sample_apache_gen.rb
puts "time:#{Time.at(now).strftime('%d/%b/%Y:%H:%M:%S %z')}\thost:#{record['host']}\tforwardedfor:#{record['host']}\treq:#{record['method']} #{record['path']} HTTP/1.1\tstatus:#{record['code']}\tsize:#{record['size']}\treferer:#{record['referer']}\tua:#{record['agent']}"
sample.log
time:22/Dec/2016:18:07:56 +0900 host:164.81.181.112     forwardedfor:164.81.181.112     req:GET /category/office HTTP/1.1      status:200      size:124        referer:/item/games/3481        ua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3
time:22/Dec/2016:18:07:59 +0900 host:196.93.44.211      forwardedfor:196.93.44.211      req:GET /category/electronics?from=10 HTTP/1.1 status:200      size:136        referer:/category/electronics   ua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1
time:22/Dec/2016:18:08:02 +0900 host:20.171.223.57      forwardedfor:20.171.223.57      req:GET /category/finance HTTP/1.1     status:200      size:78 referer:/category/office        ua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)
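
Each line is LTSV: tab-separated key:value fields. Later on I parse these with ltsv4s (one of the dependencies in build.sbt below), which turns a line into a Map[String, String]. A minimal standalone sketch of just that parsing step, to show the shape of the data:

import com.github.seratch.ltsv4s._

// Parsing one LTSV line with ltsv4s: fields are tab-separated, keys end at the first ":".
object LtsvParseSketch extends App {
  val line = "time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124"
  val record: Map[String, String] = LTSV.parseLine(line)
  println(record("req"))    // GET /category/office HTTP/1.1
  println(record("status")) // 200
  println(record("size"))   // 124
}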

Fluentd => Kafka part

Send the logs into Kafka with fluent-plugin-kafka

  • Send every message into Kafka as a plain string (a conceptual producer sketch follows the config below)
  • Do the parsing in Spark rather than in Fluentd (no idea which is actually better; I have the impression Scala is faster, and it's also for learning)

Run td-agent-gem install fluent-plugin-kafka and put the following td-agent.conf in place

td-agent.conf
<match **>
  @type kafka
  brokers 10.0.0.65:9092
  zookeeper 10.0.0.65:2181
  default_topic nginx
</match>
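
Conceptually, all this config does is have fluent-plugin-kafka ship each log event as a string message to the nginx topic on the broker at 10.0.0.65:9092. As a rough mental model (just a hand-written sketch, not what the plugin actually does internally), the Kafka side of it looks something like this with the plain Kafka client API:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "10.0.0.65:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // One access-log event goes out as one plain-string message on the "nginx" topic
  producer.send(new ProducerRecord[String, String]("nginx", "time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tstatus:200\tsize:124"))
  producer.close()
}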

Preparing Kafka

Download and unpack

$ cd /opt
$ wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
$ tar xfvz kafka_2.10-0.10.1.0.tgz
$ ln -s kafka_2.10-0.10.1.0 kafka
$ cd kafka

Start ZooKeeper as usual

$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 
$ jps -v |grep zookeeper
3839 QuorumPeerMain -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties

Start Kafka as usual

$ ./bin/kafka-server-start.sh -daemon config/server.properties
$ jps -v |grep Kafka
28603 Kafka -Xmx1G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties

Try pushing messages through

Start a worker that prints to the console

$ ./bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --zookeeper 10.0.0.65:2181 --topic nginx

Try feeding data into Fluentd

$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx

If the messages show up on the worker's console, it's working

{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:05:07+0900"}
{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:05:07+0900"}

Preparing Spark

I already wrote this up elsewhere, so I'll skip it here.
It's just a matter of grabbing spark-2.0.0.tgz and extracting it.

Setting up the Scala environment

Install Scala and sbt

  • sbt 0.13.12
  • scala 2.11.6-6

Install with apt from https://dl.bintray.com/sbt/debian

$ apt install scala=2.11.6-6 sbt=0.13.12

Create the directory layout

$ mkdir -p sample/src/main/scala/

Create build.sbt

$ cat sample/build.sbt 
import scala.util.Properties

name := "Test"
version := "1.0"
scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.0.0"
libraryDependencies += "net.liftweb" % "lift-json_2.11" % "3.0.1"
libraryDependencies += "com.github.seratch" % "ltsv4s_2.11" % "1.0.+"

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

As for assemblyMergeStrategy in assembly, I ran into various errors when running assembly, so I copy-pasted this from somewhere (I don't fully understand it).

Add the sbt-assembly plugin so we can use assembly

$ mkdir -p sample/project/
$ cat sample/project/plugins.sbt 
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Code that just fetches from Kafka and prints

Pretty much the stock sample

sample/src/main/scala/Test.scala
package com.test.spark

import java.util.Date
import java.util.Calendar
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWorker {
  def main(args: Array[String]) {
    // zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
    if (args.length < 5) {
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, sec) = args
    val secSleep    = sec.toInt
    val topicMap    = topics.split(",").map((_, numThreads.toInt)).toMap
    val sparkConf   = new SparkConf().setAppName("KafkaWorker")
    val ssc         = new StreamingContext(sparkConf, Seconds(secSleep))
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    ssc.checkpoint("checkpoint")

    kafkaStream.foreachRDD{ rdd => 
      println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
      rdd.foreach(print)
      println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Compile

$ cd sample/
$ sbt assembly # spits out a lot of warnings, but they're just about merging, so don't worry
$ ll target/scala-2.11/Test-assembly-1.0.jar 
-rw-r--r-- 1 root root 112868840 Dec 26 11:24 target/scala-2.11/Test-assembly-1.0.jar

Run the jar on Spark

Pass the arguments in the order defined in the source:
zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec

$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker  /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 11:31:05 JST 2016 ###
### END Mon Dec 26 11:31:05 JST 2016 ###

### Start Mon Dec 26 11:31:10 JST 2016 ###
### END Mon Dec 26 11:31:10 JST 2016 ###

Confirmed that it seems to process a batch every 5 seconds, as specified by the argument.

Try feeding in data

$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx
### Start Mon Dec 26 11:32:20 JST 2016 ###
{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:02 +0900\thost:20.171.223.57\tforwardedfor:20.171.223.57\treq:GET /category/finance HTTP/1.1\tstatus:200\tsize:78\treferer:/category/office\tua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:06 +0900\thost:212.159.169.49\tforwardedfor:212.159.169.49\treq:GET /item/computers/2268 HTTP/1.1\tstatus:200\tsize:139\treferer:/item/networking/248\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:07 +0900\thost:140.69.110.95\tforwardedfor:140.69.110.95\treq:GET /category/books HTTP/1.1\tstatus:200\tsize:109\treferer:-\tua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:09 +0900\thost:172.18.127.139\tforwardedfor:172.18.127.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:135\treferer:-\tua:Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:12 +0900\thost:120.222.102.169\tforwardedfor:120.222.102.169\treq:POST /search/?c=Computers+Electronics HTTP/1.1\tstatus:200\tsize:128\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:116.150.211.139\tforwardedfor:116.150.211.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:55\treferer:-\tua:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:192.42.23.199\tforwardedfor:192.42.23.199\treq:GET /category/networking HTTP/1.1\tstatus:200\tsize:59\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:220.84.166.98\tforwardedfor:220.84.166.98\treq:GET /category/toys HTTP/1.1\tstatus:200\tsize:124\treferer:/item/office/4833\tua:Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)","timestamp":"2016-12-26T11:32:16+0900"}
### END Mon Dec 26 11:32:20 JST 2016 ###

It's fetching from Kafka and printing as expected.

Let's aggregate

Aggregate the first segment of the request path every 5 seconds (e.g. GET /category/office HTTP/1.1 => category).
I want to exclude things like icons, JS, and error responses, so filter on status == 200 and size >= 100.
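
The extraction itself is just string splitting: take the second space-separated field of req, then the segment after the leading slash. A quick standalone sketch of what the reqToPath helper in the job below does with one record:

object PathSketch extends App {
  // Same logic as the reqToPath helper in the job below
  def reqToPath(req: String): String = req.split(" ")(1)

  val req  = "GET /category/office HTTP/1.1"
  val path = reqToPath(req)      // "/category/office"
  val top  = path.split("/")(1)  // "category" (index 0 is the empty string before the leading "/")
  println(top)
}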

Source

sample/src/main/scala/Test.scala
package com.test.spark

import java.util.HashMap
import java.util.Date
import java.util.Calendar
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.util.parsing.json.JSON
import net.liftweb._
import net.liftweb.json._
import com.github.seratch.ltsv4s._

object KafkaWorker {
  case class FluentEvent(
      timestamp: String,
      message: String
    )

  def main(args: Array[String]) {
    // zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
    if (args.length < 5) {
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads, sec) = args
    val secSleep    = sec.toInt
    val topicMap    = topics.split(",").map((_, numThreads.toInt)).toMap
    val sparkConf   = new SparkConf().setAppName("KafkaWorker")
    val ssc         = new StreamingContext(sparkConf, Seconds(secSleep))
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    ssc.checkpoint("checkpoint")

    val nginxStream = kafkaStream.map(convertFluentToMap(_))
    // Extract only the first path segment for requests with status 200 and a response of at least 100 bytes.
    // Records that fail the condition fall through as Unit (there is no else branch) and show up later as "not match".
    val pathsStream = nginxStream.map{nginxRecord =>
      if (nginxRecord("size").toInt >= 100 && nginxRecord("status").toInt == 200 ){
        reqToPath(nginxRecord("req")).split("/")(1)
      }
    }
    // Count occurrences per path
    val countPath = pathsStream.map((_, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(secSleep))
      .map{case (path, count) => (count, path)}
      .transform(_.sortByKey(false))

    // Output
    countPath.foreachRDD{ rdd => 
      println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
      val path = rdd.take(10) 
      path.foreach{case (count, tag) => 
        tag match {
          case tag: String => println("%s count (%s)".format(count, tag))
          case _ => println("%s count not match".format(count))
        }
      }
      println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
    }

    ssc.start()
    ssc.awaitTermination()
  }
  def parseNginxLtsv(record: String) = { LTSV.parseLine(record) }
  def parseFluentJson(record: String) = {
    implicit val formats = DefaultFormats
    parse(record).extract[FluentEvent].message
  }
  def convertFluentToMap(record: String) = { parseNginxLtsv(parseFluentJson(record)) }
  def reqToPath(record: String) = { record.split(" ")(1) }
}

Run it

$ sbt assembly
$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker  /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 12:32:25 JST 2016 ###                                      
879 count not match
285 count (category)
190 count (item)
48 count (search)
### END Mon Dec 26 12:32:25 JST 2016 ###

### Start Mon Dec 26 12:32:30 JST 2016 ###
802 count not match
267 count (category)
175 count (item)
38 count (search)
### END Mon Dec 26 12:32:30 JST 2016 ###

### Start Mon Dec 26 12:32:35 JST 2016 ###
895 count not match
321 count (category)
181 count (item)
53 count (search)
### END Mon Dec 26 12:32:35 JST 2016 ###
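
The "count not match" lines are the records that fail the filter: the if without an else evaluates to Unit, so every filtered-out record gets counted under a single non-String key, which the match prints as "not match". If I wanted to drop those records instead of counting them, one option (just a sketch with plain collections; the same flatMap-over-Option pattern works on the DStream too) would be:

object FilterSketch extends App {
  def reqToPath(req: String): String = req.split(" ")(1)

  val records = Seq(
    Map("req" -> "GET /category/office HTTP/1.1", "status" -> "200", "size" -> "124"),
    Map("req" -> "GET /favicon.ico HTTP/1.1",     "status" -> "404", "size" -> "5")
  )

  // flatMap over Option silently drops the non-matching record
  // instead of turning it into a Unit key
  val paths = records.flatMap { r =>
    if (r("size").toInt >= 100 && r("status").toInt == 200)
      Some(reqToPath(r("req")).split("/")(1))
    else
      None
  }
  println(paths) // List(category)
}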

If I find the time, I'd like to benchmark this against Fluentd's datacounter plugin, Norikra, and the like.
