はじめに
自分用メモも兼ねて、デジタルアーツ社のWebプロキシ製品i-Filterのアクセスログをセキュリティログ分析のために正規化する方法を投稿します^^
利用した環境
product | version |
---|---|
i-Filter | 9.00 R01 |
Logstash | 6.5.4 |
Java JDK | 1.8.0_191 |
OS(EC2) | Amazon Linux2 |
前提
i-FilterのアクセスログをLogstashがインストールされているEC2の/var/log/proxy/ifilter
配下にファイル(access.log*)として配置しています。
translate filter plugin
はデフォルトでインストールされていません。
$ sudo /opt/logstash/bin/plugin install logstash-filter-translate
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one,
then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Validating logstash-filter-translate
Installing logstash-filter-translate
Installation successful
でインストールします。
/usr/share/logstash/Gemfile
にインストールされているLogstash Pluginsが記載されています。
オンラインで追加pluginが導入出来ないオフライン環境でのインストールでは少し手間が掛かります(泣)
【参考URL】
・Offline Plugin Management
i-FilterのURLアクセスログのフォーマット
1行に1件のログを出力します。ログは、スペース区切りのTSV形式のログフォーマットとなります。
【ログフォーマット】
{ログ出力時刻(月)} {ログ出力時刻(日)} {ログ出力時刻(時刻)} {Proxyプロセス番号} {i-FILTERバージョン} {接続先サーバーのIPアドレス:ポート番号} {クライアントIPアドレス} {コンピューター名} {認証ユーザー名} [{アクセス開始日時} {タイムゾーン}] {HTTPレスポンスコード} {HTTPレスポンスサイズ} {HTTPリクエストサイズ} {フィルターアクション} {オブジェクトID} {フィルター理由番号} {URLカテゴリリスト} {WebサービスID} {WebサービスサブID} {グループ名} {ウイルス名} {SSLパラメーター暗号化} {POST時のファイル情報} "{リクエスト情報(HTTPメソッド)} {リクエスト情報(URL)} {リクエスト情報(HTTPバージョン)}" {HTTPリファラー} {HTTP User-Agent} {HTTP Content-Type} {チェックサム}
【ログサンプル】
Apr 1 15:09:09 18051 900/01/ 192.168.1.91:8000 192.168.1.24 - username [1/Apr/2014:15:09:08 +0900] 503 0 385 block 9 51 51 -1 -1 default - 0 - "GET http://www.abcd.local HTTP/1.0" http://www.abcd.local/search Mozilla%2F5.0 text%2Fhtml%3B+charset%3Dutf-8 63EBCF4D
Logstashとは
Elasticsearchを開発しオープンソースとして提供しているElastic社のETLツールになります。
日本国内ではFluentdが有名で利用者も多いですが、グローバルではLogstashを利用している人は8割を超えています。
実施内容
アクセスログの読み込み処理は大まかに次のような内容となります。
- アクセスログを読み込む (file input)
- 読み取ったアクセスログを各フィールドに分割 (csv filter)
- HTTPリクエストフィールドのさらなる分割 (grok filter)
- フィールドタイプの変更 (mutate filter)
- URLカテゴリ名の日本語化 (translate filter)
- タイムスタンプの正規化 (grok filter & date filter)
- ユーザエージェントの正規化 (useragent filter)
- 重複排除フィールドの生成 (fingerprint filter)
- 不要なフィールドの削除 (mutate filter)
- Elasticsearchへの出力 (elasticsearch output)
設定ファイルは次のようなものになります。
input {
file {
path => "/var/log/proxy/ifilter/access.log*"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
}
}
filter {
csv {
columns =>
[
"month",
"day",
"time",
"proxy_process_id",
"i-filter_version",
"top_proxy_address",
"client_ip",
"computer_name",
"username",
"access_date",
"timezone",
"http_response_code",
"http_response_size",
"http_request_size",
"filter_action",
"object_id",
"filter_cause_id",
"url_category_list",
"web_service_id",
"web_service_sub_id",
"group_name",
"virus_name",
"ssl_paramater_encryption",
"post_file_info",
"http_request_info",
"http_referrer",
"user_agent",
"content_type",
"checksum"
]
separator => " "
}
grok {
match => {
"http_request_info" => "%{WORD:http_method} (?<proto>\w+)://(?<fqdn>[^:]+):?(?<port>\d+)? %{NOTSPACE:http_version}"
}
tag_on_failure => [ "_http_request_info_parse_failure" ]
remove_field => [ "http_request_info" ]
}
mutate {
convert => {
"http_request_size" => "float"
"http_response_size" => "float"
}
}
translate {
field => "url_category_list"
destination => "url_category_list"
dictionary_path => "/etc/logstash/translate/proxy_url_categoryid.yml"
override => "true"
}
grok {
match => {
"access_date" => "\[%{NOTSPACE:timestamp}"
}
tag_on_failure => ["_access_date_parse_failure"]
remove_field => [ "access_date" ]
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss", "d/MMM/yyyy:HH:mm:ss" ]
locale => "en"
timezone => "Asia/Tokyo"
remove_field => [ "timestamp" ]
}
useragent {
source => "user_agent"
target => "useragent"
}
fingerprint {
source => "message"
target => "[@metadata][fingerprint]"
method => "MURMUR3"
}
mutate {
remove_field =>
[
"month",
"day",
"time",
"proxy_process_id",
"i-filter_version",
"top_proxy_address",
"computer_name",
"timezone",
"web_service_id",
"web_service_sub_id",
"virus_name",
"ssl_paramater_encryption",
"user_agent",
"content_type",
"checksum",
"message"
]
}
}
output {
elasticsearch {
hosts => [ "ElasticsearchのIPアドレス:9200" ]
document_id => "%{[@metadata][fingerprint]}"
index => "ifilter-%{+YYYY.MM.dd}"
template_name => "ifilter"
}
}
i-Filterのアクセスログのサンプルを利用すると以下のように正規化されます。
{
"group_name" => "default",
"host" => "<EC2のホスト名>",
"object_id" => "9",
"filter_cause_id" => "51",
"http_method" => "GET",
"http_version" => "HTTP/1.0",
"http_response_size" => 0.0,
"path" => "/var/log/proxy/ifilter/access.log000001",
"client_ip" => "192.168.1.24",
"@version" => "1",
"post_file_info" => "-",
"http_request_size" => 385.0,
"@timestamp" => 2014-04-01T15:09:08.000Z,
"proto" => "http",
"http_response_code" => "503",
"username" => "username",
"useragent" => {
"device" => "Other",
"os" => "Other",
"os_name" => "Other",
"name" => "Other",
"build" => ""
},
"filter_action" => "block",
"http_referrer" => "http://www.abcd.local/search",
"fqdn" => "www.abcd.local",
"url_category_list" => "コンピューターサプライ"
}
1. アクセスログを読み込む (file input)
今回は/var/log/proxy/ifilter
配下に置かれたアクセスログのファイルを取り込みます。
ファイルに書かれたデータをLogstashで取り込む場合はfile input
を用いて取り込みます。
-
path
で取り込む対象ファイルのパスを指定します。 -
start_position
はLogstashプロセスが起動後にファイルのどこからログを読み込むかを指定します。 -
sincedb_path
は明示的にsincedbファイル(offset値)のパスを指定していますが、省略可です。
input {
file {
path => "/var/log/proxy/ifilter/access.log*"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb"
}
}
start_position
でファイルの先頭から常にログを読み取りますが、sincedbでどこまで読み込んだか記憶しているため、ログデータを重複して取り込むことなくLogstashを再起動することが可能です。
2. 読み取ったアクセスログを各フィールドに分割 (csv filter)
i-FliterのアクセスログはTSVフォーマットで区切り値は半角スペースが利用されています。
アクセス内容に関わらずフィールドの数は固定です。
-
columns
でログの先頭から順番にフィールド名を指定します。何も指定しないとcolumn1
から順番に通し番号のフィールド名となります。 -
separator
でフィールド間の区切り値を指定します。(今回は半角スペースを指定しています)
filter {
csv {
columns =>
[
"month",
"day",
"time",
"proxy_process_id",
"i-filter_version",
"top_proxy_address",
"client_ip",
"computer_name",
"username",
"access_date",
"timezone",
"http_response_code",
"http_response_size",
"http_request_size",
"filter_action",
"object_id",
"filter_cause_id",
"url_category_list",
"web_service_id",
"web_service_sub_id",
"group_name",
"virus_name",
"ssl_paramater_encryption",
"post_file_info",
"http_request_info",
"http_referrer",
"user_agent",
"content_type",
"checksum"
]
separator => " "
}
3. HTTPリクエストフィールドのさらなる分割 (grok filter)
csv filter
でフィールド分割した状態ではhttp_request_info
という1つのフィールドにHTTPメソッド、URL、HTTPバージョンがvalueとして含まれています。それぞれの項目をフィールド化することでHTTPメソッドによる分析等が可能となります。
http_request_info
のvauleをうまくマッチさせるには正規表現を駆使する必要があります。その場合にgrok filter
を活用します。
-
match
でhttp_request_info
フィールドに含まれるvalueをWORD:http_method
という記述方法でWORDパターンにマッチした文字列をhttp_method
というフィールド名に指定しています。 - URL部分の
http
またはhttps
をproto
というフィールド名に指定しています。 -
://
以降をfqdn
というフィールド名に指定し、ポート番号がある場合は追加でport
というフィールド名に指定しています。 -
NOTSPACE:http_version
という記述方法でNOTSPACEパターンにマッチした文字列をhttp_version
というフィールド名に指定しています。 -
tag_on_failure
はパターンにマッチせずに正規化に失敗した場合に_http_request_info_parse_failure
というタグを付与します。(どこでGrokが失敗したか判断し易くするためです) -
remove_field
で分解前の分析に不要なフィールド(今回はhttp_request_info
)を削除します。
grok {
match => {
"http_request_info" => "%{WORD:http_method} (?<proto>\w+)://(?<fqdn>[^:]+):?(?<port>\d+)? %{NOTSPACE:http_version}"
}
tag_on_failure => [ "_http_request_info_parse_failure" ]
remove_field => [ "http_request_info" ]
}
4. フィールドタイプの変更 (mutate filter)
csv filter
でフィールド分割した状態ではhttp_request_size
やhttp_response_size
のvalueが数値ではあるもののstring
のデータ型となっています。異常なトラフィック量の通信を見つけたい場合、これらのフィールドは数値データとして扱いたいものです。
※ip
やbyte
等、mutate filterでは変換出来ないデータ型もあります。(2019/2/24追記)
※string
、integer
、float
、boolean
、integer_eu
、float_eu
のみ変換可能です。(2019/2/24追記)
上記以外の変換の場合、Index Template
でデータ型を定義することで変換が可能です。
-
convert
で変換したいフィールド名と変換後のデータ型(今回はfloat)を指定します。
mutate {
convert => {
"http_request_size" => "float"
"http_response_size" => "float"
}
}
【参考URL】
・データ型:数値
・Index Templates
5. URLカテゴリ名の日本語化 (translate filter)
ログデータをElasticsearchを取り込みKibanaから分析に利用する場合、URLカテゴリ名が区分値データであると一体何に違反してURLフィルタに引っかかったのかがアナリストからは分かりにくいという問題があります。
そこでtranslate filter
を利用して、予め作成したyaml形式のカテゴリリストとの突合をして区分値データを日本語のカテゴリ名に置換します。
-
field
で置換したい区分値データの入っているフィールド名を指定します。 - 上書きで置換する場合は
destination
でfield
と同じ名前を入力し、override
でtrue
とします。 -
dictionary_path
に作成したURLカテゴリリストのファイルパスを指定します。
translate {
field => "url_category_list"
destination => "url_category_list"
dictionary_path => "/etc/logstash/translate/proxy_url_categoryid.yml"
override => "true"
}
今回はリストの件数が多いため、以下のファイルを事前に作成して突合する方式としています。
「i-FILTER」Ver.9 カテゴリパラメーター一覧も参考にして分析し易いリストを作成してみてください。
"33" ポルノ_アダルトサイト
"34" ヌード_アダルトグッズ
"35" グラビア_写真集
"36" 性教育_性の話
"37" 暴力_猟奇描写
"38" 犯罪_武器凶器
"39" 麻薬_薬品薬物
"40" カルト_テロリズム
"41" ハッキング_クラッキング
"42" 不正プログラム配布_リンク集
"43" 違法ソフト_反社会行為
"44" フィッシング詐欺
"45" クラッシャーサイト
"46" ギャンブル
"47" 懸賞_くじ
"48" アルコール_タバコ
"49" 宗教
"50" 求人情報
"51" コンピューターサプライ
"52" オフィスサプライ
"53" 消費者金融_個人ローン
"54" 不動産
"55" 結婚相談_斡旋
"56" 出会い
"57" Webメール
"58" 自殺
"59" 家出
"60" チャット
"61" メールマガジン
"62" 会員向け掲示板
"63" ソーシャルブックマーク
"64" IT情報掲示板
"65" 芸能
"66" 映画_演劇
"67" 音楽
"68" TV_ラジオ
"69" 漫画_アニメ
"70" ゲーム
"71" スポーツ
"72" 占い_超常現象
"73" 電子書籍_小説投稿サイト
"74" 旅行_観光
"75" アミューズメント施設
"76" 旅客鉄道
"77" グルメ
"78" ライブカメラ
"79" オンライントレード
"80" インターネットバンキング
"81" 金融_投資情報
"82" 保険商品
"83" オンラインストレージ
"84" 法人向けオンラインストレージ
"85" 動画配信
"86" 音楽配信
"87" オークション
"88" オンラインショッピング
"89" ポイントサービス
"90" クーポン総合サイト
"91" クレジットカード_オンライン決済_電子マネー
"92" 誹謗_中傷
"93" 主張
"94" いたずら
"95" ニュース
"96" 画像_動画検索エンジン
"97" 地図_位置情報
"98" 掲示板
"99" SNS
"100" ブログ
"101" タウン情報
"102" メッセンジャー
"103" インターネット電話
"104" P2Pファイル共有
"105" スケジューラ
"106" リモートアクセス
"107" オンライン会議
"108" グループウェア
"109" RSSリーダー
"110" Webアプリケーション
"111" ビジネス向けWebアプリケーション
"112" 総合ソフトウェアダウンロード
"113" 趣味_同好
"114" ポータル
"115" 検索エンジン
"116" アップローダー
"117" 迷惑メールリンク
"118" 製品サポート_修正プログラム
"119" Web翻訳・URL変換
"120" テキスト翻訳
"121" プロキシ情報
"122" プロバイダー
"123" ホスティング
"124" ダイナミックDNS
"125" 広告_バナー
"126" 政府_自治体
"127" 軍事_防衛関連
"128" 政治_政治家
"129" 学校_教育施設
"130" 上場企業
"131" 緊急
"132" 特殊
"152" 脅威情報サイト
translate filter
では各LogstashマシンのOS上にリストファイルを保持してマッチさせる必要があります。ファイルに対するアクセスになりますので、ディスクに対するReadIOが性能に影響します。
そのような問題を解消したい場合にはRDBのテーブルにリストを格納し、そのデータをjdbc_streaming filter
を用いて解消することが可能です。RDBに対して、取得したデータを各Logstashのメモリ上に保持し、定期的に更新することが可能です。
6. タイムスタンプの正規化 (grok filter & date filter)
Logstashはデフォルトではログを取り込んだ時刻を@timestamp
として刻印する仕様となっています。
生のログに記載された時刻を分析に利用したい場合は@timestamp
に置き換えたい時刻フィールドをdate filter
で指定します。
利用したい時刻フィールドのaccess_date
に[
という邪魔な文字列があるため、grok filter
を利用して外します。
-
match
でaccess_date
フィールドに含まれるvalueを\(エスケープ文字)[
と以降の文字列をNOTSPACEパターンで指定してtimestamp
フィールド名に指定しています。 - 利用したい時刻フィールド(今回はtimestamp)と時刻フォーマット(複数指定可で順にマッチングします)を
match
で指定します。 - 時刻フィールドに
Apr
等の英語表記が出てくる場合はlocale
でen
と指定することで認識してくれます。 -
timezone
は取り込む時刻情報のタイムゾーンに合わせて指定することで複数の異なる時刻情報のログを取り込む場合に威力を発揮してくれます。 -
@timastamp
に置換後に元の時刻フィールド(access_date、timestamp)が不要であれば、remove_field
で削除します。
grok {
match => {
"access_date" => "\[%{NOTSPACE:timestamp}"
}
tag_on_failure => ["_access_date_parse_failure"]
remove_field => [ "access_date" ]
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss", "d/MMM/yyyy:HH:mm:ss" ]
locale => "en"
timezone => "Asia/Tokyo"
remove_field => [ "timestamp" ]
}
7. ユーザエージェントの正規化 (useragent filter)
外部からのサイバー攻撃により仕込まれたマルウェアがLAN内で動作している場合、マルウェアが外部のC&Cサーバと通信する時にユーザエージェント名に独自の文字列やエージェント名が空欄状態でWeb通信を行うことがあります。
ユーザエージェントをログから分析することで不正な通信の検知に利用することが出来るため、useragent filter
を活用します。
-
source
にcsv filter
で分割した時のユーザエージェントのフィールド名を指定します。 -
target
は元のユーザエージェントのフィールドを残したい場合に別フィールド名を指定することでフィールド生成することが出来ます。
useragent {
source => "user_agent"
target => "useragent"
}
利用アプリケーション名、OS名やバージョン等の情報を提供してくれます。
"useragent": {
"name": "Chrome",
"os": "Mac OS X 10.6.8",
"os_name": "Mac OS X",
"os_major": "10",
"os_minor": "6",
"device": "Other",
"major": "12",
"minor": "0",
"patch": "742"
}
8. 重複排除フィールドの生成 (fingerprint filter)
特定フィールド(複数指定可)の値をハッシュ化させるのがfingerprint filter
になります。
ログの取得タイミングによって誤って同じログを重複して取得してしまう場合や一度取り込んだログの部分欠損があって再度取り込み直すような場合に便利です。
セキュリティユースケースの場合、ログの重複や欠損は致命的ですが、環境や取り込み方によってはそのようなシチュエーションもあろうかと思います。
-
source
にハッシュ化したいフィールドを指定します。同じmessageのログは基本的にはない前提としてmessageを指定しています。十分ではない場合は別のフィールドと組み合わせることも可能です。 -
target
にはハッシュ化された値を格納するフィールドを指定します。[@metadata]をフィールド名の先頭に付加するとElasticsearchにIndexされないフィールドにすることが出来ます。 -
method
はハッシュ化に利用するハッシュ関数を指定します。ハッシュ化処理はCPUリソースをそれなりに消費するため、比較的軽めな関数を指定してみました。
fingerprint {
source => "message"
target => "[@metadata][fingerprint]"
method => "MURMUR3"
}
【参考URL】
・ハッシュ関数ベンチマーク
・Logstashで二重に登録しないために
9. 不要なフィールドの削除 (mutate filter)
csv filterで分割したフィールドの中でログの分析に明らかに不要だと思われるフィールドを削除します。
-
mutate filter
のremove_field
で不要なフィールド名(複数フィールドの場合はカンマ区切り)を指定します。
mutate {
remove_field =>
[
"month",
"day",
"time",
"proxy_process_id",
"i-filter_version",
"top_proxy_address",
"computer_name",
"timezone",
"web_service_id",
"web_service_sub_id",
"virus_name",
"ssl_paramater_encryption",
"content_type",
"checksum",
"message"
]
}
特に分割前の元となっているmessage
を削除することでElasticsearchに取り込まれたログの容量は半分近く削減されます。ログをコスト効率よく長期的に保管するには、なるべくログを軽くしておくことをお勧めします。
新たな攻撃手法が発見されるとこれまで目にも留めていなかったフィールドが必要になることがあります。
あとからログを取り込み直すことは一苦労です。何でもかんでも削除すると痛い目に合いますので、ある程度見極めは必要です。
10. Elasticsearchへの出力 (elasticsearch output)
最後に正規化したアクセスログの出力先として、Elasticsearchにoutputします。
-
host
はElasticsearchのIPアドレスまたはホスト名、:
でポート番号(デフォルト9200)を指定します。 -
document_id
は取り込むログが重複しないように生成したハッシュ値フィールド([@metadata][fingerprint])を指定します。同じmessageにならない限り同じIDは生成されないはずです。 -
index
はデフォルトだとlogstash-%{+YYYY.MM.dd}"となりますが、ログ形式ごとに分けるために指定しています。しかしElasticsearchはUTC時刻でしか認識しないため、日本で利用すると日次でのIndex切り替えがAM9時になるという課題があります(泣) -
template_name
はIndex Settings(スキーマ定義)をテンプレート化している場合に指定します。(必須ではありません)
output {
elasticsearch {
hosts => [ "ElasticsearchのIPアドレス:9200" ]
index => "ifilter-%{+YYYY.MM.dd}"
document_id => "%{[@metadata][fingerprint]}"
template_name => "ifilter"
}
}
プロキシログの分析観点
正規化したログから以下のような観点で不正な通信(サイバー攻撃や内部不正による情報漏洩)の監査に利用することが出来ます。
- URLからC&Cサーバとの不正な外部向け通信をチェック
- ユーザエージェントのアプリケーション名やOS名からマルウェアによる不正な外部向け通信をチェック
- フィルターアクションがBlockでURLカテゴリリストに脅威情報サイトからC&Cサーバとの不正な外部向け通信をチェック
- HTTPメソッドがPOSTでHTTPリクエストサイズが非常に大きな通信をチェック(外部ストレージ向けに大きなデータ送信等)
- 上記を検出した場合の調査にクライアントIPアドレスや認証ユーザー名を利用して端末を特定
おわりに
いかがでしたでしょうか。
2015年以降、標的型攻撃で狙われることが他人事ではない企業ネットワークにおいて
社内LANにWebプロキシが設置されることがサイバーセキュリティ対策の基本となってきています。
そんなWebプロキシ製品の中でも国産メーカーのデジタルアーツ社による日本のWeb利用状況に合わせて用意されたURLフィルタであるi-Filterを利用中のユーザも多いと思います。
またクラウドサービス利用が急速に増加しているため、アクセス数に比例してアクセスログの量も数GB/日では収まらないケースもあり、不正な通信を分析する環境を整えることは急務となっています。
そんな同じ悩みを持つ方も多いのではないかと思い、なにかヒントになるナレッジとなればと^^:
不明な点、誤植などありましたら、コメントをお願いします。