はじめに
auditdで出力されるログの形式が分かりづらくログ構造も中途半端なため、どうにか扱いやすいようにする方法がないかと調べていたのですが、なかなか情報が見つからなかったので、備忘として記事にします。
結果として、auditd関連コマンドの機能としてはJSONでの出力・変換の提供されていないようだが、Pythonでauditdのログを読み込むためのモジュール(auparse)はあるようなので、これとjsonモジュールを使用して、JSONに書き出すことができました。
環境
- Alma Linux 8.10
- Python 3.6
- auditd 3.1.2
準備
auparseモジュールはrpmで提供されているモジュール(python3-audit)に含まれているので、rpmコマンド等から導入されていることを確認する。
# rpm -qa | grep audit
python3-audit-3.1.2-1.el8.x86_64
Alma Linux 8ではデフォルトで導入されていたが、入っていない場合は下記で導入できる。
# dnf install python3-audit
導入確認
Pythonのインタープリターを起動して、auparse
モジュールがインポートできれば問題ない。
$ python3
>>> import auparse
>>>
>>> # helpでモジュールの使い方も参照できる
>>> help(auparse)
変換方法
全量変換
引数でauditdログを指定して、標準出力に変換したJSONのログを書き出すスクリプトを作成してみました。
JSONはauditdの生ログに近い形式となるように、auditログの1レコードごとにJSONオブジェクト化して改行区切りで出力(いわゆるjsonl, JSON-Lines形式)としました。
import os
import sys
import json
import auparse
# 引数のファイルを更新日時順に並び替えておく
auditd_log_files = sorted(sys.argv[1:], key=os.path.getmtime)
aup = auparse.AuParser(
source_type=auparse.AUSOURCE_FILE_ARRAY,
source=auditd_log_files
)
aup.first_record()
# event毎の処理
while True:
# eventのタイムスタンプを取得
ts = aup.get_timestamp()
# record毎の処理
while True:
# JSON変換用の辞書objectを用意
record = {
"time": float("%d.%03d" % (ts.sec, ts.milli)),
"serial": int(ts.serial)
}
# field毎の処理
while True:
# fieldの情報を格納
record[aup.get_field_name()] = aup.interpret_field()
# 次のfieldへ移動、なければ抜ける
if not aup.next_field():
break
# JSON 1個分を書き出し
print(json.dumps(record))
# 次のrecordへ移動、なければ抜ける
if not aup.next_record():
break
# 次のeventへ移動、なければ抜ける
if not aup.parse_next_event():
break
使用イメージ
# python3 auditlog_to_json.py /var/log/audit/audit.log*
{"time": 1725890106.334, "serial": 389, "type": "EXECVE", "argc": "4", "a0": "python3", "a1": "auditlog_to_json.py", "a2": "/var/log/audit/audit.log", "a3": "/var/log/audit/audit.log.1"}
{"time": 1725890106.334, "serial": 389, "type": "PROCTITLE", "proctitle": "python3 auditlog_to_json.py /var/log/audit/audit.log /var/log/audit/audit.log.1"}
出力フォーマットについて
上記コードで出力するJSONは、以下のような形式としている。
項目 | 型 | 内容 |
---|---|---|
time | 数値 | auditdが記録したeventの記録時間 (UNIX time, ミリ秒) |
serial | 数値 | auditdが記録したeventの番号 |
上記以外 | 文字列 | 各フィールドの項目と値 |
JSONオブジェクト化する辞書オブジェクトの組み立て方と、出力タイミングを工夫すれば、event毎に1オブジェクトとしたり、全体を1個のJSONのオブジェクトとしたりもできそう。
フィールドの生の値が欲しい場合
各フィールドの値の取り出しにinterpret_field()
を使用しているが、この関数はUID, GID等が数値ではなく、実際の文字列の名前になる。
生の値が欲しい場合は、値の取り出しにget_field_str()
を使う必要がある。
生の値のみ問題ないという場合には上記コードのinterpret_field()
をget_field_str()
に変えればよいが、全量を生の値で取り出すと、不便になる項目もあるため、fieldの種類の応じて必要に応じて切り替えるようにするなどの工夫が必要になる。
生の値と文字列化した値が異なる場合に、単純にfield名に_rawのsuffixつけたものを付与して両方をJSONに書き出すというだけであれば、field毎の処理の部分を以下のような感じにすればログ量は大分増えてしまうものの、実現はできる。
# field毎の処理
while True:
field_name = aup.get_field_name()
field_value = aup.interpret_field()
field_value_raw = aup.get_field_str()
record[field_name] = field_value
if field_value != field_value_raw:
record["%s_raw" % field_name] = field_value_raw
if not aup.next_field():
break
さいごに
今回はJSONの中身の項目がすべて文字列型だったりと、JSONとしていけていない部分もあるものの、各種プログラミング言語のライブラリやjq
などのコマンドで扱えるようになるため、とりあえずログの取り回しはし易くなりました。
参考