- Real-time aggregation of Nginx access logs, infrastructure-engineer style
- Most of the samples out there are in Scala, so I wrote this in Scala (my first Scala)
- A trendy pipeline: Nginx => Fluentd => Kafka => Spark Streaming
- Please forgive the messy Scala
- Basically just a tweaked version of the WordCount sample
- For now, let's try aggregating
Sample log
I used this to generate an LTSV log.
Only the last line is heavily hacked:
puts "time:#{Time.at(now).strftime('%d/%b/%Y:%H:%M:%S %z')}\thost:#{record['host']}\tforwardedfor:#{record['host']}\treq:#{record['method']} #{record['path']} HTTP/1.1\tstatus:#{record['code']}\tsize:#{record['size']}\treferer:#{record['referer']}\tua:#{record['agent']}"
time:22/Dec/2016:18:07:56 +0900 host:164.81.181.112 forwardedfor:164.81.181.112 req:GET /category/office HTTP/1.1 status:200 size:124 referer:/item/games/3481 ua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3
time:22/Dec/2016:18:07:59 +0900 host:196.93.44.211 forwardedfor:196.93.44.211 req:GET /category/electronics?from=10 HTTP/1.1 status:200 size:136 referer:/category/electronics ua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1
time:22/Dec/2016:18:08:02 +0900 host:20.171.223.57 forwardedfor:20.171.223.57 req:GET /category/finance HTTP/1.1 status:200 size:78 referer:/category/office ua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)
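Each LTSV line is just tab-separated `key:value` pairs, so it can be parsed without any library. A minimal sketch (a hand-rolled parser for illustration only; the actual code later uses ltsv4s):

```scala
object LtsvSketch {
  // Split an LTSV line on tabs, then split each field on the FIRST ':' only,
  // so values that themselves contain ':' (like "time:22/Dec/2016:18:07:56 +0900")
  // stay intact.
  def parseLine(line: String): Map[String, String] =
    line.split("\t").flatMap { field =>
      field.split(":", 2) match {
        case Array(k, v) => Some(k -> v)
        case _           => None // skip malformed fields without a ':'
      }
    }.toMap

  def main(args: Array[String]): Unit = {
    val line = "time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tstatus:200\tsize:124"
    val record = parseLine(line)
    println(record("host"))   // 164.81.181.112
    println(record("status")) // 200
  }
}
```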
Fluentd => Kafka
Send the logs into Kafka with fluent-plugin-kafka.
- Send every message as a plain String
- Do the parsing in Spark rather than Fluentd (no idea which is actually better, but I have the impression Scala is faster, plus it's good practice)
$ td-agent-gem install fluent-plugin-kafka
Then place td-agent.conf:
<match **>
@type kafka
brokers 10.0.0.65:9092
zookeeper 10.0.0.65:2181
default_topic nginx
</match>
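For reference, a fuller td-agent.conf sketch including the input side (an assumption for illustration: the standard forward input on port 24224, which is what fluent-cat talks to by default; the broker/zookeeper addresses are the ones from this setup):

```
<source>
  @type forward
  port 24224
</source>

<match **>
  @type kafka
  brokers 10.0.0.65:9092
  zookeeper 10.0.0.65:2181
  default_topic nginx
</match>
```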
Preparing Kafka
Download and install:
$ cd /opt
$ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
$ tar xfvz kafka_2.10-0.10.1.0.tgz
$ ln -s kafka_2.10-0.10.1.0 kafka
$ cd kafka
Start ZooKeeper as usual:
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ jps -v |grep zookeeper
3839 QuorumPeerMain -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/zookeeper-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
Start Kafka as usual:
$ ./bin/kafka-server-start.sh -daemon config/server.properties
$ jps -v |grep Kafka
28603 Kafka -Xmx1G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
Try sending messages through
Start a console consumer that prints to stdout:
$ ./bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --zookeeper 10.0.0.65:2181 --topic nginx
Feed some data into Fluentd:
$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx
If the messages show up on the consumer's console, it's working:
{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:05:07+0900"}
{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:05:07+0900"}
Preparing Spark
I wrote about this elsewhere, so I'll skip the details here.
It's just fetching spark-2.0.0.tgz and unpacking it.
Setting up the Scala environment
Install Scala and SBT:
- sbt 0.13.12
- scala 2.11.6-6
Install via apt from https://dl.bintray.com/sbt/debian:
$ apt install scala=2.11.6-6 sbt=0.13.12
Create the directory layout:
$ mkdir -p sample/src/main/scala/
Create build.sbt:
$ cat sample/build.sbt
import scala.util.Properties
name := "Test"
version := "1.0"
scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.0.0"
libraryDependencies += "net.liftweb" % "lift-json_2.11" % "3.0.1"
libraryDependencies += "com.github.seratch" % "ltsv4s_2.11" % "1.0.+"
assemblyMergeStrategy in assembly := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
I kept hitting errors when running assembly, so the assemblyMergeStrategy in assembly block above is copy-pasted from elsewhere (I don't fully understand it).
Add the plugin needed to use assembly:
$ mkdir -p sample/project/
$ cat sample/project/plugins.sbt
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Code that just fetches from Kafka and prints
Following the sample:
package com.test.spark
import java.util.Date
import java.util.Calendar
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWorker {
def main(args: Array[String]) {
// zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
if (args.length < 5) {
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads, sec) = args
val secSleep = sec.toInt
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val sparkConf = new SparkConf().setAppName("KafkaWorker")
val ssc = new StreamingContext(sparkConf, Seconds(secSleep))
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
ssc.checkpoint("checkpoint")
kafkaStream.foreachRDD{ rdd =>
println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
rdd.foreach(print)
println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
}
ssc.start()
ssc.awaitTermination()
}
}
Compile:
$ cd sample/
$ sbt assembly # lots of warnings appear, but they are just about merging, so ignore them
$ ll target/scala-2.11/Test-assembly-1.0.jar
-rw-r--r-- 1 root root 112868840 Dec 26 11:24 target/scala-2.11/Test-assembly-1.0.jar
Run the jar on Spark
Pass the arguments in the order noted in the source:
zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 11:31:05 JST 2016 ###
### END Mon Dec 26 11:31:05 JST 2016 ###
### Start Mon Dec 26 11:31:10 JST 2016 ###
### END Mon Dec 26 11:31:10 JST 2016 ###
Confirmed it appears to run a batch every 5 seconds, as specified by the argument.
Feed in some data:
$ head sample.log | /opt/td-agent/embedded/bin/fluent-cat --none data.nginx
### Start Mon Dec 26 11:32:20 JST 2016 ###
{"message":"time:22/Dec/2016:18:07:56 +0900\thost:164.81.181.112\tforwardedfor:164.81.181.112\treq:GET /category/office HTTP/1.1\tstatus:200\tsize:124\treferer:/item/games/3481\tua:Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:07:59 +0900\thost:196.93.44.211\tforwardedfor:196.93.44.211\treq:GET /category/electronics?from=10 HTTP/1.1\tstatus:200\tsize:136\treferer:/category/electronics\tua:Mozilla/5.0 (Windows NT 6.1; WOW64; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:02 +0900\thost:20.171.223.57\tforwardedfor:20.171.223.57\treq:GET /category/finance HTTP/1.1\tstatus:200\tsize:78\treferer:/category/office\tua:Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:06 +0900\thost:212.159.169.49\tforwardedfor:212.159.169.49\treq:GET /item/computers/2268 HTTP/1.1\tstatus:200\tsize:139\treferer:/item/networking/248\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:07 +0900\thost:140.69.110.95\tforwardedfor:140.69.110.95\treq:GET /category/books HTTP/1.1\tstatus:200\tsize:109\treferer:-\tua:Mozilla/5.0 (Macintosh; Intel Mac OS X 10.6; rv:9.0.1) Gecko/20100101 Firefox/9.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:09 +0900\thost:172.18.127.139\tforwardedfor:172.18.127.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:135\treferer:-\tua:Mozilla/5.0 (Windows NT 6.0; rv:10.0.1) Gecko/20100101 Firefox/10.0.1","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:12 +0900\thost:120.222.102.169\tforwardedfor:120.222.102.169\treq:POST 
/search/?c=Computers+Electronics HTTP/1.1\tstatus:200\tsize:128\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:116.150.211.139\tforwardedfor:116.150.211.139\treq:GET /category/electronics HTTP/1.1\tstatus:200\tsize:55\treferer:-\tua:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; GTB7.2; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C)","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:192.42.23.199\tforwardedfor:192.42.23.199\treq:GET /category/networking HTTP/1.1\tstatus:200\tsize:59\treferer:-\tua:Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7","timestamp":"2016-12-26T11:32:16+0900"}{"message":"time:22/Dec/2016:18:08:16 +0900\thost:220.84.166.98\tforwardedfor:220.84.166.98\treq:GET /category/toys HTTP/1.1\tstatus:200\tsize:124\treferer:/item/office/4833\tua:Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)","timestamp":"2016-12-26T11:32:16+0900"}
### END Mon Dec 26 11:32:20 JST 2016 ###
It is fetching from Kafka and printing as expected.
Try aggregating
Let's count the first segment of each request path (the category in GET /category/office HTTP/1.1) every 5 seconds.
Since I want to exclude things like icons, JS, and error responses, filter on status == 200 and size >= 100.
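The filter and the path extraction can be tried on a single parsed record before wiring them into the stream (a standalone sketch; the field names match the LTSV keys above):

```scala
object PathCountSketch {
  // "GET /category/office HTTP/1.1" -> "category"
  // split(" ")(1) takes the path, split("/")(1) takes the first segment
  // (index 0 is the empty string before the leading '/')
  def firstSegment(req: String): String =
    req.split(" ")(1).split("/")(1)

  // Keep only successful, non-trivial responses
  def wanted(record: Map[String, String]): Boolean =
    record("status").toInt == 200 && record("size").toInt >= 100

  def main(args: Array[String]): Unit = {
    val record = Map(
      "req"    -> "GET /category/office HTTP/1.1",
      "status" -> "200",
      "size"   -> "124"
    )
    if (wanted(record)) println(firstSegment(record("req"))) // category
  }
}
```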
Source
package com.test.spark
import java.util.HashMap
import java.util.Date
import java.util.Calendar
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import scala.util.parsing.json.JSON
import net.liftweb._
import net.liftweb.json._
import com.github.seratch.ltsv4s._
object KafkaWorker {
case class FluentEvent(
timestamp: String,
message: String
)
def main(args: Array[String]) {
// zkQuorum(127.0.0.1:2181), group(test), topics(nginx), numThreads(2), Sec
if (args.length < 5) {
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads, sec) = args
val secSleep = sec.toInt
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val sparkConf = new SparkConf().setAppName("KafkaWorker")
val ssc = new StreamingContext(sparkConf, Seconds(secSleep))
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
ssc.checkpoint("checkpoint")
val nginxStream = kafkaStream.map(convertFluentToMap(_))
// Extract the first path segment from requests with status == 200 and a response size of at least 100 bytes
val pathsStream = nginxStream.map{nginxRecord =>
if (nginxRecord("size").toInt >= 100 && nginxRecord("status").toInt == 200 ){
reqToPath(nginxRecord("req")).split("/")(1)
}
}
// Count per path
val countPath = pathsStream.map((_, 1))
.reduceByKeyAndWindow(_ + _, Seconds(secSleep))
.map{case (path, count) => (count, path)}
.transform(_.sortByKey(false))
// Output
countPath.foreachRDD{ rdd =>
println("### Start %s ###".format(Calendar.getInstance.getTime.toString))
val path = rdd.take(10)
path.foreach{case (count, tag) =>
tag match {
case tag: String => println("%s count (%s)".format(count, tag))
case _ => println("%s count not match".format(count))
}
}
println("### END %s ###\n".format(Calendar.getInstance.getTime.toString))
}
ssc.start()
ssc.awaitTermination()
}
def parseNginxLtsv(record: String) = { LTSV.parseLine(record) }
def parseFluentJson(record: String) = {
implicit val formats = DefaultFormats
parse(record).extract[FluentEvent].message
}
def convertFluentToMap(record: String) = { parseNginxLtsv(parseFluentJson(record)) }
def reqToPath(record: String) = { record.split(" ")(1) }
}
Run it:
$ sbt assembly
$ cd /opt/spark
$ ./bin/spark-submit --class com.test.spark.KafkaWorker /root/sample/target/scala-2.11/Test-assembly-1.0.jar 127.0.0.1:2181 test nginx 2 5
### Start Mon Dec 26 12:32:25 JST 2016 ###
879 count not match
285 count (category)
190 count (item)
48 count (search)
### END Mon Dec 26 12:32:25 JST 2016 ###
### Start Mon Dec 26 12:32:30 JST 2016 ###
802 count not match
267 count (category)
175 count (item)
38 count (search)
### END Mon Dec 26 12:32:30 JST 2016 ###
### Start Mon Dec 26 12:32:35 JST 2016 ###
895 count not match
321 count (category)
181 count (item)
53 count (search)
### END Mon Dec 26 12:32:35 JST 2016 ###
If I find the time, I'd like to run performance comparisons against the likes of Fluentd's DataCounter plugin and Norikra.