Web Serverのアクセスログをリアルタイムストリーミング分析するというのはよくある利用シーンだと想定されます。今回の記事は、Apache Flumeと呼ばれるログ収集基盤とE-MapReduceクラスターのSpark Streaming分析基盤の統合手法について、ご説明させて頂きたいと思います。

- 前提
- EMR-3.16.0
- クラスタータイプは Hadoop
- ハードウェア構成(Header)はecs.sn1ne.2xlargeを1台
- ハードウェア構成(Worker)はecs.sn1ne.2xlargeを3台
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
# uname -r
3.10.0-693.2.2.el7.x86_64
# flume-ng version
Flume 1.8.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Wed Nov 28 11:09:28 CST 2018
From source with checksum 63b5d03c9afd862ff786f7826ffe55d0
# hadoop version
Hadoop 2.7.2
Subversion http://gitlab.alibaba-inc.com/soe/emr-hadoop.git -r d2cd70f951304b8ca3d12991262e7e0d321abefc
Compiled by root on 2018-11-30T09:31Z
Compiled with protoc 2.5.0
From source with checksum 4447ed9f24dcd981df7daaadd5bafc0
This command was run using /opt/apps/ecm/service/hadoop/2.7.2-1.3.2/package/hadoop-2.7.2-1.3.2/share/hadoop/common/hadoop-common-2.7.2.jar
- Flumeの設定
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/spool
a1.sources.r1.fileHeader = true
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 9906
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Spark Streamingについて
DStreamのバッチ間隔 | 1秒 |
スライディング間隔 | 1秒 |
ウィンドウサイズ | 300 秒 |
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
parts = [
r'(?P<host>\S+)',
r'\S+',
r'(?P<user>\S+)',
r'\[(?P<time>.+)\]',
r'"(?P<request>.+)"',
r'(?P<status>[0-9]+)',
r'(?P<size>\S+)',
r'"(?P<referer>.*)"',
r'"(?P<agent>.*)"',
]
pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z')
def extractURLRequest(line):
exp = pattern.match(line)
if exp:
request = exp.groupdict()["request"]
if request:
requestFields = request.split()
if (len(requestFields) > 1):
return requestFields[1]
if __name__ == "__main__":
sc = SparkContext(appName="StreamingFlumeLogAggregator")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
flumeStream = FlumeUtils.createStream(ssc, "localhost", 9906)
lines = flumeStream.map(lambda x: x[1])
urls = lines.map(extractURLRequest)
urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y : x - y, 300, 1)
sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
sortedResults.pprint()
ssc.checkpoint("/root/checkpoint")
ssc.start()
ssc.awaitTermination()
- 実行する
# bin/flume-ng agent --conf conf --conf-file ~/sparkstreamingflume.conf --name a1 -Dflume.root.logger=INFO,console
spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.3.2 SparkFlume.py
# tail access_log.txt
46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:54 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:55 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:56 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:57 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:58 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
46.166.139.20 - - [06/Dec/2015:03:14:59 +0000] "POST /xmlrpc.php HTTP/1.0" 200 370 "-" "Mozilla/4.0 (compatible: MSIE 7.0; Windows NT 6.0)"
下記のように、1秒間ごとにリアルタイム処理することができるようになりました。アクセス先URLごとのアクセス数(Top10)が一覧で出力されました。
-------------------------------------------
Time: 2019-03-15 14:02:19
-------------------------------------------
(u'/xmlrpc.php', 8509)
(u'/wp-login.php', 1798)
(u'/', 119)
(u'/robots.txt', 44)
(u'/blog/', 36)
(u'/page-sitemap.xml', 29)
(u'/post-sitemap.xml', 29)
(u'/category-sitemap.xml', 29)
(u'/sitemap_index.xml', 29)
(u'http://51.254.206.142/httptest.php', 26)
...
-------------------------------------------
Time: 2019-03-15 14:02:20
-------------------------------------------
(u'/xmlrpc.php', 68415)
(u'/wp-login.php', 1923)
(u'/', 440)
(u'/blog/', 138)
(u'/robots.txt', 123)
(u'/post-sitemap.xml', 118)
(u'/sitemap_index.xml', 118)
(u'/page-sitemap.xml', 117)
(u'/category-sitemap.xml', 117)
(u'/orlando-headlines/', 95)
...
-------------------------------------------
Time: 2019-03-15 14:02:21
-------------------------------------------
(u'/xmlrpc.php', 68415)
(u'/wp-login.php', 1923)
(u'/', 440)
(u'/blog/', 138)
(u'/robots.txt', 123)
(u'/post-sitemap.xml', 118)
(u'/sitemap_index.xml', 118)
(u'/page-sitemap.xml', 117)
(u'/category-sitemap.xml', 117)
(u'/orlando-headlines/', 95)
...
- 最後