LoginSignup
3
5

More than 5 years have passed since last update.

Apache Sparkを使ってWindowsファイルサーバーの監査ログをちょこっとだけ分析してみる

Last updated at Posted at 2016-03-27

最近はやりのApache Spark。この流れに乗らなければならないと思い、Windowsファイルサーバーの監査ログをSparkでちょこっとだけ分析してみましたので共有します。と言ってもSparkで分析できるようにするためには、(別にSparkに限った話じゃないけれど)データのクレンジングが必要なので、今回はその手順も含めて一からやってみました。

データのクレンジング

監査ログをいきなりSparkに投入することはできません。今回は evtx 形式の監査ログから必要な項目だけ取り出し、JSON 形式に落とすことを最初に行います。
evtx 形式からデータを抽出する方法はいくつかありますが、ここでは PowerShell を使ってやる方法を提示します。

前提事項

項目 対象
監査ログのファイル名 Archive-Security-yyyy-mm-dd-HH-MM-SS-xxx.evtx
抽出対象イベントID 4656(ファイルのオープン)、4663(ファイルアクセス)、4690(ファイルコピー)
抽出する内容 タイムスタンプ(time)、ユーザー名(user)、フォルダ・ファイル名(target)、操作内容(ope)

分析対象イベントIDは、他のイベントID、例えば5145(詳細なファイル共有)でも構いません。これらのイベントIDからどんな情報が取得できるのか、どんなときにログに書き込まれるのか確認し、必要に応じてスクリプトを書き換えてください。

抽出するスクリプト

指定したディレクトリに決まったサイズで作成された監査ログが複数含まれていることを想定しています。

# filename ExtractDataIntoJson.ps1
# usage: powershell .¥ExtractDataIntoJson.ps1 TARGET_FOLDER
Param([String] $dir)
Get-ChildItem $dir | Sort-Object Name | ForEach-Object {
    $file = $_
    $file -match "Archive-Security-(2016-..-..)-.+.evtx"
    $date = $matches[1]
    $filename = $dir + "\" + $file
    $accessfile = "D:\results_access." + $date + ".json"
    $copyfile = "D:\results_copy." + $date + ".json"

    Get-WinEvent -FilterHashTable @{Path=$filename; ID=4656,4663,4690} | ForEach-Object {
        $xml = [Xml]$_.ToXml()
        $id = $xml.Event.System.EventID
        $t = [DateTime]$xml.Event.System.TimeCreated.SystemTime
        $user = $xml.Event.EventData.Data[1]."#text"
        $target = $xml.Event.EventData.Data[6]."#text" -replace "\\","/"
        if ($id -eq 4663) {
            $ope = $xml.Event.EventData.Data[8]."#text".TrimStart().TrimEnd() -replace "%",""
            Write-Output "{`"time`":`"$t`",`"eventid`":`"$id`",`"sid`":`"$sid`",`"user`":`"$user`",`"target`":`"$target`",`"ope`":[$ope]}" | Out-File $accessfile -Append -encoding UTF8
        }
        if ($id -eq 4656) {
            $ope = $xml.Event.EventData.Data[9]."#text".TrimStart().TrimEnd() -replace "\n","," -replace "\t","" -replace "%","" -replace "\s",""
            Write-Output "{`"time`":`"$t`",`"eventid`":`"$id`",`"sid`":`"$sid`",`"user`":`"$user`",`"target`":`"$target`",`"ope`":[$ope]}" | Out-File $accessfile -Append -encoding UTF8
        }
        if ($id -eq 4690) {
            Write-Output "{`"time`":`"$t`",`"eventid`":`"$id`",`"user`":`"$user`"}" | Out-File $copyfile -Append -encoding UTF8
        }
    }
}

これで監査ログから下記のようなJSON形式でデータを抽出することができました。

{"time":"01/30/2016 01:33:05","eventid":"4663","user":"userA","target":"E:/WORK/xxxx/xxxx/xxxx/","ope":[1538]}
{"time":"01/30/2016 01:33:05","eventid":"4663","user":"userB","target":"E:/WORK/xxxx/xxxx/yyyy","ope":[1538,1541,4416,4419,4423]}

なお、上記スクリプトは監査ログファイルの大きさにもよりますが、時間がかなりかかります。
のんびりやってられない方は LogParser 等他の方法を検討したほうがいいでしょう。PowerShellの何倍も早く処理が終わります。

ちょこっと分析

それではちょこっとだけ分析してみましょう。今回は監査ログからユニークユーザー数とログに記録されたユーザー別のアクセス数、コピー数、それとファイルごとのアクセス数を出してみます。なお、アクセス数・コピー数は単純にログに記録された項目をカウントしています。監査ログにはユーザーの1アクションでも大量にログが記録されるため、1アクション=1アクセスには結びつきません。この分析では、正確な数字ではなくあくまで傾向を見るものと捉える必要があります。

分析の前に

Sparkをインストールしましょう。インストール手順については、ググればたくさんでてきますので割愛します。
今回、分析は Scala で書いています。といっても簡単な処理しか書いてませんので特に難しくないと思います。また、Sparkは shell で実行ではなく、Scalaのプログラムをコンパイルし、spark-submit で実行する方法をとっています。

分析するプログラム

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import java.io.PrintWriter
import java.io.File

object MyApp{
  def getListOfFiles(dir: String):List[File] = {
    val d = new File(dir)
    if (d.exists && d.isDirectory) {
      d.listFiles.filter(_.isFile).toList.filter(_.getName().endsWith("json"))
    } else {
      List[File]()
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyApp").set("spark.driver.maxResultSize", "2g")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val files: List[File] = getListOfFiles(args(0))
    files.map{ f =>
      val df = sqlContext.read.json(f.toString).cache

      if (df.count != 0) {
        val ufile = new PrintWriter(f.toString().replaceAll(".json","_result_user.csv"))
        // ユニークユーザー数
        // select count(distinct user) from json_item
        val uniq_user = df.select("user").distinct.count
        ufile.write(s"$uniq_user\n")
        ufile.write("\n")

        // ユーザー別アクセス数計算
        ufile.write("access count\n")
        // select user, count(1) as value from json_item where eventid = 4663 group by user order by value desc
        val users_access = df.filter("eventid=4663").groupBy("user").count().sort($"count".desc).collect()
        users_access.foreach({ i => var u: Any = i(0); var c: Any = i(1); ufile.write(s"$u,$c\n") })
        ufile.write("\n")

        // ユーザー別コピー件数計算
        ufile.write("copy count\n")
        // select user, count(1) as value from json_item where eventid = 4690 group by user order by value desc
        val users_copy = df.filter("eventid=4690").groupBy("user").count().sort($"count".desc).collect()
        users_copy.foreach({ i => var u: Any = i(0); var c: Any = i(1); ufile.write(s"$u,$c\n") })
        ufile.close()

        // ファイル数別アクセス数計算
        val ffile = new PrintWriter(f.toString().replaceAll(".json","_result_file.csv"))
        // select target, value from (select target, count(1) as value from json_item where array_contains(ope, 4416) and not array_contains(ope, 4423) group by target) as t where target rlike '.+[^/]+\\.[a-zA-Z^/]{3,4}$' order by value desc limit 100
        val file_count = df.select("target","ope").filter(array_contains($"ope", 4416)).filter(!array_contains($"ope", 4423)).filter($"target".rlike(".+[^/]+\\.[a-zA-Z^/]{3,4}$")).groupBy("target").count().sort($"count".desc).limit(100).collect()
        file_count.foreach({ i => var fl: Any = i(0); var c: Any = i(1); ffile.write(s"$fl,$c\n") })
        ffile.close()
      }
    }
  }
}

上記プログラムのコンパイルと実行方法

Scalaとsbtをインストールしておきましょう。
sbt でビルドする前にパッケージファイルを作成します。

name := "MyApp"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.0",
  "org.apache.spark" %% "spark-sql"  % "1.6.0"
)

上記 sbt ファイル、ソースファイルの配置は下記のとおりです。

MyApp [folder]
 |— myapp.sbt
 |— src [folder]
   |— main [folder]
     |— scala [folder]
       |— myapp.scala

ビルドは以下のようにします。
事前に MyApp [folder] にディレクトリを変更しておきます。

sbt package

上記でjarファイルが生成されます。
実行方法は下記のとおり。対象フォルダには、クレンジング時に生成されたJSONファイルが保存されているものとします。

spark-submit —class “MyApp” —master “local[*]”  —driver-memory 10G target/scala-2.10/myapp.jar 対象のフォルダ

注意:起動時に driver が利用できるメモリを10GBに指定しています。環境にあわせて変更しましょう。メモリ量によってDFがキャッシュに乗らない可能性があります。

注意

動作は確認していますが、保証するものではありません。実際お試しになる場合は正しく動作しない可能性もありますので、ご留意ください。
Scala、Spark初心者であるため、内容で間違っている部分等ありましたらご指摘いただけるとうれしいです。

参考

PowerShellのサイト、Scalaのサイト、Sparkの公式サイト、Stack Overflow、その他多くのみなさんのサイトを参考にさせていただきました。特にSparkの公式サイトは何度も何度も参照しました。情報公開してくれているみなさま方、ありがとうございました。

3
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
5