Overview
タイトル通りです.
具体的にはEvtxtoElkを使用してimportします.
# dockerでelasticsearch, kibanaを建てる
$ docker run -p 9200:9200 -p 9300:9300 -p 5601:5601 sebp/elk
# ライブラリのインストール
$ pip install evtxtoelk
from evtxtoelk import EvtxToElk
EvtxToElk.evtx_to_elk('example.evtx', 'http://localhost:9200')
2019/10/06 追記:
100倍早くしました
Windows EventLogをElasticSearchにimportする(のを100倍早くする)
Description
DFIRの際に必須なイベントログ調査ですが, 僕はざっくりデータを眺めるときは log2timeline/plaso でcsvに変換して ripgrep で検索を行っていました.
目で見るだけならこれでも十分なのですが, プログラムである程度自動化をするにあたって, pandasをデータベース的に使ったり, SQLiteで検索をするのはパフォーマンスが非常に悪いです.
できれば最低でも1クエリ0.1秒以内で帰ってきてほしいですが, 巨大なイベントログの検索にはSQLiteでも2秒程度かかってしまいました.
やっぱ餅は餅屋, ということでビッグデータ解析に適しているElasticSearchさんで試したところ0.02秒程度といい感じです! ripgrep並の速度を手に入れました!
インデックスにやたら時間がかかるのが難点ですが, プログラム回してほっとけばそのうち終わるので検索のたびにイライラするより遥かに素晴らしいです.
EvtxtoElkの詳しい使い方は作者のブログに書いてあったり書いてなかったりするので読んでください.
ここではブログに書いてなかった(たぶん)実行オプション
についても書いておきます.
ちなみにevtx自体は, AccessData - FTK Imagerなりlog2timeline/plasoを利用して抜き出すと良いと思います.
Example
ぼくは解析したいファイルが複数あったのでこんな感じのスクリプトを書いて実行しました
イベントログのディレクトリ構成
data/
├── server01/
│ └── Security.evtx
│
├── server02/
│ └── Security.evtx
│
└── server03/
└── Security.evtx
スクリプト(import_evtx.py)
# coding: utf-8
import sys
from pathlib import Path
from evtxtoelk import EvtxToElk
def main():
evtx_files = Path(sys.argv[1]).glob('**/*.evtx')
for evtx in evtx_files:
print(f"index: {evtx}")
print(f"index-name: {evtx.parent.stem}\n")
EvtxToElk.evtx_to_elk(evtx, 'http://localhost:9200', elk_index=evtx.parent.stem, bulk_queue_len_threshold=3000)
if __name__ == '__main__':
main()
オプション解説
elk_index
インデックスの名前
デフォルトだとhostlogs
が使われるようになっていました.
今回はどのサーバのログかわかるようにしたいのでevtxが入っているディレクトリの名前をindex名にするよう変更
bulk_queue_len_threshold
bulkinsertのバッファサイズ
デフォルトは500
です
いじんなくてもそこまで速度変わらないらしいですが, 標準出力がうるさくなるので今回は3000としました.
実行コマンド
$ python import_evtx.py data