ログを集計する方法としてSparkを使ってみる。
前準備
Spark本体のダウンロードとビルド
ここに書いてある通りまずはspark
のビルドをする
maven
はbrew install maven
しとておく事
mavenのバージョンとか
$ mvn -v
Apache Maven 3.1.1 (0728685237757ffbf44136acec0402957f723d9a; 2013-09-18 00:22:22+0900)
Maven home: /usr/local/Cellar/maven/3.1.1/libexec
Java version: 1.6.0_65, vendor: Apple Inc.
Java home: /System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
Default locale: ja_JP, platform encoding: SJIS
OS name: "mac os x", version: "10.9.2", arch: "x86_64", family: "mac"
※ 自分のsparkのバージョンは1.1系
clone後にビルド
$ git clone git://github.com/apache/spark.git
$ cd spark
$ mvn -DskipTests clean package
この時OutOfMemory
がでたので調整して再度ビルド
mavenのメモリ調整
$ export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=512m"
$ mvn -DskipTests clean package
ひたすら待って,いつのまにかビルドが正常に終了している事を確認する。
spark-shell
に入れるか確認
$ ./bin/spark-shell
scala> sc.parallelize(1 to 1000).count()
scala> :quit
Error
がでずになんか動いてればおk(ぁ
Sampleのログデータの準備
/path/to/logs/
以下にタブ区切りのデータを用意
test.txt
2015-01-01 10:00 user1 http://spark.apache.org/releases/spark-release-1-2-1.html
2015-01-01 10:00 user1 https://github.com/apache/spark/releases
2015-01-02 11:00 user3 http://spark.apache.org/releases/spark-release-1-2-1.html
2015-01-01 11:00 user3 http://spark-project.org/download/
2015-01-01 11:00 user3 http://spark-project.org/download/
2015-01-01 11:00 user2 http://spark.apache.org/docs/latest/
ログの集計
ログを集計するアプリを作成する。
プロジェクトの場所: /path/to/spark-sample
sbtの設定。sparkのライブラリをインストールしておく事。
(プロジェクトの構成とかはIntellij
まかせ)
build.sbt
name := "spark-sample"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"
プログラムの本体
SimpleApp
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
def main(args: Array[String]) {
val logFile = "/path/to/logs/test.txt"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile) // sparkからログの取得
val counts = logData.filter(_.nonEmpty) // 空の行は集計しない
.map(line => line .split("\t")(1)) // タブ区切りの1つ目をkeyにする(user)
.map(word => (word, 1)) // keyに対してvalueを1にする
.reduceByKey(_ + _) // valueを足す集計処理
// valueの値が高い順にソート
val sortCount = counts.collect.toSeq.sortBy(_._2).toSeq.reverse
// 出力
for (i <- sortCount.take(30))
println(i)
// 集計した値をさらにsumというkeyで集計(ユニーク数となる)
val countSum = counts.map(word => ("sum", 1))
.reduceByKey(_ + _)
for (i <- countSum.take(10))
println("unique_sum: " + i)
}
}
sbt package
でビルド。
その後,スパークで実際に動かしてみる。
$ /path/to/spark/bin/spark-submit --class "SimpleApp" target/scala-2.10/spark-sample_2.10-1.0.jar
最初にビルドしたspark
のディレクトリとtarget
以下は適宜修正。
集計結果
xxxxx
(user3,3)
(user1,2)
(user2,1)
xxxx
unique_sum: (sum,3)
まとめ
-
scala
が初心者のためそこがつらかった。 - 今後はより実践的なログ集計のプログラムを作る。
- 突っ込み,要望,感想募集中ー。